/src/rocksdb/utilities/transactions/write_prepared_txn_db.h
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #pragma once |
7 | | |
8 | | #include <cinttypes> |
9 | | #include <mutex> |
10 | | #include <queue> |
11 | | #include <set> |
12 | | #include <string> |
13 | | #include <unordered_map> |
14 | | #include <vector> |
15 | | |
16 | | #include "db/attribute_group_iterator_impl.h" |
17 | | #include "db/db_iter.h" |
18 | | #include "db/pre_release_callback.h" |
19 | | #include "db/read_callback.h" |
20 | | #include "db/snapshot_checker.h" |
21 | | #include "logging/logging.h" |
22 | | #include "rocksdb/db.h" |
23 | | #include "rocksdb/options.h" |
24 | | #include "rocksdb/utilities/transaction_db.h" |
25 | | #include "util/cast_util.h" |
26 | | #include "util/set_comparator.h" |
27 | | #include "util/string_util.h" |
28 | | #include "utilities/transactions/pessimistic_transaction.h" |
29 | | #include "utilities/transactions/pessimistic_transaction_db.h" |
30 | | #include "utilities/transactions/write_prepared_txn.h" |
31 | | |
32 | | namespace ROCKSDB_NAMESPACE { |
33 | | enum SnapshotBackup : bool { kUnbackedByDBSnapshot, kBackedByDBSnapshot }; |
34 | | |
35 | | // A PessimisticTransactionDB that writes data to the DB after the prepare phase of 2PC. |
36 | | // In this way some data in the DB might not be committed. The DB provides |
37 | | // mechanisms to tell such data apart from committed data. |
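// A minimal usage sketch (illustrative only; the path, key, and transaction
// name below are placeholders): a WritePreparedTxnDB is obtained by opening a
// TransactionDB with write_policy set to WRITE_PREPARED, after which a 2PC
// transaction writes its data to the DB at Prepare() time and the data stays
// invisible to readers until Commit():
//
//   Options options;
//   options.create_if_missing = true;
//   TransactionDBOptions txn_db_options;
//   txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED;
//   TransactionDB* txn_db = nullptr;
//   Status s = TransactionDB::Open(options, txn_db_options, "/tmp/wp_db",
//                                  &txn_db);
//   assert(s.ok());
//   Transaction* txn = txn_db->BeginTransaction(WriteOptions());
//   s = txn->SetName("xid1");       // a name is required before Prepare()
//   s = txn->Put("key", "value");
//   s = txn->Prepare();
//   s = txn->Commit();
//   delete txn;
//   delete txn_db;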
38 | | class WritePreparedTxnDB : public PessimisticTransactionDB { |
39 | | public: |
40 | | explicit WritePreparedTxnDB(DB* db, |
41 | | const TransactionDBOptions& txn_db_options) |
42 | 0 | : PessimisticTransactionDB(db, txn_db_options), |
43 | 0 | SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits), |
44 | 0 | SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)), |
45 | 0 | COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits), |
46 | 0 | COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)), |
47 | 0 | FORMAT(COMMIT_CACHE_BITS) { |
48 | 0 | Init(txn_db_options); |
49 | 0 | } |
50 | | |
51 | | explicit WritePreparedTxnDB(StackableDB* db, |
52 | | const TransactionDBOptions& txn_db_options) |
53 | 0 | : PessimisticTransactionDB(db, txn_db_options), |
54 | 0 | SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits), |
55 | 0 | SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)), |
56 | 0 | COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits), |
57 | 0 | COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)), |
58 | 0 | FORMAT(COMMIT_CACHE_BITS) { |
59 | 0 | Init(txn_db_options); |
60 | 0 | } |
61 | | |
62 | | virtual ~WritePreparedTxnDB(); |
63 | | |
64 | | Status Initialize(const std::vector<size_t>& compaction_enabled_cf_indices, |
65 | | const std::vector<ColumnFamilyHandle*>& handles) override; |
66 | | |
67 | | Transaction* BeginTransaction(const WriteOptions& write_options, |
68 | | const TransactionOptions& txn_options, |
69 | | Transaction* old_txn) override; |
70 | | |
71 | | using TransactionDB::Write; |
72 | | Status Write(const WriteOptions& opts, WriteBatch* updates) override; |
73 | | |
74 | | // Optimized version of ::Write that accepts additional optimization requests, |
75 | | // such as skip_concurrency_control. |
76 | | using PessimisticTransactionDB::Write; |
77 | | Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&, |
78 | | WriteBatch* updates) override; |
79 | | |
80 | | // Write the batch to the underlying DB and mark it as committed. Could be |
81 | | // used either directly from TxnDB or through a transaction. |
82 | | Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch, |
83 | | size_t batch_cnt, WritePreparedTxn* txn); |
84 | | |
85 | | using DB::Get; |
86 | | Status Get(const ReadOptions& _read_options, |
87 | | ColumnFamilyHandle* column_family, const Slice& key, |
88 | | PinnableSlice* value, std::string* timestamp) override; |
89 | | |
90 | | using DB::MultiGet; |
91 | | void MultiGet(const ReadOptions& _read_options, const size_t num_keys, |
92 | | ColumnFamilyHandle** column_families, const Slice* keys, |
93 | | PinnableSlice* values, std::string* timestamps, |
94 | | Status* statuses, const bool sorted_input) override; |
95 | | |
96 | | using DB::NewIterator; |
97 | | Iterator* NewIterator(const ReadOptions& _read_options, |
98 | | ColumnFamilyHandle* column_family) override; |
99 | | |
100 | | using DB::NewIterators; |
101 | | Status NewIterators(const ReadOptions& _read_options, |
102 | | const std::vector<ColumnFamilyHandle*>& column_families, |
103 | | std::vector<Iterator*>* iterators) override; |
104 | | |
105 | | using DB::NewCoalescingIterator; |
106 | | std::unique_ptr<Iterator> NewCoalescingIterator( |
107 | | const ReadOptions& /*options*/, |
108 | 0 | const std::vector<ColumnFamilyHandle*>& /*column_families*/) override { |
109 | 0 | return std::unique_ptr<Iterator>( |
110 | 0 | NewErrorIterator(Status::NotSupported("Not supported yet"))); |
111 | 0 | } |
112 | | |
113 | | using DB::NewAttributeGroupIterator; |
114 | | std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator( |
115 | | const ReadOptions& /*options*/, |
116 | 0 | const std::vector<ColumnFamilyHandle*>& /*column_families*/) override { |
117 | 0 | return NewAttributeGroupErrorIterator( |
118 | 0 | Status::NotSupported("Not supported yet")); |
119 | 0 | } |
120 | | |
121 | | // Check whether the transaction that wrote the value with prepare sequence |
122 | | // number prep_seq is visible to the snapshot with sequence number |
123 | | // snapshot_seq. Returns true if commit_seq <= snapshot_seq. |
124 | | // If snapshot_seq is already released and snapshot_seq <= max, sets |
125 | | // *snap_released to true and returns true as well. |
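// A small worked example of the contract above (illustrative numbers): if a
// value was prepared at prep_seq = 10 and committed at commit_seq = 12, then
// IsInSnapshot(10, 11) is false since 12 > 11, while IsInSnapshot(10, 12) and
// IsInSnapshot(10, 15) are true, because visibility requires
// commit_seq <= snapshot_seq.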
126 | | inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq, |
127 | | uint64_t min_uncommitted = kMinUnCommittedSeq, |
128 | 0 | bool* snap_released = nullptr) const { |
129 | 0 | ROCKS_LOG_DETAILS(info_log_, |
130 | 0 | "IsInSnapshot %" PRIu64 " in %" PRIu64 |
131 | 0 | " min_uncommitted %" PRIu64, |
132 | 0 | prep_seq, snapshot_seq, min_uncommitted); |
133 | 0 | assert(min_uncommitted >= kMinUnCommittedSeq); |
134 | | // The caller is responsible for initializing snap_released. |
135 | 0 | assert(snap_released == nullptr || *snap_released == false); |
136 | | // Here we try to infer the return value without looking into the prepare list. |
137 | | // This helps avoid synchronization over a shared map. |
138 | | // TODO(myabandeh): optimize this. This sequence of checks must be correct |
139 | | // but is not necessarily efficient. |
140 | 0 | if (prep_seq == 0) { |
141 | | // Compaction will output keys to the bottom level with sequence number 0 if |
142 | | // they are visible to the earliest snapshot. |
143 | 0 | ROCKS_LOG_DETAILS( |
144 | 0 | info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, |
145 | 0 | prep_seq, snapshot_seq, 1); |
146 | 0 | return true; |
147 | 0 | } |
148 | 0 | if (snapshot_seq < prep_seq) { |
149 | | // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq |
150 | 0 | ROCKS_LOG_DETAILS( |
151 | 0 | info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, |
152 | 0 | prep_seq, snapshot_seq, 0); |
153 | 0 | return false; |
154 | 0 | } |
155 | 0 | if (prep_seq < min_uncommitted) { |
156 | 0 | ROCKS_LOG_DETAILS(info_log_, |
157 | 0 | "IsInSnapshot %" PRIu64 " in %" PRIu64 |
158 | 0 | " returns %" PRId32 |
159 | 0 | " because of min_uncommitted %" PRIu64, |
160 | 0 | prep_seq, snapshot_seq, 1, min_uncommitted); |
161 | 0 | return true; |
162 | 0 | } |
163 | | // Commit of a delayed prepared has two non-atomic steps: add to the commit |
164 | | // cache, then remove from delayed_prepared_. Our reads from these two are |
165 | | // also non-atomic, so by looking into the commit cache first we might find |
166 | | // the prep_seq neither in the commit cache nor in delayed_prepared_. To fix |
167 | | // that, i) we check whether there was any delayed prepared BEFORE looking |
168 | | // into the commit cache, and ii) if there was, we extend the search steps |
169 | | // to: commit cache, delayed prepared, commit cache again. In this way, if |
170 | | // the first query to the commit cache missed the commit, the 2nd will catch it. |
171 | 0 | bool was_empty; |
172 | 0 | SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub; |
173 | 0 | CommitEntry64b dont_care; |
174 | 0 | auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE; |
175 | 0 | size_t repeats = 0; |
176 | 0 | do { |
177 | 0 | repeats++; |
178 | 0 | assert(repeats < 100); |
179 | 0 | if (UNLIKELY(repeats >= 100)) { |
180 | 0 | throw std::runtime_error( |
181 | 0 | "The read was interrupted 100 times by update to max_evicted_seq_. " |
182 | 0 | "This is unexpected in all setups"); |
183 | 0 | } |
184 | 0 | max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire); |
185 | 0 | TEST_SYNC_POINT( |
186 | 0 | "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause"); |
187 | 0 | TEST_SYNC_POINT( |
188 | 0 | "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"); |
189 | 0 | was_empty = delayed_prepared_empty_.load(std::memory_order_acquire); |
190 | 0 | TEST_SYNC_POINT( |
191 | 0 | "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause"); |
192 | 0 | TEST_SYNC_POINT( |
193 | 0 | "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"); |
194 | 0 | CommitEntry cached; |
195 | 0 | bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached); |
196 | 0 | TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause"); |
197 | 0 | TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"); |
198 | 0 | if (exist && prep_seq == cached.prep_seq) { |
199 | | // It is committed and also not evicted from commit cache |
200 | 0 | ROCKS_LOG_DETAILS( |
201 | 0 | info_log_, |
202 | 0 | "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, |
203 | 0 | prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq); |
204 | 0 | return cached.commit_seq <= snapshot_seq; |
205 | 0 | } |
206 | | // else it could be committed but not inserted in the map which could |
207 | | // happen after recovery, or it could be committed and evicted by another |
208 | | // commit, or never committed. |
209 | | |
210 | | // At this point we don't know if it was committed or it is still prepared |
211 | 0 | max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire); |
212 | 0 | if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) { |
213 | 0 | continue; |
214 | 0 | } |
215 | | // Note: the value of max_evicted_seq_ at the time of GetCommitEntry was <= max_evicted_seq_ub |
216 | 0 | if (max_evicted_seq_ub < prep_seq) { |
217 | | // Not evicted from the cache and also not present, so it must still be |
218 | | // prepared |
219 | 0 | ROCKS_LOG_DETAILS(info_log_, |
220 | 0 | "IsInSnapshot %" PRIu64 " in %" PRIu64 |
221 | 0 | " returns %" PRId32, |
222 | 0 | prep_seq, snapshot_seq, 0); |
223 | 0 | return false; |
224 | 0 | } |
225 | 0 | TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause"); |
226 | 0 | TEST_SYNC_POINT( |
227 | 0 | "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"); |
228 | 0 | if (!was_empty) { |
229 | | // We should not normally reach here |
230 | 0 | WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD); |
231 | 0 | ReadLock rl(&prepared_mutex_); |
232 | 0 | ROCKS_LOG_WARN( |
233 | 0 | info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64, |
234 | 0 | static_cast<uint64_t>(delayed_prepared_.size()), prep_seq); |
235 | 0 | if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) { |
236 | | // This is the order: 1) delayed_prepared_commits_ update, 2) publish, |
237 | | // 3) delayed_prepared_ cleanup. So check if this is the case of a late |
238 | | // cleanup. |
239 | 0 | auto it = delayed_prepared_commits_.find(prep_seq); |
240 | 0 | if (it == delayed_prepared_commits_.end()) { |
241 | | // Then it is not committed yet |
242 | 0 | ROCKS_LOG_DETAILS(info_log_, |
243 | 0 | "IsInSnapshot %" PRIu64 " in %" PRIu64 |
244 | 0 | " returns %" PRId32, |
245 | 0 | prep_seq, snapshot_seq, 0); |
246 | 0 | return false; |
247 | 0 | } else { |
248 | 0 | ROCKS_LOG_DETAILS(info_log_, |
249 | 0 | "IsInSnapshot %" PRIu64 " in %" PRIu64 |
250 | 0 | " commit: %" PRIu64 " returns %" PRId32, |
251 | 0 | prep_seq, snapshot_seq, it->second, |
252 | 0 | snapshot_seq <= it->second); |
253 | 0 | return it->second <= snapshot_seq; |
254 | 0 | } |
255 | 0 | } else { |
256 | | // 2nd query to commit cache. Refer to was_empty comment above. |
257 | 0 | exist = GetCommitEntry(indexed_seq, &dont_care, &cached); |
258 | 0 | if (exist && prep_seq == cached.prep_seq) { |
259 | 0 | ROCKS_LOG_DETAILS( |
260 | 0 | info_log_, |
261 | 0 | "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, |
262 | 0 | prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq); |
263 | 0 | return cached.commit_seq <= snapshot_seq; |
264 | 0 | } |
265 | 0 | max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire); |
266 | 0 | } |
267 | 0 | } |
268 | 0 | } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)); |
269 | | // When advancing max_evicted_seq_, we move older entries from prepared_txns_ |
270 | | // to delayed_prepared_. We also move evicted entries from the commit cache to |
271 | | // old_commit_map_ if they overlap with any snapshot. Since prep_seq <= |
272 | | // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in |
273 | | // old_commit_map_, iii) committed with no conflict with any snapshot. Case |
274 | | // (i), delayed_prepared_, is checked above. |
275 | 0 | if (max_evicted_seq_ub < snapshot_seq) { // then (ii) cannot be the case |
276 | | // only (iii) is the case: committed |
277 | | // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq < |
278 | | // snapshot_seq |
279 | 0 | ROCKS_LOG_DETAILS( |
280 | 0 | info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, |
281 | 0 | prep_seq, snapshot_seq, 1); |
282 | 0 | return true; |
283 | 0 | } |
284 | | // else (ii) might be the case: check the commit data saved for this |
285 | | // snapshot. If there was no overlapping commit entry, then it is committed |
286 | | // with a commit_seq lower than any live snapshot, including snapshot_seq. |
287 | 0 | if (old_commit_map_empty_.load(std::memory_order_acquire)) { |
288 | 0 | ROCKS_LOG_DETAILS(info_log_, |
289 | 0 | "IsInSnapshot %" PRIu64 " in %" PRIu64 |
290 | 0 | " returns %" PRId32 " released=1", |
291 | 0 | prep_seq, snapshot_seq, 0); |
292 | 0 | assert(snap_released); |
293 | | // This snapshot is not valid anymore. We cannot tell if prep_seq is |
294 | | // committed before or after the snapshot. Return true but also set |
295 | | // snap_released to true. |
296 | 0 | *snap_released = true; |
297 | 0 | return true; |
298 | 0 | } |
299 | 0 | { |
300 | | // We should not normally reach here unless snapshot_seq is old. This is a |
301 | | // rare case and it is ok to pay the cost of mutex ReadLock for such old, |
302 | | // reading transactions. |
303 | 0 | WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD); |
304 | 0 | ReadLock rl(&old_commit_map_mutex_); |
305 | 0 | auto prep_set_entry = old_commit_map_.find(snapshot_seq); |
306 | 0 | bool found = prep_set_entry != old_commit_map_.end(); |
307 | 0 | if (found) { |
308 | 0 | auto& vec = prep_set_entry->second; |
309 | 0 | found = std::binary_search(vec.begin(), vec.end(), prep_seq); |
310 | 0 | } else { |
311 | | // coming from compaction |
312 | 0 | ROCKS_LOG_DETAILS(info_log_, |
313 | 0 | "IsInSnapshot %" PRIu64 " in %" PRIu64 |
314 | 0 | " returns %" PRId32 " released=1", |
315 | 0 | prep_seq, snapshot_seq, 0); |
316 | | // This snapshot is not valid anymore. We cannot tell if prep_seq is |
317 | | // committed before or after the snapshot. Return true but also set |
318 | | // snap_released to true. |
319 | 0 | assert(snap_released); |
320 | 0 | *snap_released = true; |
321 | 0 | return true; |
322 | 0 | } |
323 | | |
324 | 0 | if (!found) { |
325 | 0 | ROCKS_LOG_DETAILS(info_log_, |
326 | 0 | "IsInSnapshot %" PRIu64 " in %" PRIu64 |
327 | 0 | " returns %" PRId32, |
328 | 0 | prep_seq, snapshot_seq, 1); |
329 | 0 | return true; |
330 | 0 | } |
331 | 0 | } |
332 | | // (ii) is the case: it is committed but after the snapshot_seq |
333 | 0 | ROCKS_LOG_DETAILS( |
334 | 0 | info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, |
335 | 0 | prep_seq, snapshot_seq, 0); |
336 | 0 | return false; |
337 | 0 | } |
338 | | |
339 | | // Add the transaction with prepare sequence seq to the prepared list. |
340 | | // Note: must be called serially with increasing seq on each call. |
341 | | // locked is true if prepared_mutex_ is already locked. |
342 | | void AddPrepared(uint64_t seq, bool locked = false); |
343 | | // Check if any of the prepared txns are less than new max_evicted_seq_. Must |
344 | | // be called with prepared_mutex_ write locked. |
345 | | void CheckPreparedAgainstMax(SequenceNumber new_max, bool locked); |
346 | | // Remove the transaction with prepare sequence seq from the prepared list |
347 | | void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1); |
348 | | // Add the transaction with prepare sequence prepare_seq and commit sequence |
349 | | // commit_seq to the commit map. loop_cnt is to detect infinite loops. |
350 | | // Note: must be called serially. |
351 | | void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, |
352 | | uint8_t loop_cnt = 0); |
353 | | |
354 | | struct CommitEntry { |
355 | | uint64_t prep_seq; |
356 | | uint64_t commit_seq; |
357 | 0 | CommitEntry() : prep_seq(0), commit_seq(0) {} |
358 | 0 | CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {} |
359 | 0 | bool operator==(const CommitEntry& rhs) const { |
360 | 0 | return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq; |
361 | 0 | } |
362 | | }; |
363 | | |
364 | | struct CommitEntry64bFormat { |
365 | | explicit CommitEntry64bFormat(size_t index_bits) |
366 | 0 | : INDEX_BITS(index_bits), |
367 | 0 | PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)), |
368 | 0 | COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)), |
369 | 0 | COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)), |
370 | 0 | DELTA_UPPERBOUND(static_cast<uint64_t>((1ull << COMMIT_BITS))) {} |
371 | | // Number of higher bits of a sequence number that are not used. They are |
372 | | // used to encode the value type, ... |
373 | | const size_t PAD_BITS = static_cast<size_t>(8); |
374 | | // Number of lower bits from prepare seq that can be skipped as they are |
375 | | // implied by the index of the entry in the array |
376 | | const size_t INDEX_BITS; |
377 | | // Number of bits we use to encode the prepare seq |
378 | | const size_t PREP_BITS; |
379 | | // Number of bits we use to encode the commit seq. |
380 | | const size_t COMMIT_BITS; |
381 | | // Filter to encode/decode commit seq |
382 | | const uint64_t COMMIT_FILTER; |
383 | | // The value of commit_seq - prepare_seq + 1 must be less than this bound |
384 | | const uint64_t DELTA_UPPERBOUND; |
385 | | }; |
386 | | |
387 | | // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... INDEX |
388 | | // Delta Seq   (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ... DELTA DELTA |
389 | | // Encoded Value         = PREP PREP .... PREP PREP DELTA DELTA ... DELTA DELTA |
390 | | // PAD: first bits of a seq that are reserved for tagging and hence ignored |
391 | | // PREP/INDEX: the used bits in a prepare seq number |
392 | | // INDEX: the bits that do not have to be encoded (will be provided externally) |
393 | | // DELTA: commit seq - prepare seq + 1 |
394 | | // Number of DELTA bits should be equal to number of INDEX bits + PADs |
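// A worked example of the encoding with deliberately small, illustrative
// parameters (INDEX_BITS = 4, hence PAD_BITS = 8, PREP_BITS = 52,
// COMMIT_BITS = 12, COMMIT_FILTER = 0xFFF); the real INDEX_BITS equals the
// configured commit-cache bits:
//   prep_seq = 0x123, commit_seq = 0x125  =>  delta = 0x125 - 0x123 + 1 = 3
//   indexed_seq = prep_seq % 16 = 0x3 (implied by the slot in the cache)
//   rep_ = ((0x123 << 8) & ~0xFFF) | 3 = 0x12000 | 3 = 0x12003
// Parse(indexed_seq = 0x3) then recovers:
//   delta = rep_ & 0xFFF = 3
//   prep_seq = ((rep_ & ~0xFFF) >> 8) | 0x3 = 0x120 | 0x3 = 0x123
//   commit_seq = 0x123 + 3 - 1 = 0x125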
395 | | struct CommitEntry64b { |
396 | 0 | constexpr CommitEntry64b() noexcept : rep_(0) {} |
397 | | |
398 | | CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format) |
399 | 0 | : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {} |
400 | | |
401 | | CommitEntry64b(const uint64_t ps, const uint64_t cs, |
402 | 0 | const CommitEntry64bFormat& format) { |
403 | 0 | assert(ps < static_cast<uint64_t>( |
404 | 0 | (1ull << (format.PREP_BITS + format.INDEX_BITS)))); |
405 | 0 | assert(ps <= cs); |
406 | 0 | uint64_t delta = cs - ps + 1; // make initialized delta always >= 1 |
407 | | // zero is reserved for uninitialized entries |
408 | 0 | assert(0 < delta); |
409 | 0 | assert(delta < format.DELTA_UPPERBOUND); |
410 | 0 | if (delta >= format.DELTA_UPPERBOUND) { |
411 | 0 | throw std::runtime_error( |
412 | 0 | "commit_seq >> prepare_seq. The allowed distance is " + |
413 | 0 | std::to_string(format.DELTA_UPPERBOUND) + " commit_seq is " + |
414 | 0 | std::to_string(cs) + " prepare_seq is " + std::to_string(ps)); |
415 | 0 | } |
416 | 0 | rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER; |
417 | 0 | rep_ = rep_ | delta; |
418 | 0 | } |
419 | | |
420 | | // Return false if the entry is empty |
421 | | bool Parse(const uint64_t indexed_seq, CommitEntry* entry, |
422 | 0 | const CommitEntry64bFormat& format) { |
423 | 0 | uint64_t delta = rep_ & format.COMMIT_FILTER; |
424 | | // zero is reserved for uninitialized entries |
425 | 0 | assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS))); |
426 | 0 | if (delta == 0) { |
427 | 0 | return false; // initialized entry would have non-zero delta |
428 | 0 | } |
429 | | |
430 | 0 | assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS))); |
431 | 0 | uint64_t prep_up = rep_ & ~format.COMMIT_FILTER; |
432 | 0 | prep_up >>= format.PAD_BITS; |
433 | 0 | const uint64_t& prep_low = indexed_seq; |
434 | 0 | entry->prep_seq = prep_up | prep_low; |
435 | | |
436 | 0 | entry->commit_seq = entry->prep_seq + delta - 1; |
437 | 0 | return true; |
438 | 0 | } |
439 | | |
440 | | private: |
441 | | uint64_t rep_; |
442 | | }; |
443 | | |
444 | | // Struct to hold ownership of snapshot and read callback for cleanup. |
445 | | struct IteratorState; |
446 | | |
447 | 0 | std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() { |
448 | 0 | return cf_map_; |
449 | 0 | } |
450 | 0 | std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() { |
451 | 0 | return handle_map_; |
452 | 0 | } |
453 | | void UpdateCFComparatorMap( |
454 | | const std::vector<ColumnFamilyHandle*>& handles) override; |
455 | | void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override; |
456 | | |
457 | | const Snapshot* GetSnapshot() override; |
458 | | SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check); |
459 | | |
460 | | protected: |
461 | | Status VerifyCFOptions(const ColumnFamilyOptions& cf_options) override; |
462 | | // Assign the min and max sequence numbers for reading from the db. A seq > |
463 | | // max is not valid; a seq < min is valid; and min <= seq < max requires |
464 | | // further checking. Normally max is defined by the snapshot and min by the |
465 | | // minimum uncommitted seq. |
466 | | inline SnapshotBackup AssignMinMaxSeqs(const Snapshot* snapshot, |
467 | | SequenceNumber* min, |
468 | | SequenceNumber* max); |
469 | | // Validate whether a snapshot sequence number is still valid based on the |
470 | | // latest db status. backed_by_snapshot specifies whether the number is backed |
471 | | // by an actual snapshot object. order specifies the memory order with which |
472 | | // we load the atomic variables: relaxed is enough for the default since we |
473 | | // care about the last value seen by the same thread. |
474 | | inline bool ValidateSnapshot( |
475 | | const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot, |
476 | | std::memory_order order = std::memory_order_relaxed); |
477 | | // Get a dummy snapshot that refers to kMaxSequenceNumber |
478 | 0 | Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; } |
479 | | |
480 | | bool ShouldRollbackWithSingleDelete(ColumnFamilyHandle* column_family, |
481 | 0 | const Slice& key) { |
482 | 0 | return rollback_deletion_type_callback_ |
483 | 0 | ? rollback_deletion_type_callback_(this, column_family, key) |
484 | 0 | : false; |
485 | 0 | } |
486 | | |
487 | | std::function<bool(TransactionDB*, ColumnFamilyHandle*, const Slice&)> |
488 | | rollback_deletion_type_callback_; |
489 | | |
490 | | private: |
491 | | friend class AddPreparedCallback; |
492 | | friend class PreparedHeap_BasicsTest_Test; |
493 | | friend class PreparedHeap_Concurrent_Test; |
494 | | friend class PreparedHeap_EmptyAtTheEnd_Test; |
495 | | friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccess_Test; |
496 | | friend class WritePreparedCommitEntryPreReleaseCallback; |
497 | | friend class WritePreparedTransactionTestBase; |
498 | | friend class WritePreparedTxn; |
499 | | friend class WritePreparedTxnDBMock; |
500 | | friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test; |
501 | | friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasic_Test; |
502 | | friend class |
503 | | WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicates_Test; |
504 | | friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test; |
505 | | friend class WritePreparedTransactionTest_BasicRecovery_Test; |
506 | | friend class WritePreparedTransactionTest_CheckAgainstSnapshots_Test; |
507 | | friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test; |
508 | | friend class WritePreparedTransactionTest_ConflictDetectionAfterRecovery_Test; |
509 | | friend class WritePreparedTransactionTest_CommitMap_Test; |
510 | | friend class WritePreparedTransactionTest_DoubleSnapshot_Test; |
511 | | friend class WritePreparedTransactionTest_IsInSnapshotEmptyMap_Test; |
512 | | friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test; |
513 | | friend class WritePreparedTransactionTest_IsInSnapshot_Test; |
514 | | friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test; |
515 | | friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test; |
516 | | friend class WritePreparedTransactionTest_MaxCatchupWithUnbackedSnapshot_Test; |
517 | | friend class |
518 | | WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test; |
519 | | friend class |
520 | | WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test; |
521 | | friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test; |
522 | | friend class WritePreparedTransactionTest_OldCommitMapGC_Test; |
523 | | friend class WritePreparedTransactionTest_Rollback_Test; |
524 | | friend class WritePreparedTransactionTest_SmallestUnCommittedSeq_Test; |
525 | | friend class WriteUnpreparedTxn; |
526 | | friend class WriteUnpreparedTxnDB; |
527 | | friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; |
528 | | friend class MultiOpsTxnsStressTest; |
529 | | |
530 | | void Init(const TransactionDBOptions& txn_db_opts); |
531 | | |
532 | 0 | void WPRecordTick(uint32_t ticker_type) const { |
533 | 0 | RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type); |
534 | 0 | } |
535 | | |
536 | | Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, |
537 | 0 | const Slice& key, std::string* value) { |
538 | 0 | assert(value != nullptr); |
539 | 0 | PinnableSlice pinnable_val(value); |
540 | 0 | assert(!pinnable_val.IsPinned()); |
541 | 0 | auto s = GetImpl(options, column_family, key, &pinnable_val); |
542 | 0 | if (s.ok() && pinnable_val.IsPinned()) { |
543 | 0 | value->assign(pinnable_val.data(), pinnable_val.size()); |
544 | 0 | } // else value is already assigned |
545 | 0 | return s; |
546 | 0 | } |
547 | | |
548 | | Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, |
549 | | const Slice& key, PinnableSlice* value); |
550 | | |
551 | | // A heap with the amortized O(1) complexity for erase. It uses one extra heap |
552 | | // to keep track of erased entries that are not yet on top of the main heap. |
553 | | class PreparedHeap { |
554 | | // The mutex is required for push and pop from PreparedHeap. ::erase will |
555 | | // use external synchronization via prepared_mutex_. |
556 | | port::Mutex push_pop_mutex_; |
557 | | std::deque<uint64_t> heap_; |
558 | | std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>> |
559 | | erased_heap_; |
560 | | std::atomic<uint64_t> heap_top_ = {kMaxSequenceNumber}; |
561 | | // True when testing crash recovery |
562 | | bool TEST_CRASH_ = false; |
563 | | friend class WritePreparedTxnDB; |
564 | | |
565 | | public: |
566 | 0 | ~PreparedHeap() { |
567 | 0 | if (!TEST_CRASH_) { |
568 | 0 | assert(heap_.empty()); |
569 | 0 | assert(erased_heap_.empty()); |
570 | 0 | } |
571 | 0 | } |
572 | 0 | port::Mutex* push_pop_mutex() { return &push_pop_mutex_; } |
573 | | |
574 | 0 | inline bool empty() { return top() == kMaxSequenceNumber; } |
575 | | // Returns kMaxSequenceNumber if empty() and the smallest otherwise. |
576 | 0 | inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); } |
577 | 0 | inline void push(uint64_t v) { |
578 | 0 | push_pop_mutex_.AssertHeld(); |
579 | 0 | if (heap_.empty()) { |
580 | 0 | heap_top_.store(v, std::memory_order_release); |
581 | 0 | } else { |
582 | 0 | assert(heap_top_.load() < v); |
583 | 0 | } |
584 | 0 | heap_.push_back(v); |
585 | 0 | } |
586 | 0 | void pop(bool locked = false) { |
587 | 0 | if (!locked) { |
588 | 0 | push_pop_mutex()->Lock(); |
589 | 0 | } |
590 | 0 | push_pop_mutex_.AssertHeld(); |
591 | 0 | heap_.pop_front(); |
592 | 0 | while (!heap_.empty() && !erased_heap_.empty() && |
593 | | // heap_.top() > erased_heap_.top() could happen if we have erased |
594 | | // a non-existent entry. Ideally the user should not do that but we |
595 | | // should be resilient against it. |
596 | 0 | heap_.front() >= erased_heap_.top()) { |
597 | 0 | if (heap_.front() == erased_heap_.top()) { |
598 | 0 | heap_.pop_front(); |
599 | 0 | } |
600 | 0 | uint64_t erased __attribute__((__unused__)); |
601 | 0 | erased = erased_heap_.top(); |
602 | 0 | erased_heap_.pop(); |
603 | | // No duplicate prepare sequence numbers |
604 | 0 | assert(erased_heap_.empty() || erased_heap_.top() != erased); |
605 | 0 | } |
606 | 0 | while (heap_.empty() && !erased_heap_.empty()) { |
607 | 0 | erased_heap_.pop(); |
608 | 0 | } |
609 | 0 | heap_top_.store(!heap_.empty() ? heap_.front() : kMaxSequenceNumber, |
610 | 0 | std::memory_order_release); |
611 | 0 | if (!locked) { |
612 | 0 | push_pop_mutex()->Unlock(); |
613 | 0 | } |
614 | 0 | } |
615 | | // Concurrent calls need external synchronization. It is safe to call it |
616 | | // concurrently with push and pop though. |
617 | 0 | void erase(uint64_t seq) { |
618 | 0 | if (!empty()) { |
619 | 0 | auto top_seq = top(); |
620 | 0 | if (seq < top_seq) { |
621 | | // Already popped, ignore it. |
622 | 0 | } else if (top_seq == seq) { |
623 | 0 | pop(); |
624 | | #ifndef NDEBUG |
625 | | MutexLock ml(push_pop_mutex()); |
626 | | assert(heap_.empty() || heap_.front() != seq); |
627 | | #endif |
628 | 0 | } else { // top() > seq |
629 | | // Down the heap, remember to pop it later |
630 | 0 | erased_heap_.push(seq); |
631 | 0 | } |
632 | 0 | } |
633 | 0 | } |
634 | | }; |
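// An illustrative trace of PreparedHeap (made-up sequence numbers):
//   push(5); push(6); push(8);   // top() == 5
//   erase(6);                    // 6 != top(), so it is parked in erased_heap_
//   pop();                       // removes 5, then also skips the erased 6
//   top();                       // == 8
// Erasing the current top simply pops it, so erase stays amortized O(1) no
// matter where the erased entry sits in the heap.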
635 | | |
636 | 0 | void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; } |
637 | | |
638 | | // Get the commit entry with index indexed_seq from the commit table. It |
639 | | // returns true if such entry exists. |
640 | | bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b, |
641 | | CommitEntry* entry) const; |
642 | | |
643 | | // Rewrite the entry with the index indexed_seq in the commit table with the |
644 | | // commit entry <prep_seq, commit_seq>. If the rewrite results in an eviction, |
645 | | // sets the evicted_entry and returns true. |
646 | | bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry, |
647 | | CommitEntry* evicted_entry); |
648 | | |
649 | | // Rewrite the entry with the index indexed_seq in the commit table with the |
650 | | // commit entry new_entry only if the existing entry matches the |
651 | | // expected_entry. Returns false otherwise. |
652 | | bool ExchangeCommitEntry(const uint64_t indexed_seq, |
653 | | CommitEntry64b& expected_entry, |
654 | | const CommitEntry& new_entry); |
655 | | |
656 | | // Increase max_evicted_seq_ from the previous value prev_max to the new |
657 | | // value. This also involves taking care of prepared txns that are not |
658 | | // committed before new_max, as well as updating the list of live snapshots at |
659 | | // the time of updating the max. Thread-safety: this function can be called |
660 | | // concurrently. The concurrent invocations of this function are equivalent to |
661 | | // a serial invocation in which the last invocation is the one with the |
662 | | // largest new_max value. |
663 | | void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max, |
664 | | const SequenceNumber& new_max); |
665 | | |
666 | 0 | inline SequenceNumber SmallestUnCommittedSeq() { |
667 | | // Note: We have two lists to look into, but for performance reasons they |
668 | | // are not read atomically. Since CheckPreparedAgainstMax copies the entry |
669 | | // to delayed_prepared_ before removing it from prepared_txns_, to ensure |
670 | | // that a prepared entry is not missed, we look into them in the opposite |
671 | | // order: first read prepared_txns_ and then delayed_prepared_. |
672 | | |
673 | | // This must be called before calling ::top. This is because a concurrent |
674 | | // thread would call ::RemovePrepared before updating |
675 | | // GetLatestSequenceNumber(). Reading them in the opposite order here |
676 | | // guarantees that the ::top that we read would be lower than the ::top we |
677 | | // would have seen had we otherwise updated/read them atomically. |
678 | 0 | auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1; |
679 | 0 | auto min_prepare = prepared_txns_.top(); |
680 | | // Since we update the prepare_heap always from the main write queue via |
681 | | // PreReleaseCallback, the prepared_txns_.top() indicates the smallest |
682 | | // prepared data in 2pc transactions. For non-2pc transactions that are |
683 | | // written in two steps, we also update prepared_txns_ at the first step |
684 | | // (via the same mechanism) so that their uncommitted data is reflected in |
685 | | // SmallestUnCommittedSeq. |
686 | 0 | if (!delayed_prepared_empty_.load()) { |
687 | 0 | ReadLock rl(&prepared_mutex_); |
688 | 0 | if (!delayed_prepared_.empty()) { |
689 | 0 | return *delayed_prepared_.begin(); |
690 | 0 | } |
691 | 0 | } |
692 | 0 | bool empty = min_prepare == kMaxSequenceNumber; |
693 | 0 | if (empty) { |
694 | | // Since GetLatestSequenceNumber is updated |
695 | | // after prepared_txns_ are, the value of GetLatestSequenceNumber would |
696 | | // reflect any uncommitted data that is not added to prepared_txns_ yet. |
697 | | // Otherwise, if there is no concurrent txn, this value simply reflects |
698 | | // the latest value in the memtable. |
699 | 0 | return next_prepare; |
700 | 0 | } else { |
701 | 0 | return std::min(min_prepare, next_prepare); |
702 | 0 | } |
703 | 0 | } |
704 | | |
705 | | // Enhance the snapshot object by recording in it the smallest uncommitted seq |
706 | | inline void EnhanceSnapshot(SnapshotImpl* snapshot, |
707 | 0 | SequenceNumber min_uncommitted) { |
708 | 0 | assert(snapshot); |
709 | 0 | assert(min_uncommitted <= snapshot->number_ + 1); |
710 | 0 | snapshot->min_uncommitted_ = min_uncommitted; |
711 | 0 | } |
712 | | |
713 | | virtual const std::vector<SequenceNumber> GetSnapshotListFromDB( |
714 | | SequenceNumber max); |
715 | | |
716 | | // Will be called by the public ReleaseSnapshot method. Does the maintenance |
717 | | // internal to WritePreparedTxnDB |
718 | | void ReleaseSnapshotInternal(const SequenceNumber snap_seq); |
719 | | |
720 | | // Update the list of snapshots corresponding to the soon-to-be-updated |
721 | | // max_evicted_seq_. Thread-safety: this function can be called concurrently. |
722 | | // The concurrent invocations of this function are equivalent to a serial |
723 | | // invocation in which the last invocation is the one with the largest |
724 | | // version value. |
725 | | void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots, |
726 | | const SequenceNumber& version); |
727 | | // Check the new list of snapshots against the old one to see if any of |
728 | | // the snapshots are released and to do the cleanup for the released snapshot. |
729 | | void CleanupReleasedSnapshots( |
730 | | const std::vector<SequenceNumber>& new_snapshots, |
731 | | const std::vector<SequenceNumber>& old_snapshots); |
732 | | |
733 | | // Check an evicted entry against live snapshots to see if it should be kept |
734 | | // around or it can be safely discarded (and hence assumed committed for all |
735 | | // snapshots). Thread-safety: this function can be called concurrently. If it |
736 | | // is called concurrently with multiple UpdateSnapshots, the result is the |
737 | | // same as checking the intersection of the snapshot list before updates with |
738 | | // the snapshot list of all the concurrent updates. |
739 | | void CheckAgainstSnapshots(const CommitEntry& evicted); |
740 | | |
741 | | // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq < |
742 | | // commit_seq. Return false if checking the next snapshot(s) is not needed. |
743 | | // This is the case if none of the next snapshots could satisfy the condition. |
744 | | // next_is_larger: the next snapshot will be a larger value |
745 | | bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq, |
746 | | const uint64_t& commit_seq, |
747 | | const uint64_t& snapshot_seq, |
748 | | const bool next_is_larger); |
749 | | |
750 | | // A trick to increase the last visible sequence number by one and also wait |
751 | | // for the in-flight commits to be visible. |
752 | | void AdvanceSeqByOne(); |
753 | | |
754 | | // The list of live snapshots at the last time that max_evicted_seq_ advanced. |
755 | | // The list is stored in two data structures: in snapshot_cache_, which is |
756 | | // efficient for concurrent reads, and in snapshots_ if the data does not fit |
757 | | // into snapshot_cache_. snapshots_total_ is the total number in the two lists. |
758 | | std::atomic<size_t> snapshots_total_ = {}; |
759 | | // The list sorted in ascending order. Thread-safety for writes is provided |
760 | | // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for |
761 | | // each entry. In x86_64 architecture such reads are compiled to simple read |
762 | | // instructions. |
763 | | const size_t SNAPSHOT_CACHE_BITS; |
764 | | const size_t SNAPSHOT_CACHE_SIZE; |
765 | | std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_; |
766 | | // 2nd list for storing snapshots. The list sorted in ascending order. |
767 | | // Thread-safety is provided with snapshots_mutex_. |
768 | | std::vector<SequenceNumber> snapshots_; |
769 | | // The list of all snapshots: snapshots_ + snapshot_cache_. This list, although |
770 | | // redundant, simplifies the CleanupOldSnapshots implementation. |
771 | | // Thread-safety is provided with snapshots_mutex_. |
772 | | std::vector<SequenceNumber> snapshots_all_; |
773 | | // The version of the latest list of snapshots. This can be used to avoid |
774 | | // rewriting a list that is concurrently updated with a more recent version. |
775 | | SequenceNumber snapshots_version_ = 0; |
776 | | |
777 | | // A heap of prepared transactions. Thread-safety is provided with |
778 | | // prepared_mutex_. |
779 | | PreparedHeap prepared_txns_; |
780 | | const size_t COMMIT_CACHE_BITS; |
781 | | const size_t COMMIT_CACHE_SIZE; |
782 | | const CommitEntry64bFormat FORMAT; |
783 | | // commit_cache_ must be initialized to zero to tell apart an empty index from |
784 | | // a filled one. Thread-safety is provided with commit_cache_mutex_. |
785 | | std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_; |
786 | | // The largest evicted *commit* sequence number from the commit_cache_. If a |
787 | | // seq is smaller than max_evicted_seq_, it might or might not be present in |
788 | | // commit_cache_. So commit_cache_ must first be checked before consulting |
789 | | // with max_evicted_seq_. |
790 | | std::atomic<uint64_t> max_evicted_seq_ = {}; |
791 | | // Order: 1) update future_max_evicted_seq_ = new_max, 2) |
792 | | // GetSnapshotListFromDB(new_max), 3) max_evicted_seq_ = new_max. Since |
793 | | // GetSnapshotInternal guarantees that the snapshot seq is larger than |
794 | | // future_max_evicted_seq_, this guarantees that any snapshot not larger |
795 | | // than max has already been looked at via a GetSnapshotListFromDB(new_max). |
796 | | std::atomic<uint64_t> future_max_evicted_seq_ = {}; |
797 | | // Advance max_evicted_seq_ by this value each time it needs an update. The |
798 | | // larger the value, the less frequent advances we would have. We do not want |
799 | | // it to be too large either as it would cause stalls by doing too much |
800 | | // maintenance work under the lock. |
801 | | size_t INC_STEP_FOR_MAX_EVICTED = 1; |
802 | | // A map from old snapshots (expected to be used by a few read-only txns) to |
803 | | // prepared sequence numbers of the evicted entries from commit_cache_ that |
804 | | // overlap with such a snapshot. These are the prepared sequence numbers that |
805 | | // the snapshot, to which they are mapped, cannot assume to be committed just |
806 | | // because they are no longer in the commit_cache_. The vector must be sorted |
807 | | // after each update. |
808 | | // Thread-safety is provided with old_commit_map_mutex_. |
809 | | std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_; |
810 | | // A set of long-running prepared transactions that are not finished by the |
811 | | // time max_evicted_seq_ advances their sequence number. This is expected to |
812 | | // be empty normally. Thread-safety is provided with prepared_mutex_. |
813 | | std::set<uint64_t> delayed_prepared_; |
814 | | // Commit of a delayed prepared: 1) update commit cache, 2) update |
815 | | // delayed_prepared_commits_, 3) publish seq, 4) clean up delayed_prepared_. |
816 | | // delayed_prepared_commits_ will help us tell apart the unprepared txns from |
817 | | // the ones that are committed but not cleaned up yet. |
818 | | std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_; |
819 | | // Updated when delayed_prepared_.empty() changes. Expected to be true |
820 | | // normally. |
821 | | std::atomic<bool> delayed_prepared_empty_ = {true}; |
822 | | // Updated when old_commit_map_.empty() changes. Expected to be true normally. |
823 | | std::atomic<bool> old_commit_map_empty_ = {true}; |
824 | | mutable port::RWMutex prepared_mutex_; |
825 | | mutable port::RWMutex old_commit_map_mutex_; |
826 | | mutable port::RWMutex commit_cache_mutex_; |
827 | | mutable port::RWMutex snapshots_mutex_; |
828 | | // A cache of the cf comparators |
829 | | // Thread safety: since it is const, it is safe to read it concurrently |
830 | | std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_; |
831 | | // A cache of the cf handles |
832 | | // Thread safety: since the handle is a read-only object and the map is |
833 | | // const, it is safe to read it concurrently |
834 | | std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_; |
835 | | // A dummy snapshot object that refers to kMaxSequenceNumber |
836 | | SnapshotImpl dummy_max_snapshot_; |
837 | | }; |
838 | | |
839 | | class WritePreparedTxnReadCallback : public ReadCallback { |
840 | | public: |
841 | | WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot) |
842 | 0 | : ReadCallback(snapshot), |
843 | 0 | db_(db), |
844 | 0 | backed_by_snapshot_(kBackedByDBSnapshot) {} |
845 | | WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot, |
846 | | SequenceNumber min_uncommitted, |
847 | | SnapshotBackup backed_by_snapshot) |
848 | 0 | : ReadCallback(snapshot, min_uncommitted), |
849 | 0 | db_(db), |
850 | 0 | backed_by_snapshot_(backed_by_snapshot) { |
851 | 0 | (void)backed_by_snapshot_; // to silence unused private field warning |
852 | 0 | } |
853 | | |
854 | 0 | virtual ~WritePreparedTxnReadCallback() { |
855 | | // If it is not backed by snapshot, the caller must check validity |
856 | 0 | assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot); |
857 | 0 | } |
858 | | |
859 | | // Will be called to see if the seq number is visible; if not, it moves on to |
860 | | // the next seq number. |
861 | 0 | inline bool IsVisibleFullCheck(SequenceNumber seq) override { |
862 | 0 | auto snapshot = max_visible_seq_; |
863 | 0 | bool snap_released = false; |
864 | 0 | auto ret = |
865 | 0 | db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &snap_released); |
866 | 0 | assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot); |
867 | 0 | snap_released_ |= snap_released; |
868 | 0 | return ret; |
869 | 0 | } |
870 | | |
871 | 0 | inline bool valid() { |
872 | 0 | valid_checked_ = true; |
873 | 0 | return snap_released_ == false; |
874 | 0 | } |
875 | | |
876 | | // TODO(myabandeh): override Refresh when Iterator::Refresh is supported |
877 | | private: |
878 | | WritePreparedTxnDB* db_; |
879 | | // Whether max_visible_seq_ is backed by a snapshot |
880 | | const SnapshotBackup backed_by_snapshot_; |
881 | | bool snap_released_ = false; |
882 | | // Safety check to ensure that the caller has checked invalid statuses |
883 | | bool valid_checked_ = false; |
884 | | }; |
885 | | |
886 | | class AddPreparedCallback : public PreReleaseCallback { |
887 | | public: |
888 | | AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl, |
889 | | size_t sub_batch_cnt, bool two_write_queues, |
890 | | bool first_prepare_batch) |
891 | 0 | : db_(db), |
892 | 0 | db_impl_(db_impl), |
893 | 0 | sub_batch_cnt_(sub_batch_cnt), |
894 | 0 | two_write_queues_(two_write_queues), |
895 | 0 | first_prepare_batch_(first_prepare_batch) { |
896 | 0 | (void)two_write_queues_; // to silence unused private field warning |
897 | 0 | } |
898 | | Status Callback(SequenceNumber prepare_seq, |
899 | | bool is_mem_disabled __attribute__((__unused__)), |
900 | 0 | uint64_t log_number, size_t index, size_t total) override { |
901 | 0 | assert(index < total); |
902 | | // To reduce the cost of lock acquisition competing with the concurrent |
903 | | // prepare requests, lock on the first callback and unlock on the last. |
904 | 0 | const bool do_lock = !two_write_queues_ || index == 0; |
905 | 0 | const bool do_unlock = !two_write_queues_ || index + 1 == total; |
906 | | // Always Prepare from the main queue |
907 | 0 | assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue |
908 | 0 | TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:pause"); |
909 | 0 | TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:resume"); |
910 | 0 | if (do_lock) { |
911 | 0 | db_->prepared_txns_.push_pop_mutex()->Lock(); |
912 | 0 | } |
913 | 0 | const bool kLocked = true; |
914 | 0 | for (size_t i = 0; i < sub_batch_cnt_; i++) { |
915 | 0 | db_->AddPrepared(prepare_seq + i, kLocked); |
916 | 0 | } |
917 | 0 | if (do_unlock) { |
918 | 0 | db_->prepared_txns_.push_pop_mutex()->Unlock(); |
919 | 0 | } |
920 | 0 | TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::end"); |
921 | 0 | if (first_prepare_batch_) { |
922 | 0 | assert(log_number != 0); |
923 | 0 | db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection( |
924 | 0 | log_number); |
925 | 0 | } |
926 | 0 | return Status::OK(); |
927 | 0 | } |
928 | | |
929 | | private: |
930 | | WritePreparedTxnDB* db_; |
931 | | DBImpl* db_impl_; |
932 | | size_t sub_batch_cnt_; |
933 | | bool two_write_queues_; |
934 | | // It is 2PC and this is the first prepare batch. Always the case in 2PC |
935 | | // unless it is WriteUnPrepared. |
936 | | bool first_prepare_batch_; |
937 | | }; |
938 | | |
939 | | class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { |
940 | | public: |
941 | | // includes_data indicates that the commit also writes non-empty |
942 | | // CommitTimeWriteBatch to memtable, which needs to be committed separately. |
943 | | WritePreparedCommitEntryPreReleaseCallback( |
944 | | WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq, |
945 | | size_t prep_batch_cnt, size_t data_batch_cnt = 0, |
946 | | SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0) |
947 | 0 | : db_(db), |
948 | 0 | db_impl_(db_impl), |
949 | 0 | prep_seq_(prep_seq), |
950 | 0 | prep_batch_cnt_(prep_batch_cnt), |
951 | 0 | data_batch_cnt_(data_batch_cnt), |
952 | 0 | includes_data_(data_batch_cnt_ > 0), |
953 | 0 | aux_seq_(aux_seq), |
954 | 0 | aux_batch_cnt_(aux_batch_cnt), |
955 | 0 | includes_aux_batch_(aux_batch_cnt > 0) { |
956 | 0 | assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber)); // xor |
957 | 0 | assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0); |
958 | 0 | assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber)); // xor |
959 | 0 | } |
960 | | |
961 | | Status Callback(SequenceNumber commit_seq, |
962 | | bool is_mem_disabled __attribute__((__unused__)), uint64_t, |
963 | 0 | size_t /*index*/, size_t /*total*/) override { |
964 | | // Always commit from the 2nd queue |
965 | 0 | assert(!db_impl_->immutable_db_options().two_write_queues || |
966 | 0 | is_mem_disabled); |
967 | 0 | assert(includes_data_ || prep_seq_ != kMaxSequenceNumber); |
968 | | // Data batch is what accompanies the commit marker and affects the |
969 | | // last seq in the commit batch. |
970 | 0 | const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1) |
971 | 0 | ? commit_seq |
972 | 0 | : commit_seq + data_batch_cnt_ - 1; |
973 | 0 | if (prep_seq_ != kMaxSequenceNumber) { |
974 | 0 | for (size_t i = 0; i < prep_batch_cnt_; i++) { |
975 | 0 | db_->AddCommitted(prep_seq_ + i, last_commit_seq); |
976 | 0 | } |
977 | 0 | } // else there was no prepare phase |
978 | 0 | if (includes_aux_batch_) { |
979 | 0 | for (size_t i = 0; i < aux_batch_cnt_; i++) { |
980 | 0 | db_->AddCommitted(aux_seq_ + i, last_commit_seq); |
981 | 0 | } |
982 | 0 | } |
983 | 0 | if (includes_data_) { |
984 | 0 | assert(data_batch_cnt_); |
985 | | // Commit the data that is accompanied with the commit request |
986 | 0 | for (size_t i = 0; i < data_batch_cnt_; i++) { |
987 | | // For commit seq of each batch use the commit seq of the last batch. |
988 | | // This would make debugging easier by having all the batches having |
989 | | // the same sequence number. |
990 | 0 | db_->AddCommitted(commit_seq + i, last_commit_seq); |
991 | 0 | } |
992 | 0 | } |
993 | 0 | if (db_impl_->immutable_db_options().two_write_queues) { |
994 | 0 | assert(is_mem_disabled); // implies the 2nd queue |
995 | | // Publish the sequence number. We can do that here assuming the callback |
996 | | // is invoked only from one write queue, which would guarantee that the |
997 | | // publish sequence numbers will be in order, i.e., once a seq is |
998 | | // published all the seq prior to that are also publishable. |
999 | 0 | db_impl_->SetLastPublishedSequence(last_commit_seq); |
1000 | | // Note RemovePrepared should be called after publishing the seq. |
1001 | | // Otherwise SmallestUnCommittedSeq optimization breaks. |
1002 | 0 | if (prep_seq_ != kMaxSequenceNumber) { |
1003 | 0 | db_->RemovePrepared(prep_seq_, prep_batch_cnt_); |
1004 | 0 | } // else there was no prepare phase |
1005 | 0 | if (includes_aux_batch_) { |
1006 | 0 | db_->RemovePrepared(aux_seq_, aux_batch_cnt_); |
1007 | 0 | } |
1008 | 0 | } |
1009 | | // else the SequenceNumber that is updated as part of the write already does the |
1010 | | // publishing |
1011 | 0 | return Status::OK(); |
1012 | 0 | } |
1013 | | |
1014 | | private: |
1015 | | WritePreparedTxnDB* db_; |
1016 | | DBImpl* db_impl_; |
1017 | | // kMaxSequenceNumber if there was no prepare phase |
1018 | | SequenceNumber prep_seq_; |
1019 | | size_t prep_batch_cnt_; |
1020 | | size_t data_batch_cnt_; |
1021 | | // Data here is the batch that is written with the commit marker, either |
1022 | | // because it is commit without prepare or commit has a CommitTimeWriteBatch. |
1023 | | bool includes_data_; |
1024 | | // Auxiliary batch (if there is any) is a batch that is written before, but |
1025 | | // gets the same commit seq as prepare batch or data batch. This is used in |
1026 | | // two write queues where the CommitTimeWriteBatch becomes the aux batch and |
1027 | | // we do a separate write to actually commit everything. |
1028 | | SequenceNumber aux_seq_; |
1029 | | size_t aux_batch_cnt_; |
1030 | | bool includes_aux_batch_; |
1031 | | }; |
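// An illustrative walk-through of Callback() with made-up sequence numbers:
// with prep_seq_ = 50, prep_batch_cnt_ = 2, data_batch_cnt_ = 3 and
// commit_seq = 100, last_commit_seq = 100 + 3 - 1 = 102. AddCommitted is then
// called for the prepared batches (50 -> 102, 51 -> 102) and for the data
// batches (100 -> 102, 101 -> 102, 102 -> 102), and 102 is the sequence
// published via SetLastPublishedSequence when two_write_queues is enabled.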
1032 | | |
1033 | | // For two_write_queues, commit both the aborted batch and the cleanup batch |
1034 | | // and then publish the seq |
1035 | | class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback { |
1036 | | public: |
1037 | | WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db, |
1038 | | DBImpl* db_impl, |
1039 | | SequenceNumber prep_seq, |
1040 | | SequenceNumber rollback_seq, |
1041 | | size_t prep_batch_cnt) |
1042 | 0 | : db_(db), |
1043 | 0 | db_impl_(db_impl), |
1044 | 0 | prep_seq_(prep_seq), |
1045 | 0 | rollback_seq_(rollback_seq), |
1046 | 0 | prep_batch_cnt_(prep_batch_cnt) { |
1047 | 0 | assert(prep_seq != kMaxSequenceNumber); |
1048 | 0 | assert(rollback_seq != kMaxSequenceNumber); |
1049 | 0 | assert(prep_batch_cnt_ > 0); |
1050 | 0 | } |
1051 | | |
1052 | | Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, uint64_t, |
1053 | 0 | size_t /*index*/, size_t /*total*/) override { |
1054 | | // Always commit from the 2nd queue |
1055 | 0 | assert(is_mem_disabled); // implies the 2nd queue |
1056 | 0 | assert(db_impl_->immutable_db_options().two_write_queues); |
1057 | 0 | #ifdef NDEBUG |
1058 | 0 | (void)is_mem_disabled; |
1059 | 0 | #endif |
1060 | 0 | const uint64_t last_commit_seq = commit_seq; |
1061 | 0 | db_->AddCommitted(rollback_seq_, last_commit_seq); |
1062 | 0 | for (size_t i = 0; i < prep_batch_cnt_; i++) { |
1063 | 0 | db_->AddCommitted(prep_seq_ + i, last_commit_seq); |
1064 | 0 | } |
1065 | 0 | db_impl_->SetLastPublishedSequence(last_commit_seq); |
1066 | 0 | return Status::OK(); |
1067 | 0 | } |
1068 | | |
1069 | | private: |
1070 | | WritePreparedTxnDB* db_; |
1071 | | DBImpl* db_impl_; |
1072 | | SequenceNumber prep_seq_; |
1073 | | SequenceNumber rollback_seq_; |
1074 | | size_t prep_batch_cnt_; |
1075 | | }; |
1076 | | |
1077 | | // Count the number of sub-batches inside a batch. A sub-batch does not have |
1078 | | // duplicate keys. |
1079 | | struct SubBatchCounter : public WriteBatch::Handler { |
1080 | | explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators) |
1081 | 0 | : comparators_(comparators), batches_(1) {} |
1082 | | std::map<uint32_t, const Comparator*>& comparators_; |
1083 | | using CFKeys = std::set<Slice, SetComparator>; |
1084 | | std::map<uint32_t, CFKeys> keys_; |
1085 | | size_t batches_; |
1086 | 0 | size_t BatchCount() { return batches_; } |
1087 | | void AddKey(const uint32_t cf, const Slice& key); |
1088 | | void InitWithComp(const uint32_t cf); |
1089 | 0 | Status MarkNoop(bool) override { return Status::OK(); } |
1090 | 0 | Status MarkEndPrepare(const Slice&) override { return Status::OK(); } |
1091 | 0 | Status MarkCommit(const Slice&) override { return Status::OK(); } |
1092 | 0 | Status PutCF(uint32_t cf, const Slice& key, const Slice&) override { |
1093 | 0 | AddKey(cf, key); |
1094 | 0 | return Status::OK(); |
1095 | 0 | } |
1096 | 0 | Status DeleteCF(uint32_t cf, const Slice& key) override { |
1097 | 0 | AddKey(cf, key); |
1098 | 0 | return Status::OK(); |
1099 | 0 | } |
1100 | 0 | Status SingleDeleteCF(uint32_t cf, const Slice& key) override { |
1101 | 0 | AddKey(cf, key); |
1102 | 0 | return Status::OK(); |
1103 | 0 | } |
1104 | 0 | Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override { |
1105 | 0 | AddKey(cf, key); |
1106 | 0 | return Status::OK(); |
1107 | 0 | } |
1108 | 0 | Status MarkBeginPrepare(bool) override { return Status::OK(); } |
1109 | 0 | Status MarkRollback(const Slice&) override { return Status::OK(); } |
1110 | 0 | Handler::OptionState WriteAfterCommit() const override { |
1111 | 0 | return Handler::OptionState::kDisabled; |
1112 | 0 | } |
1113 | | }; |
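// An illustrative count (hypothetical keys): a batch containing Put(cf, "a"),
// Put(cf, "b"), Put(cf, "a") yields BatchCount() == 2, since the second "a"
// duplicates a key already seen in the current sub-batch and therefore starts
// a new sub-batch; without duplicate keys the count stays at 1.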
1114 | | |
1115 | | SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot, |
1116 | | SequenceNumber* min, |
1117 | 0 | SequenceNumber* max) { |
1118 | 0 | if (snapshot != nullptr) { |
1119 | 0 | *min = |
1120 | 0 | static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_; |
1121 | 0 | *max = static_cast_with_check<const SnapshotImpl>(snapshot)->number_; |
1122 | | // A duplicate of the check in EnhanceSnapshot(). |
1123 | 0 | assert(*min <= *max + 1); |
1124 | 0 | return kBackedByDBSnapshot; |
1125 | 0 | } else { |
1126 | 0 | *min = SmallestUnCommittedSeq(); |
1127 | 0 | *max = 0; // to be assigned later after sv is referenced. |
1128 | 0 | return kUnbackedByDBSnapshot; |
1129 | 0 | } |
1130 | 0 | } |
1131 | | |
1132 | | bool WritePreparedTxnDB::ValidateSnapshot( |
1133 | | const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot, |
1134 | 0 | std::memory_order order) { |
1135 | 0 | if (backed_by_snapshot == kBackedByDBSnapshot) { |
1136 | 0 | return true; |
1137 | 0 | } else { |
1138 | 0 | SequenceNumber max = max_evicted_seq_.load(order); |
1139 | | // Validate that max has not advanced past a snapshot seq that is not backed |
1140 | | // by a real snapshot. This is a very rare case that should not happen in |
1141 | | // real workloads. |
1142 | 0 | if (UNLIKELY(snap_seq <= max && snap_seq != 0)) { |
1143 | 0 | return false; |
1144 | 0 | } |
1145 | 0 | } |
1146 | 0 | return true; |
1147 | 0 | } |
1148 | | |
1149 | | } // namespace ROCKSDB_NAMESPACE |