/src/rocksdb/utilities/transactions/transaction_util.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "utilities/transactions/transaction_util.h" |
7 | | |
8 | | #include <cinttypes> |
9 | | #include <string> |
10 | | #include <vector> |
11 | | |
12 | | #include "db/db_impl/db_impl.h" |
13 | | #include "rocksdb/status.h" |
14 | | #include "rocksdb/utilities/write_batch_with_index.h" |
15 | | #include "util/cast_util.h" |
16 | | #include "util/string_util.h" |
17 | | |
18 | | namespace ROCKSDB_NAMESPACE { |
19 | | |
20 | | Status TransactionUtil::CheckKeyForConflicts( |
21 | | DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key, |
22 | | SequenceNumber snap_seq, const std::string* const read_ts, bool cache_only, |
23 | | ReadCallback* snap_checker, SequenceNumber min_uncommitted, |
24 | 0 | bool enable_udt_validation) { |
25 | 0 | Status result; |
26 | |
|
27 | 0 | auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family); |
28 | 0 | auto cfd = cfh->cfd(); |
29 | 0 | SuperVersion* sv = db_impl->GetAndRefSuperVersion(cfd); |
30 | |
|
31 | 0 | if (sv == nullptr) { |
32 | 0 | result = Status::InvalidArgument("Could not access column family " + |
33 | 0 | cfh->GetName()); |
34 | 0 | } |
35 | |
|
36 | 0 | if (result.ok()) { |
37 | 0 | SequenceNumber earliest_seq = |
38 | 0 | db_impl->GetEarliestMemTableSequenceNumber(sv, true); |
39 | |
|
40 | 0 | result = |
41 | 0 | CheckKey(db_impl, sv, earliest_seq, snap_seq, key, read_ts, cache_only, |
42 | 0 | snap_checker, min_uncommitted, enable_udt_validation); |
43 | |
|
44 | 0 | db_impl->ReturnAndCleanupSuperVersion(cfd, sv); |
45 | 0 | } |
46 | |
|
47 | 0 | return result; |
48 | 0 | } |
49 | | |
50 | | Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, |
51 | | SequenceNumber earliest_seq, |
52 | | SequenceNumber snap_seq, |
53 | | const std::string& key, |
54 | | const std::string* const read_ts, |
55 | | bool cache_only, ReadCallback* snap_checker, |
56 | | SequenceNumber min_uncommitted, |
57 | 0 | bool enable_udt_validation) { |
58 | | // When `min_uncommitted` is provided, keys are not always committed |
59 | | // in sequence number order, and `snap_checker` is used to check whether |
60 | | // specific sequence number is in the database is visible to the transaction. |
61 | | // So `snap_checker` must be provided. |
62 | 0 | assert(min_uncommitted == kMaxSequenceNumber || snap_checker != nullptr); |
63 | |
|
64 | 0 | Status result; |
65 | 0 | bool need_to_read_sst = false; |
66 | | |
67 | | // Since it would be too slow to check the SST files, we will only use |
68 | | // the memtables to check whether there have been any recent writes |
69 | | // to this key after it was accessed in this transaction. But if the |
70 | | // Memtables do not contain a long enough history, we must fail the |
71 | | // transaction. |
72 | 0 | if (earliest_seq == kMaxSequenceNumber) { |
73 | | // The age of this memtable is unknown. Cannot rely on it to check |
74 | | // for recent writes. This error shouldn't happen often in practice as |
75 | | // the Memtable should have a valid earliest sequence number except in some |
76 | | // corner cases (such as error cases during recovery). |
77 | 0 | need_to_read_sst = true; |
78 | |
|
79 | 0 | if (cache_only) { |
80 | 0 | result = Status::TryAgain( |
81 | 0 | "Transaction could not check for conflicts as the MemTable does not " |
82 | 0 | "contain a long enough history to check write at SequenceNumber: ", |
83 | 0 | std::to_string(snap_seq)); |
84 | 0 | } |
85 | 0 | } else if (snap_seq < earliest_seq || min_uncommitted <= earliest_seq) { |
86 | | // Use <= for min_uncommitted since earliest_seq is actually the largest sec |
87 | | // before this memtable was created |
88 | 0 | need_to_read_sst = true; |
89 | |
|
90 | 0 | if (cache_only) { |
91 | | // The age of this memtable is too new to use to check for recent |
92 | | // writes. |
93 | 0 | char msg[300]; |
94 | 0 | snprintf(msg, sizeof(msg), |
95 | 0 | "Transaction could not check for conflicts for operation at " |
96 | 0 | "SequenceNumber %" PRIu64 |
97 | 0 | " as the MemTable only contains changes newer than " |
98 | 0 | "SequenceNumber %" PRIu64 |
99 | 0 | ". Increasing the value of the " |
100 | 0 | "max_write_buffer_size_to_maintain option could reduce the " |
101 | 0 | "frequency " |
102 | 0 | "of this error.", |
103 | 0 | snap_seq, earliest_seq); |
104 | 0 | result = Status::TryAgain(msg); |
105 | 0 | } |
106 | 0 | } |
107 | |
|
108 | 0 | if (result.ok()) { |
109 | 0 | SequenceNumber seq = kMaxSequenceNumber; |
110 | 0 | std::string timestamp; |
111 | 0 | bool found_record_for_key = false; |
112 | | |
113 | | // When min_uncommitted == kMaxSequenceNumber, writes are committed in |
114 | | // sequence number order, so only keys larger than `snap_seq` can cause |
115 | | // conflict. |
116 | | // When min_uncommitted != kMaxSequenceNumber, keys lower than |
117 | | // min_uncommitted will not triggered conflicts, while keys larger than |
118 | | // min_uncommitted might create conflicts, so we need to read them out |
119 | | // from the DB, and call callback to snap_checker to determine. So only |
120 | | // keys lower than min_uncommitted can be skipped. |
121 | 0 | SequenceNumber lower_bound_seq = |
122 | 0 | (min_uncommitted == kMaxSequenceNumber) ? snap_seq : min_uncommitted; |
123 | 0 | Status s = db_impl->GetLatestSequenceForKey( |
124 | 0 | sv, key, !need_to_read_sst, lower_bound_seq, &seq, |
125 | 0 | !read_ts ? nullptr : ×tamp, &found_record_for_key, |
126 | 0 | /*is_blob_index=*/nullptr); |
127 | |
|
128 | 0 | if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { |
129 | 0 | result = s; |
130 | 0 | } else if (found_record_for_key) { |
131 | 0 | bool write_conflict = snap_checker == nullptr |
132 | 0 | ? snap_seq < seq |
133 | 0 | : !snap_checker->IsVisible(seq); |
134 | | // Perform conflict checking based on timestamp if applicable. |
135 | 0 | if (enable_udt_validation && !write_conflict && read_ts != nullptr) { |
136 | 0 | ColumnFamilyData* cfd = sv->cfd; |
137 | 0 | assert(cfd); |
138 | 0 | const Comparator* const ucmp = cfd->user_comparator(); |
139 | 0 | assert(ucmp); |
140 | 0 | assert(read_ts->size() == ucmp->timestamp_size()); |
141 | 0 | assert(read_ts->size() == timestamp.size()); |
142 | | // Write conflict if *ts < timestamp. |
143 | 0 | write_conflict = ucmp->CompareTimestamp(*read_ts, timestamp) < 0; |
144 | 0 | } |
145 | 0 | if (write_conflict) { |
146 | 0 | result = Status::Busy(); |
147 | 0 | } |
148 | 0 | } |
149 | 0 | } |
150 | |
|
151 | 0 | return result; |
152 | 0 | } |
153 | | |
154 | | Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl, |
155 | | const LockTracker& tracker, |
156 | 0 | bool cache_only) { |
157 | 0 | Status result; |
158 | |
|
159 | 0 | std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it( |
160 | 0 | tracker.GetColumnFamilyIterator()); |
161 | 0 | assert(cf_it != nullptr); |
162 | 0 | while (cf_it->HasNext()) { |
163 | 0 | ColumnFamilyId cf = cf_it->Next(); |
164 | |
|
165 | 0 | SuperVersion* sv = db_impl->GetAndRefSuperVersion(cf); |
166 | 0 | if (sv == nullptr) { |
167 | 0 | result = Status::InvalidArgument("Could not access column family " + |
168 | 0 | std::to_string(cf)); |
169 | 0 | break; |
170 | 0 | } |
171 | | |
172 | 0 | SequenceNumber earliest_seq = |
173 | 0 | db_impl->GetEarliestMemTableSequenceNumber(sv, true); |
174 | | |
175 | | // For each of the keys in this transaction, check to see if someone has |
176 | | // written to this key since the start of the transaction. |
177 | 0 | std::unique_ptr<LockTracker::KeyIterator> key_it( |
178 | 0 | tracker.GetKeyIterator(cf)); |
179 | 0 | assert(key_it != nullptr); |
180 | 0 | while (key_it->HasNext()) { |
181 | 0 | const std::string& key = key_it->Next(); |
182 | 0 | PointLockStatus status = tracker.GetPointLockStatus(cf, key); |
183 | 0 | const SequenceNumber key_seq = status.seq; |
184 | | |
185 | | // TODO: support timestamp-based conflict checking. |
186 | | // CheckKeysForConflicts() is currently used only by optimistic |
187 | | // transactions. |
188 | 0 | result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, |
189 | 0 | /*read_ts=*/nullptr, cache_only); |
190 | 0 | if (!result.ok()) { |
191 | 0 | break; |
192 | 0 | } |
193 | 0 | } |
194 | |
|
195 | 0 | db_impl->ReturnAndCleanupSuperVersion(cf, sv); |
196 | |
|
197 | 0 | if (!result.ok()) { |
198 | 0 | break; |
199 | 0 | } |
200 | 0 | } |
201 | |
|
202 | 0 | return result; |
203 | 0 | } |
204 | | |
205 | | } // namespace ROCKSDB_NAMESPACE |