/src/rocksdb/utilities/transactions/write_prepared_txn.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "utilities/transactions/write_prepared_txn.h" |
7 | | |
8 | | #include <cinttypes> |
9 | | #include <map> |
10 | | #include <set> |
11 | | |
12 | | #include "db/attribute_group_iterator_impl.h" |
13 | | #include "db/column_family.h" |
14 | | #include "db/db_impl/db_impl.h" |
15 | | #include "rocksdb/db.h" |
16 | | #include "rocksdb/status.h" |
17 | | #include "rocksdb/utilities/transaction_db.h" |
18 | | #include "util/cast_util.h" |
19 | | #include "utilities/transactions/pessimistic_transaction.h" |
20 | | #include "utilities/transactions/write_prepared_txn_db.h" |
21 | | |
22 | | namespace ROCKSDB_NAMESPACE { |
23 | | |
24 | | struct WriteOptions; |
25 | | |
// Constructs a write-prepared transaction over `txn_db`. The `false` argument
// tells PessimisticTransaction to skip its own Initialize so the virtual
// dispatch below resolves to this class's override.
WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
                                   const WriteOptions& write_options,
                                   const TransactionOptions& txn_options)
    : PessimisticTransaction(txn_db, write_options, txn_options, false),
      wpt_db_(txn_db) {
  // Call Initialize outside PessimisticTransaction constructor otherwise it
  // would skip overridden functions in WritePreparedTxn since they are not
  // defined yet in the constructor of PessimisticTransaction
  Initialize(txn_options);
}
36 | | |
// (Re)initializes transaction state for use/reuse. prepare_batch_cnt_ counts
// the sub-batches of the prepared write batch (duplicate keys add extra
// sub-batches); it is recomputed in PrepareInternal, so reset it to 0 here.
void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  prepare_batch_cnt_ = 0;
}
41 | | |
// Batched point lookup that merges the transaction's own (indexed) writes with
// the DB, using a WritePreparedTxnReadCallback to hide entries that are
// prepared but not yet committed as of the read snapshot.
void WritePreparedTxn::MultiGet(const ReadOptions& _read_options,
                                ColumnFamilyHandle* column_family,
                                const size_t num_keys, const Slice* keys,
                                PinnableSlice* values, Status* statuses,
                                const bool sorted_input) {
  // Reject read options tagged with a different IO activity.
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kMultiGet) {
    Status s = Status::InvalidArgument(
        "Can only call MultiGet with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");

    // Only overwrite statuses that have not already been set to an error.
    for (size_t i = 0; i < num_keys; ++i) {
      if (statuses[i].ok()) {
        statuses[i] = s;
      }
    }
    return;
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kMultiGet;
  }

  // snap_seq is the snapshot to read at (from read_options.snapshot, or the
  // latest seq); min_uncommitted lets the callback skip the commit-cache
  // lookup for sequence numbers known to be committed.
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot = wpt_db_->AssignMinMaxSeqs(
      read_options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
                                      num_keys, keys, values, statuses,
                                      sorted_input, &callback);
  // If the callback saw a sequence it could not resolve, or the implicit
  // snapshot was invalidated (e.g. by max_evicted_seq advancing), the results
  // are unreliable: ask the caller to retry.
  if (UNLIKELY(!callback.valid() ||
               !wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    for (size_t i = 0; i < num_keys; i++) {
      statuses[i] = Status::TryAgain();
    }
  }
}
81 | | |
// Point lookup entry point: validates/normalizes the IO-activity tag on the
// read options, then delegates to GetImpl for the actual read.
Status WritePreparedTxn::Get(const ReadOptions& _read_options,
                             ColumnFamilyHandle* column_family,
                             const Slice& key, PinnableSlice* pinnable_val) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kGet) {
    return Status::InvalidArgument(
        "Can only call Get with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kGet;
  }

  return GetImpl(read_options, column_family, key, pinnable_val);
}
98 | | |
// Reads `key`, consulting the transaction's own write batch first and then the
// DB, with a read callback that filters out uncommitted (prepared) entries.
Status WritePreparedTxn::GetImpl(const ReadOptions& options,
                                 ColumnFamilyHandle* column_family,
                                 const Slice& key,
                                 PinnableSlice* pinnable_val) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
                                        backed_by_snapshot);
  Status res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
                                              pinnable_val, &callback);
  const bool callback_valid =
      callback.valid();  // NOTE: validity of callback must always be checked
                         // before it is destructed
  if (res.ok()) {
    // A successful read only stands if the callback stayed valid and the
    // snapshot it read at is still valid; otherwise the caller must retry.
    if (!LIKELY(callback_valid &&
                wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
                                          backed_by_snapshot))) {
      wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
      res = Status::TryAgain();
    }
  }

  return res;
}
124 | | |
125 | 0 | Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) { |
126 | 0 | return GetIterator(options, wpt_db_->DefaultColumnFamily()); |
127 | 0 | } |
128 | | |
129 | | Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options, |
130 | 0 | ColumnFamilyHandle* column_family) { |
131 | | // Make sure to get iterator from WritePrepareTxnDB, not the root db. |
132 | 0 | Iterator* db_iter = wpt_db_->NewIterator(options, column_family); |
133 | 0 | assert(db_iter); |
134 | |
|
135 | 0 | return write_batch_.NewIteratorWithBase(column_family, db_iter, &options); |
136 | 0 | } |
137 | | |
138 | | std::unique_ptr<Iterator> WritePreparedTxn::GetCoalescingIterator( |
139 | | const ReadOptions& /* read_options */, |
140 | 0 | const std::vector<ColumnFamilyHandle*>& /* column_families */) { |
141 | 0 | return std::unique_ptr<Iterator>(NewErrorIterator( |
142 | 0 | Status::NotSupported("GetCoalescingIterator not supported for " |
143 | 0 | "write-prepared/write-unprepared transactions"))); |
144 | 0 | } |
145 | | |
146 | | std::unique_ptr<AttributeGroupIterator> |
147 | | WritePreparedTxn::GetAttributeGroupIterator( |
148 | | const ReadOptions& /* read_options */, |
149 | 0 | const std::vector<ColumnFamilyHandle*>& /* column_families */) { |
150 | 0 | return NewAttributeGroupErrorIterator( |
151 | 0 | Status::NotSupported("GetAttributeGroupIterator not supported for " |
152 | 0 | "write-prepared/write-unprepared transactions")); |
153 | 0 | } |
154 | | |
// Writes the prepared batch to the WAL (and memtable) terminated by an
// EndPrepare marker, registers the prepare sequence via AddPreparedCallback,
// and records it as the transaction id. The WAL must be enabled for the
// prepare record to survive a crash.
Status WritePreparedTxn::PrepareInternal() {
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  const bool kFirstPrepareBatch = true;
  // !WRITE_AFTER_COMMIT: in write-prepared mode data is written at prepare
  // time, not replayed after the commit marker.
  auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
                                              name_, !WRITE_AFTER_COMMIT);
  assert(s.ok());
  // For each duplicate key we account for a new sub-batch
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // Having AddPrepared in the PreReleaseCallback allows in-order addition of
  // prepared entries to PreparedHeap and hence enables an optimization. Refer
  // to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
  // !DISABLE_MEMTABLE: the prepared data goes into the memtable immediately;
  // visibility is controlled by the commit cache, not by deferring the write.
  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
                          /*callback*/ nullptr, /*user_write_cb=*/nullptr,
                          &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE,
                          &seq_used, prepare_batch_cnt_,
                          &add_prepared_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // The sequence assigned to the first sub-batch becomes the txn id.
  auto prepare_seq = seq_used;
  SetId(prepare_seq);
  return s;
}
183 | | |
184 | 0 | Status WritePreparedTxn::CommitWithoutPrepareInternal() { |
185 | | // For each duplicate key we account for a new sub-batch |
186 | 0 | const size_t batch_cnt = GetWriteBatch()->SubBatchCnt(); |
187 | 0 | return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt); |
188 | 0 | } |
189 | | |
190 | | Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch, |
191 | 0 | size_t batch_cnt) { |
192 | 0 | return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this); |
193 | 0 | } |
194 | | |
// Commits a prepared transaction. In the common case this is a single write
// of the (possibly empty) commit-time batch whose PreReleaseCallback updates
// the commit cache. When two_write_queues is on and the commit-time batch
// carries data, a second write is needed to publish the sequence.
Status WritePreparedTxn::CommitInternal() {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitInternal prepare_seq: %" PRIu64, GetID());
  // We take the commit-time batch and append the Commit marker.
  // The Memtable will ignore the Commit marker in non-recovery mode
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
  assert(s.ok());

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable
    if (for_recovery) {
      WriteBatchInternal::SetAsLatestPersistentState(working_batch);
    } else {
      return Status::InvalidArgument(
          "Commit-time-batch can only be used if "
          "use_only_the_last_commit_time_batch_for_recovery is true");
    }
  }

  auto prepare_seq = GetId();
  const bool includes_data = !empty && !for_recovery;
  assert(prepare_batch_cnt_);
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    // Counting sub-batches requires iterating the commit-time batch; warn
    // because this per-commit cost is normally avoided.
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
  // This is to call AddPrepared on CommitTimeWriteBatch
  const bool kFirstPrepareBatch = true;
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, commit_batch_cnt,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  // One-write commits update the commit cache directly; two-write commits
  // first register the commit-time batch as prepared, then publish later.
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is already
  // a connection between the memtable and its WAL, so there is no need to
  // redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  // If `two_write_queues && includes_data`, then `do_one_write` is false. The
  // following `WriteImpl` will insert the data of the commit-time-batch into
  // the database before updating the commit cache. Therefore, the data of the
  // commit-time-batch is considered uncommitted. Furthermore, since data of
  // the commit-time-batch are not locked, it is possible for two uncommitted
  // versions of the same key to co-exist for a (short) period of time until
  // the commit cache is updated by the second write. If the two uncommitted
  // keys are compacted to the bottommost level in the meantime, it is possible
  // that compaction iterator will zero out the sequence numbers of both, thus
  // violating the invariant that an SST does not have two identical internal
  // keys. To prevent this situation, we should allow the usage of
  // commit-time-batch only if the user sets
  // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery to
  // true. See the comments about GetCommitTimeWriteBatch() in
  // include/rocksdb/utilities/transaction.h.
  s = db_impl_->WriteImpl(write_options_, working_batch, nullptr,
                          /*user_write_cb=*/nullptr, nullptr, zero_log_number,
                          disable_memtable, &seq_used, batch_cnt,
                          pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  const SequenceNumber commit_batch_seq = seq_used;
  if (LIKELY(do_one_write || !s.ok())) {
    if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues &&
                 s.ok())) {
      // Note: RemovePrepared should be called after WriteImpl that published
      // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
    }  // else RemovePrepared is called from within PreReleaseCallback
    if (UNLIKELY(!do_one_write)) {
      assert(!s.ok());
      // Cleanup the prepared entry we added with add_prepared_callback
      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
    }
    return s;
  }  // else do the 2nd write to publish seq
  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many of commits accompanied with CommitTimeWriteBatch and yet we cannot
  // enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.
  const size_t kZeroData = 0;
  // Update commit map only from the 2nd queue
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData,
      commit_batch_seq, commit_batch_cnt);
  WriteBatch empty_batch;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr,
                          /*user_write_cb=*/nullptr, nullptr, NO_REF_LOG,
                          DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_aux_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  return s;
}
313 | | |
// Rolls back a prepared transaction by writing a "rollback batch" that
// restores each touched key to its pre-transaction value (or deletes it if it
// had none), then *committing* the prepared batch + rollback batch together so
// the commit cache cancels them out. With two_write_queues this takes two
// writes: the rollback data, then an empty batch to publish the commit.
Status WritePreparedTxn::RollbackInternal() {
  ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                 "RollbackInternal prepare_seq: %" PRIu64, GetId());

  assert(db_impl_);
  assert(wpt_db_);

  WriteBatch rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */,
                            write_options_.protection_bytes_per_key,
                            0 /* default_cf_ts_sz */);
  // The txn must have been prepared (GetId() is the prepare seq).
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
  auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
  // Read at max seq so the callback (not the seq bound) decides visibility.
  auto read_at_seq = kMaxSequenceNumber;
  // TODO: plumb Env::IOActivity, Env::IOPriority
  ReadOptions roptions;
  // to prevent callback's seq to be overridden inside DBImpl::Get
  roptions.snapshot = wpt_db_->GetMaxSnapshot();
  // Iterates the prepared batch and, for each key, appends to rollback_batch_
  // the value the key had before this transaction (or a delete if none).
  struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
    DBImpl* const db_;
    WritePreparedTxnDB* const wpt_db_;
    WritePreparedTxnReadCallback callback_;
    WriteBatch* rollback_batch_;
    std::map<uint32_t, const Comparator*>& comparators_;
    std::map<uint32_t, ColumnFamilyHandle*>& handles_;
    using CFKeys = std::set<Slice, SetComparator>;
    // Per-CF set of keys already handled, so duplicates roll back only once.
    std::map<uint32_t, CFKeys> keys_;
    bool rollback_merge_operands_;
    ReadOptions roptions_;

    RollbackWriteBatchBuilder(
        DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
        WriteBatch* dst_batch,
        std::map<uint32_t, const Comparator*>& comparators,
        std::map<uint32_t, ColumnFamilyHandle*>& handles,
        bool rollback_merge_operands, const ReadOptions& _roptions)
        : db_(db),
          wpt_db_(wpt_db),
          callback_(wpt_db, snap_seq),  // disable min_uncommitted optimization
          rollback_batch_(dst_batch),
          comparators_(comparators),
          handles_(handles),
          rollback_merge_operands_(rollback_merge_operands),
          roptions_(_roptions) {}

    // Looks up the pre-txn value of (cf, key) and appends the corresponding
    // undo operation (Put of old value, or [Single]Delete) to rollback_batch_.
    Status Rollback(uint32_t cf, const Slice& key) {
      Status s;
      CFKeys& cf_keys = keys_[cf];
      if (cf_keys.size() == 0) {  // just inserted
        // First key for this CF: install the CF's comparator on the set.
        auto cmp = comparators_[cf];
        keys_[cf] = CFKeys(SetComparator(cmp));
      }
      auto it = cf_keys.insert(key);
      // second is false if an element already existed.
      if (it.second == false) {
        return s;
      }

      PinnableSlice pinnable_val;
      bool not_used;
      auto cf_handle = handles_[cf];
      DBImpl::GetImplOptions get_impl_options;
      get_impl_options.column_family = cf_handle;
      get_impl_options.value = &pinnable_val;
      get_impl_options.value_found = &not_used;
      get_impl_options.callback = &callback_;
      s = db_->GetImpl(roptions_, key, get_impl_options);
      assert(s.ok() || s.IsNotFound());
      if (s.ok()) {
        s = rollback_batch_->Put(cf_handle, key, pinnable_val);
        assert(s.ok());
      } else if (s.IsNotFound()) {
        // There has been no readable value before txn. By adding a delete we
        // make sure that there will be none afterwards either.
        if (wpt_db_->ShouldRollbackWithSingleDelete(cf_handle, key)) {
          s = rollback_batch_->SingleDelete(cf_handle, key);
        } else {
          s = rollback_batch_->Delete(cf_handle, key);
        }
        assert(s.ok());
      } else {
        // Unexpected status. Return it to the user.
      }
      return s;
    }

    Status PutCF(uint32_t cf, const Slice& key, const Slice& /*val*/) override {
      return Rollback(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
      if (rollback_merge_operands_) {
        return Rollback(cf, key);
      } else {
        return Status::OK();
      }
    }

    // Control markers in the prepared batch carry no user data to undo.
    Status MarkNoop(bool) override { return Status::OK(); }
    Status MarkBeginPrepare(bool) override { return Status::OK(); }
    Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
    Status MarkCommit(const Slice&) override { return Status::OK(); }
    // A rollback marker inside the batch being rolled back is malformed input.
    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }

   protected:
    Handler::OptionState WriteAfterCommit() const override {
      return Handler::OptionState::kDisabled;
    }
  } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
                     *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                     wpt_db_->txn_db_options_.rollback_merge_operands,
                     roptions);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
  if (!s.ok()) {
    return s;
  }
  // The Rollback marker will be used as a batch separator
  s = WriteBatchInternal::MarkRollback(&rollback_batch, name_);
  assert(s.ok());
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ONE_BATCH = 1;
  const bool kFirstPrepareBatch = true;
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, ii) adding the commit entry to
  // CommitCache will allow us to benefit from the existing mechanism in
  // CommitCache that keeps an entry evicted due to max advance and yet overlaps
  // with a live snapshot around so that the live snapshot properly skips the
  // entry even if its prepare seq is lower than max_evicted_seq_.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, ONE_BATCH,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while
  // the rollback batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr,
                          /*user_write_cb=*/nullptr, nullptr, NO_REF_LOG,
                          !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    assert(!db_impl_->immutable_db_options().two_write_queues);
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
    return s;
  }  // else do the 2nd write for commit
  uint64_t rollback_seq = seq_used;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write rollback_seq: %" PRIu64,
                    rollback_seq);
  // Commit the batch by writing an empty batch to the queue that will release
  // the commit sequence number to readers.
  WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
      wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_);
  WriteBatch empty_batch;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr,
                          /*user_write_cb=*/nullptr, nullptr, NO_REF_LOG,
                          DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal (status=%s) commit: %" PRIu64,
                    s.ToString().c_str(), GetId());
  // TODO(lth): For WriteUnPrepared that rollback is called frequently,
  // RemovePrepared could be moved to the callback to reduce lock contention.
  if (s.ok()) {
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
  }
  // Note: RemovePrepared for prepared batch is called from within
  // PreReleaseCallback
  wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);

  return s;
}
519 | | |
// Checks for write-write conflicts: verifies that `key` has not been written
// by another transaction since this transaction's snapshot. On success,
// updates *tracked_at_seq so the key need not be re-validated against the
// same snapshot.
Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                          const Slice& key,
                                          SequenceNumber* tracked_at_seq) {
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked so there is no need to apply the IsInSnapshot to this comparison
  // here as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

  WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted,
                                            kBackedByDBSnapshot);
  // TODO(yanqin): support user-defined timestamp
  return TransactionUtil::CheckKeyForConflicts(
      db_impl_, cfh, key.ToString(), snap_seq, /*ts=*/nullptr,
      false /* cache_only */, &snap_checker, min_uncommitted,
      txn_db_impl_->GetTxnDBOptions().enable_udt_validation);
}
552 | | |
553 | 0 | void WritePreparedTxn::SetSnapshot() { |
554 | 0 | const bool kForWWConflictCheck = true; |
555 | 0 | SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck); |
556 | 0 | SetSnapshotInternal(snapshot); |
557 | 0 | } |
558 | | |
559 | 0 | Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) { |
560 | 0 | auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch); |
561 | 0 | prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); |
562 | 0 | return ret; |
563 | 0 | } |
564 | | |
565 | | } // namespace ROCKSDB_NAMESPACE |