/src/rocksdb/utilities/transactions/transaction_base.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "utilities/transactions/transaction_base.h" |
7 | | |
8 | | #include <cinttypes> |
9 | | |
10 | | #include "db/attribute_group_iterator_impl.h" |
11 | | #include "db/coalescing_iterator.h" |
12 | | #include "db/column_family.h" |
13 | | #include "db/db_impl/db_impl.h" |
14 | | #include "logging/logging.h" |
15 | | #include "rocksdb/comparator.h" |
16 | | #include "rocksdb/db.h" |
17 | | #include "rocksdb/status.h" |
18 | | #include "util/cast_util.h" |
19 | | #include "util/string_util.h" |
20 | | #include "utilities/transactions/lock/lock_tracker.h" |
21 | | |
22 | | namespace ROCKSDB_NAMESPACE { |
23 | | |
24 | | Status Transaction::CommitAndTryCreateSnapshot( |
25 | | std::shared_ptr<TransactionNotifier> notifier, TxnTimestamp ts, |
26 | 0 | std::shared_ptr<const Snapshot>* snapshot) { |
27 | 0 | if (snapshot) { |
28 | 0 | snapshot->reset(); |
29 | 0 | } |
30 | 0 | TxnTimestamp commit_ts = GetCommitTimestamp(); |
31 | 0 | if (commit_ts == kMaxTxnTimestamp) { |
32 | 0 | if (ts == kMaxTxnTimestamp) { |
33 | 0 | return Status::InvalidArgument("Commit timestamp unset"); |
34 | 0 | } else { |
35 | 0 | const Status s = SetCommitTimestamp(ts); |
36 | 0 | if (!s.ok()) { |
37 | 0 | return s; |
38 | 0 | } |
39 | 0 | } |
40 | 0 | } else if (ts != kMaxTxnTimestamp) { |
41 | 0 | if (ts != commit_ts) { |
42 | | // For now we treat this as error. |
43 | 0 | return Status::InvalidArgument("Different commit ts specified"); |
44 | 0 | } |
45 | 0 | } |
46 | 0 | SetSnapshotOnNextOperation(notifier); |
47 | 0 | Status s = Commit(); |
48 | 0 | if (!s.ok()) { |
49 | 0 | return s; |
50 | 0 | } |
51 | 0 | assert(s.ok()); |
52 | | // If we reach here, we must return ok status for this function. |
53 | 0 | std::shared_ptr<const Snapshot> new_snapshot = GetTimestampedSnapshot(); |
54 | |
|
55 | 0 | if (snapshot) { |
56 | 0 | *snapshot = new_snapshot; |
57 | 0 | } |
58 | 0 | return Status::OK(); |
59 | 0 | } |
60 | | |
61 | | TransactionBaseImpl::TransactionBaseImpl( |
62 | | DB* db, const WriteOptions& write_options, |
63 | | const LockTrackerFactory& lock_tracker_factory) |
64 | 0 | : db_(db), |
65 | 0 | dbimpl_(static_cast_with_check<DBImpl>(db)), |
66 | 0 | write_options_(write_options), |
67 | 0 | cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())), |
68 | 0 | lock_tracker_factory_(lock_tracker_factory), |
69 | 0 | start_time_(dbimpl_->GetSystemClock()->NowMicros()), |
70 | 0 | write_batch_(cmp_, 0, true, 0, write_options.protection_bytes_per_key), |
71 | 0 | tracked_locks_(lock_tracker_factory_.Create()), |
72 | 0 | commit_time_batch_(0 /* reserved_bytes */, 0 /* max_bytes */, |
73 | 0 | write_options.protection_bytes_per_key, |
74 | 0 | 0 /* default_cf_ts_sz */), |
75 | 0 | indexing_enabled_(true) { |
76 | 0 | assert(dynamic_cast<DBImpl*>(db_) != nullptr); |
77 | 0 | log_number_ = 0; |
78 | 0 | if (dbimpl_->allow_2pc()) { |
79 | 0 | InitWriteBatch(); |
80 | 0 | } |
81 | 0 | } |
82 | | |
83 | 0 | TransactionBaseImpl::~TransactionBaseImpl() { |
84 | | // Release snapshot if snapshot is set |
85 | 0 | SetSnapshotInternal(nullptr); |
86 | 0 | } |
87 | | |
88 | 0 | void TransactionBaseImpl::Clear() { |
89 | 0 | save_points_.reset(nullptr); |
90 | 0 | write_batch_.Clear(); |
91 | 0 | commit_time_batch_.Clear(); |
92 | 0 | tracked_locks_->Clear(); |
93 | 0 | num_puts_ = 0; |
94 | 0 | num_put_entities_ = 0; |
95 | 0 | num_deletes_ = 0; |
96 | 0 | num_merges_ = 0; |
97 | |
|
98 | 0 | if (dbimpl_->allow_2pc()) { |
99 | 0 | InitWriteBatch(); |
100 | 0 | } |
101 | 0 | } |
102 | | |
103 | | void TransactionBaseImpl::Reinitialize(DB* db, |
104 | 0 | const WriteOptions& write_options) { |
105 | 0 | Clear(); |
106 | 0 | ClearSnapshot(); |
107 | 0 | id_ = 0; |
108 | 0 | db_ = db; |
109 | 0 | name_.clear(); |
110 | 0 | log_number_ = 0; |
111 | 0 | write_options_ = write_options; |
112 | 0 | start_time_ = dbimpl_->GetSystemClock()->NowMicros(); |
113 | 0 | indexing_enabled_ = true; |
114 | 0 | cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily()); |
115 | 0 | WriteBatchInternal::SetDefaultColumnFamilyTimestampSize( |
116 | 0 | write_batch_.GetWriteBatch(), cmp_->timestamp_size()); |
117 | 0 | WriteBatchInternal::UpdateProtectionInfo( |
118 | 0 | write_batch_.GetWriteBatch(), write_options_.protection_bytes_per_key) |
119 | 0 | .PermitUncheckedError(); |
120 | 0 | WriteBatchInternal::UpdateProtectionInfo( |
121 | 0 | &commit_time_batch_, write_options_.protection_bytes_per_key) |
122 | 0 | .PermitUncheckedError(); |
123 | 0 | } |
124 | | |
125 | 0 | void TransactionBaseImpl::SetSnapshot() { |
126 | 0 | const Snapshot* snapshot = dbimpl_->GetSnapshotForWriteConflictBoundary(); |
127 | 0 | SetSnapshotInternal(snapshot); |
128 | 0 | } |
129 | | |
130 | 0 | void TransactionBaseImpl::SetSnapshotInternal(const Snapshot* snapshot) { |
131 | | // Set a custom deleter for the snapshot_ SharedPtr as the snapshot needs to |
132 | | // be released, not deleted when it is no longer referenced. |
133 | 0 | snapshot_.reset(snapshot, std::bind(&TransactionBaseImpl::ReleaseSnapshot, |
134 | 0 | this, std::placeholders::_1, db_)); |
135 | 0 | snapshot_needed_ = false; |
136 | 0 | snapshot_notifier_ = nullptr; |
137 | 0 | } |
138 | | |
139 | | void TransactionBaseImpl::SetSnapshotOnNextOperation( |
140 | 0 | std::shared_ptr<TransactionNotifier> notifier) { |
141 | 0 | snapshot_needed_ = true; |
142 | 0 | snapshot_notifier_ = notifier; |
143 | 0 | } |
144 | | |
145 | 0 | void TransactionBaseImpl::SetSnapshotIfNeeded() { |
146 | 0 | if (snapshot_needed_) { |
147 | 0 | std::shared_ptr<TransactionNotifier> notifier = snapshot_notifier_; |
148 | 0 | SetSnapshot(); |
149 | 0 | if (notifier != nullptr) { |
150 | 0 | notifier->SnapshotCreated(GetSnapshot()); |
151 | 0 | } |
152 | 0 | } |
153 | 0 | } |
154 | | |
155 | | Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family, |
156 | | const SliceParts& key, bool read_only, |
157 | | bool exclusive, const bool do_validate, |
158 | 0 | const bool assume_tracked) { |
159 | 0 | size_t key_size = 0; |
160 | 0 | for (int i = 0; i < key.num_parts; ++i) { |
161 | 0 | key_size += key.parts[i].size(); |
162 | 0 | } |
163 | |
|
164 | 0 | std::string str; |
165 | 0 | str.reserve(key_size); |
166 | |
|
167 | 0 | for (int i = 0; i < key.num_parts; ++i) { |
168 | 0 | str.append(key.parts[i].data(), key.parts[i].size()); |
169 | 0 | } |
170 | |
|
171 | 0 | return TryLock(column_family, str, read_only, exclusive, do_validate, |
172 | 0 | assume_tracked); |
173 | 0 | } |
174 | | |
175 | 0 | void TransactionBaseImpl::SetSavePoint() { |
176 | 0 | if (save_points_ == nullptr) { |
177 | 0 | save_points_.reset( |
178 | 0 | new std::stack<TransactionBaseImpl::SavePoint, |
179 | 0 | autovector<TransactionBaseImpl::SavePoint>>()); |
180 | 0 | } |
181 | 0 | save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_, |
182 | 0 | num_puts_, num_put_entities_, num_deletes_, num_merges_, |
183 | 0 | lock_tracker_factory_); |
184 | 0 | write_batch_.SetSavePoint(); |
185 | 0 | } |
186 | | |
187 | 0 | Status TransactionBaseImpl::RollbackToSavePoint() { |
188 | 0 | if (save_points_ != nullptr && save_points_->size() > 0) { |
189 | | // Restore saved SavePoint |
190 | 0 | TransactionBaseImpl::SavePoint& save_point = save_points_->top(); |
191 | 0 | snapshot_ = save_point.snapshot_; |
192 | 0 | snapshot_needed_ = save_point.snapshot_needed_; |
193 | 0 | snapshot_notifier_ = save_point.snapshot_notifier_; |
194 | 0 | num_puts_ = save_point.num_puts_; |
195 | 0 | num_put_entities_ = save_point.num_put_entities_; |
196 | 0 | num_deletes_ = save_point.num_deletes_; |
197 | 0 | num_merges_ = save_point.num_merges_; |
198 | | |
199 | | // Rollback batch |
200 | 0 | Status s = write_batch_.RollbackToSavePoint(); |
201 | 0 | assert(s.ok()); |
202 | | |
203 | | // Rollback any keys that were tracked since the last savepoint |
204 | 0 | tracked_locks_->Subtract(*save_point.new_locks_); |
205 | |
|
206 | 0 | save_points_->pop(); |
207 | |
|
208 | 0 | return s; |
209 | 0 | } else { |
210 | 0 | assert(write_batch_.RollbackToSavePoint().IsNotFound()); |
211 | 0 | return Status::NotFound(); |
212 | 0 | } |
213 | 0 | } |
214 | | |
215 | 0 | Status TransactionBaseImpl::PopSavePoint() { |
216 | 0 | if (save_points_ == nullptr || save_points_->empty()) { |
217 | | // No SavePoint yet. |
218 | 0 | assert(write_batch_.PopSavePoint().IsNotFound()); |
219 | 0 | return Status::NotFound(); |
220 | 0 | } |
221 | | |
222 | 0 | assert(!save_points_->empty()); |
223 | | // If there is another savepoint A below the current savepoint B, then A needs |
224 | | // to inherit tracked_keys in B so that if we rollback to savepoint A, we |
225 | | // remember to unlock keys in B. If there is no other savepoint below, then we |
226 | | // can safely discard savepoint info. |
227 | 0 | if (save_points_->size() == 1) { |
228 | 0 | save_points_->pop(); |
229 | 0 | } else { |
230 | 0 | TransactionBaseImpl::SavePoint top(lock_tracker_factory_); |
231 | 0 | std::swap(top, save_points_->top()); |
232 | 0 | save_points_->pop(); |
233 | |
|
234 | 0 | save_points_->top().new_locks_->Merge(*top.new_locks_); |
235 | 0 | } |
236 | |
|
237 | 0 | return write_batch_.PopSavePoint(); |
238 | 0 | } |
239 | | |
240 | | Status TransactionBaseImpl::Get(const ReadOptions& _read_options, |
241 | | ColumnFamilyHandle* column_family, |
242 | 0 | const Slice& key, std::string* value) { |
243 | 0 | if (_read_options.io_activity != Env::IOActivity::kUnknown && |
244 | 0 | _read_options.io_activity != Env::IOActivity::kGet) { |
245 | 0 | return Status::InvalidArgument( |
246 | 0 | "Can only call Get with `ReadOptions::io_activity` is " |
247 | 0 | "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`"); |
248 | 0 | } |
249 | 0 | ReadOptions read_options(_read_options); |
250 | 0 | if (read_options.io_activity == Env::IOActivity::kUnknown) { |
251 | 0 | read_options.io_activity = Env::IOActivity::kGet; |
252 | 0 | } |
253 | 0 | auto s = GetImpl(read_options, column_family, key, value); |
254 | 0 | return s; |
255 | 0 | } |
256 | | |
257 | | Status TransactionBaseImpl::GetImpl(const ReadOptions& read_options, |
258 | | ColumnFamilyHandle* column_family, |
259 | 0 | const Slice& key, std::string* value) { |
260 | 0 | assert(value != nullptr); |
261 | 0 | PinnableSlice pinnable_val(value); |
262 | 0 | assert(!pinnable_val.IsPinned()); |
263 | 0 | auto s = GetImpl(read_options, column_family, key, &pinnable_val); |
264 | 0 | if (s.ok() && pinnable_val.IsPinned()) { |
265 | 0 | value->assign(pinnable_val.data(), pinnable_val.size()); |
266 | 0 | } // else value is already assigned |
267 | 0 | return s; |
268 | 0 | } |
269 | | |
270 | | Status TransactionBaseImpl::Get(const ReadOptions& _read_options, |
271 | | ColumnFamilyHandle* column_family, |
272 | 0 | const Slice& key, PinnableSlice* pinnable_val) { |
273 | 0 | if (_read_options.io_activity != Env::IOActivity::kUnknown && |
274 | 0 | _read_options.io_activity != Env::IOActivity::kGet) { |
275 | 0 | return Status::InvalidArgument( |
276 | 0 | "Can only call Get with `ReadOptions::io_activity` is " |
277 | 0 | "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`"); |
278 | 0 | } |
279 | 0 | ReadOptions read_options(_read_options); |
280 | 0 | if (read_options.io_activity == Env::IOActivity::kUnknown) { |
281 | 0 | read_options.io_activity = Env::IOActivity::kGet; |
282 | 0 | } |
283 | 0 | return GetImpl(read_options, column_family, key, pinnable_val); |
284 | 0 | } |
285 | | |
286 | | Status TransactionBaseImpl::GetImpl(const ReadOptions& read_options, |
287 | | ColumnFamilyHandle* column_family, |
288 | | const Slice& key, |
289 | 0 | PinnableSlice* pinnable_val) { |
290 | 0 | return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key, |
291 | 0 | pinnable_val); |
292 | 0 | } |
293 | | |
294 | | Status TransactionBaseImpl::GetEntity(const ReadOptions& read_options, |
295 | | ColumnFamilyHandle* column_family, |
296 | | const Slice& key, |
297 | 0 | PinnableWideColumns* columns) { |
298 | 0 | return GetEntityImpl(read_options, column_family, key, columns); |
299 | 0 | } |
300 | | |
301 | | Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options, |
302 | | ColumnFamilyHandle* column_family, |
303 | | const Slice& key, std::string* value, |
304 | | bool exclusive, |
305 | 0 | const bool do_validate) { |
306 | 0 | if (!do_validate && read_options.snapshot != nullptr) { |
307 | 0 | return Status::InvalidArgument( |
308 | 0 | "If do_validate is false then GetForUpdate with snapshot is not " |
309 | 0 | "defined."); |
310 | 0 | } |
311 | 0 | if (read_options.io_activity != Env::IOActivity::kUnknown) { |
312 | 0 | return Status::InvalidArgument( |
313 | 0 | "Cannot call GetForUpdate with `ReadOptions::io_activity` != " |
314 | 0 | "`Env::IOActivity::kUnknown`"); |
315 | 0 | } |
316 | 0 | Status s = |
317 | 0 | TryLock(column_family, key, true /* read_only */, exclusive, do_validate); |
318 | |
|
319 | 0 | if (s.ok() && value != nullptr) { |
320 | 0 | assert(value != nullptr); |
321 | 0 | PinnableSlice pinnable_val(value); |
322 | 0 | assert(!pinnable_val.IsPinned()); |
323 | 0 | s = GetImpl(read_options, column_family, key, &pinnable_val); |
324 | 0 | if (s.ok() && pinnable_val.IsPinned()) { |
325 | 0 | value->assign(pinnable_val.data(), pinnable_val.size()); |
326 | 0 | } // else value is already assigned |
327 | 0 | } |
328 | 0 | return s; |
329 | 0 | } |
330 | | |
331 | | Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options, |
332 | | ColumnFamilyHandle* column_family, |
333 | | const Slice& key, |
334 | | PinnableSlice* pinnable_val, |
335 | | bool exclusive, |
336 | 0 | const bool do_validate) { |
337 | 0 | if (!do_validate && read_options.snapshot != nullptr) { |
338 | 0 | return Status::InvalidArgument( |
339 | 0 | "If do_validate is false then GetForUpdate with snapshot is not " |
340 | 0 | "defined."); |
341 | 0 | } |
342 | 0 | if (read_options.io_activity != Env::IOActivity::kUnknown) { |
343 | 0 | return Status::InvalidArgument( |
344 | 0 | "Cannot call GetForUpdate with `ReadOptions::io_activity` != " |
345 | 0 | "`Env::IOActivity::kUnknown`"); |
346 | 0 | } |
347 | 0 | Status s = |
348 | 0 | TryLock(column_family, key, true /* read_only */, exclusive, do_validate); |
349 | |
|
350 | 0 | if (s.ok() && pinnable_val != nullptr) { |
351 | 0 | s = GetImpl(read_options, column_family, key, pinnable_val); |
352 | 0 | } |
353 | 0 | return s; |
354 | 0 | } |
355 | | |
356 | | Status TransactionBaseImpl::GetEntityForUpdate( |
357 | | const ReadOptions& read_options, ColumnFamilyHandle* column_family, |
358 | | const Slice& key, PinnableWideColumns* columns, bool exclusive, |
359 | 0 | bool do_validate) { |
360 | 0 | if (!do_validate && read_options.snapshot != nullptr) { |
361 | 0 | return Status::InvalidArgument( |
362 | 0 | "Snapshot must not be set if validation is disabled"); |
363 | 0 | } |
364 | | |
365 | 0 | const Status s = |
366 | 0 | TryLock(column_family, key, true /* read_only */, exclusive, do_validate); |
367 | 0 | if (!s.ok()) { |
368 | 0 | return s; |
369 | 0 | } |
370 | | |
371 | 0 | return GetEntityImpl(read_options, column_family, key, columns); |
372 | 0 | } |
373 | | |
374 | | std::vector<Status> TransactionBaseImpl::MultiGet( |
375 | | const ReadOptions& _read_options, |
376 | | const std::vector<ColumnFamilyHandle*>& column_family, |
377 | 0 | const std::vector<Slice>& keys, std::vector<std::string>* values) { |
378 | 0 | size_t num_keys = keys.size(); |
379 | 0 | std::vector<Status> stat_list(num_keys); |
380 | 0 | if (_read_options.io_activity != Env::IOActivity::kUnknown && |
381 | 0 | _read_options.io_activity != Env::IOActivity::kMultiGet) { |
382 | 0 | Status s = Status::InvalidArgument( |
383 | 0 | "Can only call MultiGet with `ReadOptions::io_activity` is " |
384 | 0 | "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`"); |
385 | |
|
386 | 0 | for (size_t i = 0; i < num_keys; ++i) { |
387 | 0 | stat_list[i] = s; |
388 | 0 | } |
389 | 0 | return stat_list; |
390 | 0 | } |
391 | 0 | ReadOptions read_options(_read_options); |
392 | 0 | if (read_options.io_activity == Env::IOActivity::kUnknown) { |
393 | 0 | read_options.io_activity = Env::IOActivity::kMultiGet; |
394 | 0 | } |
395 | |
|
396 | 0 | values->resize(num_keys); |
397 | 0 | for (size_t i = 0; i < num_keys; ++i) { |
398 | 0 | stat_list[i] = |
399 | 0 | GetImpl(read_options, column_family[i], keys[i], &(*values)[i]); |
400 | 0 | } |
401 | |
|
402 | 0 | return stat_list; |
403 | 0 | } |
404 | | |
405 | | void TransactionBaseImpl::MultiGet(const ReadOptions& _read_options, |
406 | | ColumnFamilyHandle* column_family, |
407 | | const size_t num_keys, const Slice* keys, |
408 | | PinnableSlice* values, Status* statuses, |
409 | 0 | const bool sorted_input) { |
410 | 0 | if (_read_options.io_activity != Env::IOActivity::kUnknown && |
411 | 0 | _read_options.io_activity != Env::IOActivity::kMultiGet) { |
412 | 0 | Status s = Status::InvalidArgument( |
413 | 0 | "Can only call MultiGet with `ReadOptions::io_activity` is " |
414 | 0 | "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`"); |
415 | 0 | for (size_t i = 0; i < num_keys; ++i) { |
416 | 0 | if (statuses[i].ok()) { |
417 | 0 | statuses[i] = s; |
418 | 0 | } |
419 | 0 | } |
420 | 0 | return; |
421 | 0 | } |
422 | 0 | ReadOptions read_options(_read_options); |
423 | 0 | if (read_options.io_activity == Env::IOActivity::kUnknown) { |
424 | 0 | read_options.io_activity = Env::IOActivity::kMultiGet; |
425 | 0 | } |
426 | 0 | write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family, |
427 | 0 | num_keys, keys, values, statuses, |
428 | 0 | sorted_input); |
429 | 0 | } |
430 | | |
431 | | void TransactionBaseImpl::MultiGetEntity(const ReadOptions& read_options, |
432 | | ColumnFamilyHandle* column_family, |
433 | | size_t num_keys, const Slice* keys, |
434 | | PinnableWideColumns* results, |
435 | 0 | Status* statuses, bool sorted_input) { |
436 | 0 | MultiGetEntityImpl(read_options, column_family, num_keys, keys, results, |
437 | 0 | statuses, sorted_input); |
438 | 0 | } |
439 | | |
440 | | std::vector<Status> TransactionBaseImpl::MultiGetForUpdate( |
441 | | const ReadOptions& read_options, |
442 | | const std::vector<ColumnFamilyHandle*>& column_family, |
443 | 0 | const std::vector<Slice>& keys, std::vector<std::string>* values) { |
444 | 0 | size_t num_keys = keys.size(); |
445 | 0 | if (read_options.io_activity != Env::IOActivity::kUnknown) { |
446 | 0 | Status s = Status::InvalidArgument( |
447 | 0 | "Cannot call MultiGetForUpdate with `ReadOptions::io_activity` != " |
448 | 0 | "`Env::IOActivity::kUnknown`"); |
449 | 0 | return std::vector<Status>(num_keys, s); |
450 | 0 | } |
451 | | // Regardless of whether the MultiGet succeeded, track these keys. |
452 | 0 | values->resize(num_keys); |
453 | | |
454 | | // Lock all keys |
455 | 0 | for (size_t i = 0; i < num_keys; ++i) { |
456 | 0 | Status s = TryLock(column_family[i], keys[i], true /* read_only */, |
457 | 0 | true /* exclusive */); |
458 | 0 | if (!s.ok()) { |
459 | | // Fail entire multiget if we cannot lock all keys |
460 | 0 | return std::vector<Status>(num_keys, s); |
461 | 0 | } |
462 | 0 | } |
463 | | |
464 | | // TODO(agiardullo): optimize multiget? |
465 | 0 | std::vector<Status> stat_list(num_keys); |
466 | 0 | for (size_t i = 0; i < num_keys; ++i) { |
467 | 0 | stat_list[i] = |
468 | 0 | GetImpl(read_options, column_family[i], keys[i], &(*values)[i]); |
469 | 0 | } |
470 | |
|
471 | 0 | return stat_list; |
472 | 0 | } |
473 | | |
474 | 0 | Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) { |
475 | 0 | Iterator* db_iter = db_->NewIterator(read_options); |
476 | 0 | assert(db_iter); |
477 | |
|
478 | 0 | return write_batch_.NewIteratorWithBase(db_->DefaultColumnFamily(), db_iter, |
479 | 0 | &read_options); |
480 | 0 | } |
481 | | |
482 | | Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options, |
483 | 0 | ColumnFamilyHandle* column_family) { |
484 | 0 | Iterator* db_iter = db_->NewIterator(read_options, column_family); |
485 | 0 | assert(db_iter); |
486 | |
|
487 | 0 | return write_batch_.NewIteratorWithBase(column_family, db_iter, |
488 | 0 | &read_options); |
489 | 0 | } |
490 | | |
491 | | template <typename IterType, typename ImplType, typename ErrorIteratorFuncType> |
492 | | std::unique_ptr<IterType> TransactionBaseImpl::NewMultiCfIterator( |
493 | | const ReadOptions& read_options, |
494 | | const std::vector<ColumnFamilyHandle*>& column_families, |
495 | 0 | ErrorIteratorFuncType error_iterator_func) { |
496 | 0 | if (column_families.empty()) { |
497 | 0 | return error_iterator_func( |
498 | 0 | Status::InvalidArgument("No Column Family was provided")); |
499 | 0 | } |
500 | | |
501 | 0 | const Comparator* const first_comparator = |
502 | 0 | column_families[0]->GetComparator(); |
503 | 0 | assert(first_comparator); |
504 | |
|
505 | 0 | for (size_t i = 1; i < column_families.size(); ++i) { |
506 | 0 | const Comparator* cf_comparator = column_families[i]->GetComparator(); |
507 | 0 | assert(cf_comparator); |
508 | |
|
509 | 0 | if (first_comparator != cf_comparator && |
510 | 0 | first_comparator->GetId() != cf_comparator->GetId()) { |
511 | 0 | return error_iterator_func(Status::InvalidArgument( |
512 | 0 | "Different comparators are being used across CFs")); |
513 | 0 | } |
514 | 0 | } |
515 | | |
516 | 0 | std::vector<Iterator*> child_iterators; |
517 | 0 | const Status s = |
518 | 0 | db_->NewIterators(read_options, column_families, &child_iterators); |
519 | 0 | if (!s.ok()) { |
520 | 0 | return error_iterator_func(s); |
521 | 0 | } |
522 | | |
523 | 0 | assert(column_families.size() == child_iterators.size()); |
524 | |
|
525 | 0 | std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>> |
526 | 0 | cfh_iter_pairs; |
527 | 0 | cfh_iter_pairs.reserve(column_families.size()); |
528 | 0 | for (size_t i = 0; i < column_families.size(); ++i) { |
529 | 0 | cfh_iter_pairs.emplace_back( |
530 | 0 | column_families[i], |
531 | 0 | write_batch_.NewIteratorWithBase(column_families[i], child_iterators[i], |
532 | 0 | &read_options)); |
533 | 0 | } |
534 | |
|
535 | 0 | return std::make_unique<ImplType>(read_options, |
536 | 0 | column_families[0]->GetComparator(), |
537 | 0 | std::move(cfh_iter_pairs)); |
538 | 0 | } Unexecuted instantiation: transaction_base.cc:std::__1::unique_ptr<rocksdb::Iterator, std::__1::default_delete<rocksdb::Iterator> > rocksdb::TransactionBaseImpl::NewMultiCfIterator<rocksdb::Iterator, rocksdb::CoalescingIterator, rocksdb::TransactionBaseImpl::GetCoalescingIterator(rocksdb::ReadOptions const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&)::$_0>(rocksdb::ReadOptions const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&, rocksdb::TransactionBaseImpl::GetCoalescingIterator(rocksdb::ReadOptions const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&)::$_0) Unexecuted instantiation: transaction_base.cc:std::__1::unique_ptr<rocksdb::AttributeGroupIterator, std::__1::default_delete<rocksdb::AttributeGroupIterator> > rocksdb::TransactionBaseImpl::NewMultiCfIterator<rocksdb::AttributeGroupIterator, rocksdb::AttributeGroupIteratorImpl, rocksdb::TransactionBaseImpl::GetAttributeGroupIterator(rocksdb::ReadOptions const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&)::$_0>(rocksdb::ReadOptions const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&, rocksdb::TransactionBaseImpl::GetAttributeGroupIterator(rocksdb::ReadOptions const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&)::$_0) |
539 | | |
540 | | std::unique_ptr<Iterator> TransactionBaseImpl::GetCoalescingIterator( |
541 | | const ReadOptions& read_options, |
542 | 0 | const std::vector<ColumnFamilyHandle*>& column_families) { |
543 | 0 | return NewMultiCfIterator<Iterator, CoalescingIterator>( |
544 | 0 | read_options, column_families, [](const Status& s) { |
545 | 0 | return std::unique_ptr<Iterator>(NewErrorIterator(s)); |
546 | 0 | }); |
547 | 0 | } |
548 | | |
549 | | std::unique_ptr<AttributeGroupIterator> |
550 | | TransactionBaseImpl::GetAttributeGroupIterator( |
551 | | const ReadOptions& read_options, |
552 | 0 | const std::vector<ColumnFamilyHandle*>& column_families) { |
553 | 0 | return NewMultiCfIterator<AttributeGroupIterator, AttributeGroupIteratorImpl>( |
554 | 0 | read_options, column_families, |
555 | 0 | [](const Status& s) { return NewAttributeGroupErrorIterator(s); }); |
556 | 0 | } |
557 | | |
558 | | Status TransactionBaseImpl::PutEntityImpl(ColumnFamilyHandle* column_family, |
559 | | const Slice& key, |
560 | | const WideColumns& columns, |
561 | | bool do_validate, |
562 | 0 | bool assume_tracked) { |
563 | 0 | { |
564 | 0 | constexpr bool read_only = false; |
565 | 0 | constexpr bool exclusive = true; |
566 | 0 | const Status s = TryLock(column_family, key, read_only, exclusive, |
567 | 0 | do_validate, assume_tracked); |
568 | 0 | if (!s.ok()) { |
569 | 0 | return s; |
570 | 0 | } |
571 | 0 | } |
572 | | |
573 | 0 | { |
574 | 0 | const Status s = GetBatchForWrite()->PutEntity(column_family, key, columns); |
575 | 0 | if (!s.ok()) { |
576 | 0 | return s; |
577 | 0 | } |
578 | 0 | } |
579 | | |
580 | 0 | ++num_put_entities_; |
581 | 0 | return Status::OK(); |
582 | 0 | } |
583 | | |
584 | | Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, |
585 | | const Slice& key, const Slice& value, |
586 | 0 | const bool assume_tracked) { |
587 | 0 | const bool do_validate = !assume_tracked; |
588 | 0 | Status s = TryLock(column_family, key, false /* read_only */, |
589 | 0 | true /* exclusive */, do_validate, assume_tracked); |
590 | |
|
591 | 0 | if (s.ok()) { |
592 | 0 | s = GetBatchForWrite()->Put(column_family, key, value); |
593 | 0 | if (s.ok()) { |
594 | 0 | num_puts_++; |
595 | 0 | } |
596 | 0 | } |
597 | |
|
598 | 0 | return s; |
599 | 0 | } |
600 | | |
601 | | Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, |
602 | | const SliceParts& key, const SliceParts& value, |
603 | 0 | const bool assume_tracked) { |
604 | 0 | const bool do_validate = !assume_tracked; |
605 | 0 | Status s = TryLock(column_family, key, false /* read_only */, |
606 | 0 | true /* exclusive */, do_validate, assume_tracked); |
607 | |
|
608 | 0 | if (s.ok()) { |
609 | 0 | s = GetBatchForWrite()->Put(column_family, key, value); |
610 | 0 | if (s.ok()) { |
611 | 0 | num_puts_++; |
612 | 0 | } |
613 | 0 | } |
614 | |
|
615 | 0 | return s; |
616 | 0 | } |
617 | | |
618 | | Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family, |
619 | | const Slice& key, const Slice& value, |
620 | 0 | const bool assume_tracked) { |
621 | 0 | const bool do_validate = !assume_tracked; |
622 | 0 | Status s = TryLock(column_family, key, false /* read_only */, |
623 | 0 | true /* exclusive */, do_validate, assume_tracked); |
624 | |
|
625 | 0 | if (s.ok()) { |
626 | 0 | s = GetBatchForWrite()->Merge(column_family, key, value); |
627 | 0 | if (s.ok()) { |
628 | 0 | num_merges_++; |
629 | 0 | } |
630 | 0 | } |
631 | |
|
632 | 0 | return s; |
633 | 0 | } |
634 | | |
635 | | Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, |
636 | | const Slice& key, |
637 | 0 | const bool assume_tracked) { |
638 | 0 | const bool do_validate = !assume_tracked; |
639 | 0 | Status s = TryLock(column_family, key, false /* read_only */, |
640 | 0 | true /* exclusive */, do_validate, assume_tracked); |
641 | |
|
642 | 0 | if (s.ok()) { |
643 | 0 | s = GetBatchForWrite()->Delete(column_family, key); |
644 | 0 | if (s.ok()) { |
645 | 0 | num_deletes_++; |
646 | 0 | } |
647 | 0 | } |
648 | |
|
649 | 0 | return s; |
650 | 0 | } |
651 | | |
652 | | Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, |
653 | | const SliceParts& key, |
654 | 0 | const bool assume_tracked) { |
655 | 0 | const bool do_validate = !assume_tracked; |
656 | 0 | Status s = TryLock(column_family, key, false /* read_only */, |
657 | 0 | true /* exclusive */, do_validate, assume_tracked); |
658 | |
|
659 | 0 | if (s.ok()) { |
660 | 0 | s = GetBatchForWrite()->Delete(column_family, key); |
661 | 0 | if (s.ok()) { |
662 | 0 | num_deletes_++; |
663 | 0 | } |
664 | 0 | } |
665 | |
|
666 | 0 | return s; |
667 | 0 | } |
668 | | |
669 | | Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, |
670 | | const Slice& key, |
671 | 0 | const bool assume_tracked) { |
672 | 0 | const bool do_validate = !assume_tracked; |
673 | 0 | Status s = TryLock(column_family, key, false /* read_only */, |
674 | 0 | true /* exclusive */, do_validate, assume_tracked); |
675 | |
|
676 | 0 | if (s.ok()) { |
677 | 0 | s = GetBatchForWrite()->SingleDelete(column_family, key); |
678 | 0 | if (s.ok()) { |
679 | 0 | num_deletes_++; |
680 | 0 | } |
681 | 0 | } |
682 | |
|
683 | 0 | return s; |
684 | 0 | } |
685 | | |
686 | | Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, |
687 | | const SliceParts& key, |
688 | 0 | const bool assume_tracked) { |
689 | 0 | const bool do_validate = !assume_tracked; |
690 | 0 | Status s = TryLock(column_family, key, false /* read_only */, |
691 | 0 | true /* exclusive */, do_validate, assume_tracked); |
692 | |
|
693 | 0 | if (s.ok()) { |
694 | 0 | s = GetBatchForWrite()->SingleDelete(column_family, key); |
695 | 0 | if (s.ok()) { |
696 | 0 | num_deletes_++; |
697 | 0 | } |
698 | 0 | } |
699 | |
|
700 | 0 | return s; |
701 | 0 | } |
702 | | |
703 | | Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, |
704 | 0 | const Slice& key, const Slice& value) { |
705 | 0 | Status s = TryLock(column_family, key, false /* read_only */, |
706 | 0 | true /* exclusive */, false /* do_validate */); |
707 | |
|
708 | 0 | if (s.ok()) { |
709 | 0 | s = GetBatchForWrite()->Put(column_family, key, value); |
710 | 0 | if (s.ok()) { |
711 | 0 | num_puts_++; |
712 | 0 | } |
713 | 0 | } |
714 | |
|
715 | 0 | return s; |
716 | 0 | } |
717 | | |
718 | | Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, |
719 | | const SliceParts& key, |
720 | 0 | const SliceParts& value) { |
721 | 0 | Status s = TryLock(column_family, key, false /* read_only */, |
722 | 0 | true /* exclusive */, false /* do_validate */); |
723 | |
|
724 | 0 | if (s.ok()) { |
725 | 0 | s = GetBatchForWrite()->Put(column_family, key, value); |
726 | 0 | if (s.ok()) { |
727 | 0 | num_puts_++; |
728 | 0 | } |
729 | 0 | } |
730 | |
|
731 | 0 | return s; |
732 | 0 | } |
733 | | |
734 | | Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family, |
735 | | const Slice& key, |
736 | 0 | const Slice& value) { |
737 | 0 | Status s = TryLock(column_family, key, false /* read_only */, |
738 | 0 | true /* exclusive */, false /* do_validate */); |
739 | |
|
740 | 0 | if (s.ok()) { |
741 | 0 | s = GetBatchForWrite()->Merge(column_family, key, value); |
742 | 0 | if (s.ok()) { |
743 | 0 | num_merges_++; |
744 | 0 | } |
745 | 0 | } |
746 | |
|
747 | 0 | return s; |
748 | 0 | } |
749 | | |
750 | | Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, |
751 | 0 | const Slice& key) { |
752 | 0 | Status s = TryLock(column_family, key, false /* read_only */, |
753 | 0 | true /* exclusive */, false /* do_validate */); |
754 | |
|
755 | 0 | if (s.ok()) { |
756 | 0 | s = GetBatchForWrite()->Delete(column_family, key); |
757 | 0 | if (s.ok()) { |
758 | 0 | num_deletes_++; |
759 | 0 | } |
760 | 0 | } |
761 | |
|
762 | 0 | return s; |
763 | 0 | } |
764 | | |
765 | | Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, |
766 | 0 | const SliceParts& key) { |
767 | 0 | Status s = TryLock(column_family, key, false /* read_only */, |
768 | 0 | true /* exclusive */, false /* do_validate */); |
769 | |
|
770 | 0 | if (s.ok()) { |
771 | 0 | s = GetBatchForWrite()->Delete(column_family, key); |
772 | 0 | if (s.ok()) { |
773 | 0 | num_deletes_++; |
774 | 0 | } |
775 | 0 | } |
776 | |
|
777 | 0 | return s; |
778 | 0 | } |
779 | | |
780 | | Status TransactionBaseImpl::SingleDeleteUntracked( |
781 | 0 | ColumnFamilyHandle* column_family, const Slice& key) { |
782 | 0 | Status s = TryLock(column_family, key, false /* read_only */, |
783 | 0 | true /* exclusive */, false /* do_validate */); |
784 | |
|
785 | 0 | if (s.ok()) { |
786 | 0 | s = GetBatchForWrite()->SingleDelete(column_family, key); |
787 | 0 | if (s.ok()) { |
788 | 0 | num_deletes_++; |
789 | 0 | } |
790 | 0 | } |
791 | |
|
792 | 0 | return s; |
793 | 0 | } |
794 | | |
795 | 0 | void TransactionBaseImpl::PutLogData(const Slice& blob) { |
796 | 0 | auto s = write_batch_.PutLogData(blob); |
797 | 0 | (void)s; |
798 | 0 | assert(s.ok()); |
799 | 0 | } |
800 | | |
801 | 0 | WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() { |
802 | 0 | return &write_batch_; |
803 | 0 | } |
804 | | |
805 | 0 | uint64_t TransactionBaseImpl::GetElapsedTime() const { |
806 | 0 | return (dbimpl_->GetSystemClock()->NowMicros() - start_time_) / 1000; |
807 | 0 | } |
808 | | |
809 | 0 | uint64_t TransactionBaseImpl::GetNumPuts() const { return num_puts_; } |
810 | | |
811 | 0 | uint64_t TransactionBaseImpl::GetNumPutEntities() const { |
812 | 0 | return num_put_entities_; |
813 | 0 | } |
814 | | |
815 | 0 | uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_; } |
816 | | |
817 | 0 | uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; } |
818 | | |
819 | 0 | uint64_t TransactionBaseImpl::GetNumKeys() const { |
820 | 0 | return tracked_locks_->GetNumPointLocks(); |
821 | 0 | } |
822 | | |
823 | | void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key, |
824 | | SequenceNumber seq, bool read_only, |
825 | 0 | bool exclusive) { |
826 | 0 | PointLockRequest r; |
827 | 0 | r.column_family_id = cfh_id; |
828 | 0 | r.key = key; |
829 | 0 | r.seq = seq; |
830 | 0 | r.read_only = read_only; |
831 | 0 | r.exclusive = exclusive; |
832 | | |
833 | | // Update map of all tracked keys for this transaction |
834 | 0 | tracked_locks_->Track(r); |
835 | |
|
836 | 0 | if (save_points_ != nullptr && !save_points_->empty()) { |
837 | | // Update map of tracked keys in this SavePoint |
838 | 0 | save_points_->top().new_locks_->Track(r); |
839 | 0 | } |
840 | 0 | } |
841 | | |
842 | | // Gets the write batch that should be used for Put/PutEntity/Merge/Delete |
843 | | // operations. |
844 | | // |
845 | | // Returns either a WriteBatch or WriteBatchWithIndex depending on whether |
846 | | // DisableIndexing() has been called. |
847 | 0 | WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() { |
848 | 0 | if (indexing_enabled_) { |
849 | | // Use WriteBatchWithIndex |
850 | 0 | return &write_batch_; |
851 | 0 | } else { |
852 | | // Don't use WriteBatchWithIndex. Return base WriteBatch. |
853 | 0 | return write_batch_.GetWriteBatch(); |
854 | 0 | } |
855 | 0 | } |
856 | | |
857 | 0 | void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) { |
858 | 0 | if (snapshot != nullptr) { |
859 | 0 | ROCKS_LOG_DETAILS(dbimpl_->immutable_db_options().info_log, |
860 | 0 | "ReleaseSnapshot %" PRIu64 " Set", |
861 | 0 | snapshot->GetSequenceNumber()); |
862 | 0 | db->ReleaseSnapshot(snapshot); |
863 | 0 | } |
864 | 0 | } |
865 | | |
866 | | void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family, |
867 | 0 | const Slice& key) { |
868 | 0 | PointLockRequest r; |
869 | 0 | r.column_family_id = GetColumnFamilyID(column_family); |
870 | 0 | r.key = key.ToString(); |
871 | 0 | r.read_only = true; |
872 | |
|
873 | 0 | bool can_untrack = false; |
874 | 0 | if (save_points_ != nullptr && !save_points_->empty()) { |
875 | | // If there is no GetForUpdate of the key in this save point, |
876 | | // then cannot untrack from the global lock tracker. |
877 | 0 | UntrackStatus s = save_points_->top().new_locks_->Untrack(r); |
878 | 0 | can_untrack = (s != UntrackStatus::NOT_TRACKED); |
879 | 0 | } else { |
880 | | // No save point, so can untrack from the global lock tracker. |
881 | 0 | can_untrack = true; |
882 | 0 | } |
883 | |
|
884 | 0 | if (can_untrack) { |
885 | | // If erased from the global tracker, then can unlock the key. |
886 | 0 | UntrackStatus s = tracked_locks_->Untrack(r); |
887 | 0 | bool can_unlock = (s == UntrackStatus::REMOVED); |
888 | 0 | if (can_unlock) { |
889 | 0 | UnlockGetForUpdate(column_family, key); |
890 | 0 | } |
891 | 0 | } |
892 | 0 | } |
893 | | |
894 | 0 | Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) { |
895 | 0 | struct IndexedWriteBatchBuilder : public WriteBatch::Handler { |
896 | 0 | Transaction* txn_; |
897 | 0 | DBImpl* db_; |
898 | 0 | IndexedWriteBatchBuilder(Transaction* txn, DBImpl* db) |
899 | 0 | : txn_(txn), db_(db) { |
900 | 0 | assert(dynamic_cast<TransactionBaseImpl*>(txn_) != nullptr); |
901 | 0 | } |
902 | |
|
903 | 0 | Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override { |
904 | 0 | Slice user_key = GetUserKey(cf, key); |
905 | 0 | return txn_->Put(db_->GetColumnFamilyHandle(cf), user_key, val); |
906 | 0 | } |
907 | |
|
908 | 0 | Status PutEntityCF(uint32_t cf, const Slice& key, |
909 | 0 | const Slice& entity) override { |
910 | 0 | Slice user_key = GetUserKey(cf, key); |
911 | 0 | Slice entity_copy = entity; |
912 | 0 | WideColumns columns; |
913 | 0 | const Status s = |
914 | 0 | WideColumnSerialization::Deserialize(entity_copy, columns); |
915 | 0 | if (!s.ok()) { |
916 | 0 | return s; |
917 | 0 | } |
918 | | |
919 | 0 | return txn_->PutEntity(db_->GetColumnFamilyHandle(cf), user_key, columns); |
920 | 0 | } |
921 | |
|
922 | 0 | Status DeleteCF(uint32_t cf, const Slice& key) override { |
923 | 0 | Slice user_key = GetUserKey(cf, key); |
924 | 0 | return txn_->Delete(db_->GetColumnFamilyHandle(cf), user_key); |
925 | 0 | } |
926 | |
|
927 | 0 | Status SingleDeleteCF(uint32_t cf, const Slice& key) override { |
928 | 0 | Slice user_key = GetUserKey(cf, key); |
929 | 0 | return txn_->SingleDelete(db_->GetColumnFamilyHandle(cf), user_key); |
930 | 0 | } |
931 | |
|
932 | 0 | Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override { |
933 | 0 | Slice user_key = GetUserKey(cf, key); |
934 | 0 | return txn_->Merge(db_->GetColumnFamilyHandle(cf), user_key, val); |
935 | 0 | } |
936 | | |
937 | | // this is used for reconstructing prepared transactions upon |
938 | | // recovery. there should not be any meta markers in the batches |
939 | | // we are processing. |
940 | 0 | Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); } |
941 | |
|
942 | 0 | Status MarkEndPrepare(const Slice&) override { |
943 | 0 | return Status::InvalidArgument(); |
944 | 0 | } |
945 | |
|
946 | 0 | Status MarkCommit(const Slice&) override { |
947 | 0 | return Status::InvalidArgument(); |
948 | 0 | } |
949 | |
|
950 | 0 | Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { |
951 | 0 | return Status::InvalidArgument(); |
952 | 0 | } |
953 | |
|
954 | 0 | Status MarkRollback(const Slice&) override { |
955 | 0 | return Status::InvalidArgument(); |
956 | 0 | } |
957 | 0 | size_t GetTimestampSize(uint32_t cf_id) { |
958 | 0 | auto cfd = db_->versions_->GetColumnFamilySet()->GetColumnFamily(cf_id); |
959 | 0 | const Comparator* ucmp = cfd->user_comparator(); |
960 | 0 | assert(ucmp); |
961 | 0 | return ucmp->timestamp_size(); |
962 | 0 | } |
963 | |
|
964 | 0 | Slice GetUserKey(uint32_t cf_id, const Slice& key) { |
965 | 0 | size_t ts_sz = GetTimestampSize(cf_id); |
966 | 0 | if (ts_sz == 0) { |
967 | 0 | return key; |
968 | 0 | } |
969 | 0 | assert(key.size() >= ts_sz); |
970 | 0 | return Slice(key.data(), key.size() - ts_sz); |
971 | 0 | } |
972 | 0 | }; |
973 | |
|
974 | 0 | IndexedWriteBatchBuilder copycat(this, dbimpl_); |
975 | 0 | return src_batch->Iterate(©cat); |
976 | 0 | } |
977 | | |
978 | 0 | WriteBatch* TransactionBaseImpl::GetCommitTimeWriteBatch() { |
979 | 0 | return &commit_time_batch_; |
980 | 0 | } |
981 | | } // namespace ROCKSDB_NAMESPACE |