Coverage Report

Created: 2026-02-14 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/utilities/transactions/transaction_base.cc
Line
Count
Source
1
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "utilities/transactions/transaction_base.h"
7
8
#include <cinttypes>
9
10
#include "db/attribute_group_iterator_impl.h"
11
#include "db/coalescing_iterator.h"
12
#include "db/column_family.h"
13
#include "db/db_impl/db_impl.h"
14
#include "logging/logging.h"
15
#include "rocksdb/comparator.h"
16
#include "rocksdb/db.h"
17
#include "rocksdb/status.h"
18
#include "util/cast_util.h"
19
#include "util/string_util.h"
20
#include "utilities/transactions/lock/lock_tracker.h"
21
22
namespace ROCKSDB_NAMESPACE {
23
24
Status Transaction::CommitAndTryCreateSnapshot(
25
    std::shared_ptr<TransactionNotifier> notifier, TxnTimestamp ts,
26
0
    std::shared_ptr<const Snapshot>* snapshot) {
27
0
  if (snapshot) {
28
0
    snapshot->reset();
29
0
  }
30
0
  TxnTimestamp commit_ts = GetCommitTimestamp();
31
0
  if (commit_ts == kMaxTxnTimestamp) {
32
0
    if (ts == kMaxTxnTimestamp) {
33
0
      return Status::InvalidArgument("Commit timestamp unset");
34
0
    } else {
35
0
      const Status s = SetCommitTimestamp(ts);
36
0
      if (!s.ok()) {
37
0
        return s;
38
0
      }
39
0
    }
40
0
  } else if (ts != kMaxTxnTimestamp) {
41
0
    if (ts != commit_ts) {
42
      // For now we treat this as error.
43
0
      return Status::InvalidArgument("Different commit ts specified");
44
0
    }
45
0
  }
46
0
  SetSnapshotOnNextOperation(notifier);
47
0
  Status s = Commit();
48
0
  if (!s.ok()) {
49
0
    return s;
50
0
  }
51
0
  assert(s.ok());
52
  // If we reach here, we must return ok status for this function.
53
0
  std::shared_ptr<const Snapshot> new_snapshot = GetTimestampedSnapshot();
54
55
0
  if (snapshot) {
56
0
    *snapshot = new_snapshot;
57
0
  }
58
0
  return Status::OK();
59
0
}
60
61
TransactionBaseImpl::TransactionBaseImpl(
62
    DB* db, const WriteOptions& write_options,
63
    const LockTrackerFactory& lock_tracker_factory)
64
0
    : db_(db),
65
0
      dbimpl_(static_cast_with_check<DBImpl>(db)),
66
0
      write_options_(write_options),
67
0
      cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
68
0
      lock_tracker_factory_(lock_tracker_factory),
69
0
      start_time_(dbimpl_->GetSystemClock()->NowMicros()),
70
0
      write_batch_(cmp_, 0, true, 0, write_options.protection_bytes_per_key),
71
0
      tracked_locks_(lock_tracker_factory_.Create()),
72
0
      commit_time_batch_(0 /* reserved_bytes */, 0 /* max_bytes */,
73
0
                         write_options.protection_bytes_per_key,
74
0
                         0 /* default_cf_ts_sz */),
75
0
      indexing_enabled_(true) {
76
0
  assert(dynamic_cast<DBImpl*>(db_) != nullptr);
77
0
  log_number_ = 0;
78
0
  if (dbimpl_->allow_2pc()) {
79
0
    InitWriteBatch();
80
0
  }
81
0
}
82
83
0
TransactionBaseImpl::~TransactionBaseImpl() {
84
  // Release snapshot if snapshot is set
85
0
  SetSnapshotInternal(nullptr);
86
0
}
87
88
0
void TransactionBaseImpl::Clear() {
89
0
  save_points_.reset(nullptr);
90
0
  write_batch_.Clear();
91
0
  commit_time_batch_.Clear();
92
0
  tracked_locks_->Clear();
93
0
  num_puts_ = 0;
94
0
  num_put_entities_ = 0;
95
0
  num_deletes_ = 0;
96
0
  num_merges_ = 0;
97
98
0
  if (dbimpl_->allow_2pc()) {
99
0
    InitWriteBatch();
100
0
  }
101
0
}
102
103
void TransactionBaseImpl::Reinitialize(DB* db,
104
0
                                       const WriteOptions& write_options) {
105
0
  Clear();
106
0
  ClearSnapshot();
107
0
  id_ = 0;
108
0
  db_ = db;
109
0
  name_.clear();
110
0
  log_number_ = 0;
111
0
  write_options_ = write_options;
112
0
  start_time_ = dbimpl_->GetSystemClock()->NowMicros();
113
0
  indexing_enabled_ = true;
114
0
  cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily());
115
0
  WriteBatchInternal::SetDefaultColumnFamilyTimestampSize(
116
0
      write_batch_.GetWriteBatch(), cmp_->timestamp_size());
117
0
  WriteBatchInternal::UpdateProtectionInfo(
118
0
      write_batch_.GetWriteBatch(), write_options_.protection_bytes_per_key)
119
0
      .PermitUncheckedError();
120
0
  WriteBatchInternal::UpdateProtectionInfo(
121
0
      &commit_time_batch_, write_options_.protection_bytes_per_key)
122
0
      .PermitUncheckedError();
123
0
}
124
125
0
void TransactionBaseImpl::SetSnapshot() {
126
0
  const Snapshot* snapshot = dbimpl_->GetSnapshotForWriteConflictBoundary();
127
0
  SetSnapshotInternal(snapshot);
128
0
}
129
130
0
void TransactionBaseImpl::SetSnapshotInternal(const Snapshot* snapshot) {
131
  // Set a custom deleter for the snapshot_ SharedPtr as the snapshot needs to
132
  // be released, not deleted when it is no longer referenced.
133
0
  snapshot_.reset(snapshot, std::bind(&TransactionBaseImpl::ReleaseSnapshot,
134
0
                                      this, std::placeholders::_1, db_));
135
0
  snapshot_needed_ = false;
136
0
  snapshot_notifier_ = nullptr;
137
0
}
138
139
void TransactionBaseImpl::SetSnapshotOnNextOperation(
140
0
    std::shared_ptr<TransactionNotifier> notifier) {
141
0
  snapshot_needed_ = true;
142
0
  snapshot_notifier_ = notifier;
143
0
}
144
145
0
void TransactionBaseImpl::SetSnapshotIfNeeded() {
146
0
  if (snapshot_needed_) {
147
0
    std::shared_ptr<TransactionNotifier> notifier = snapshot_notifier_;
148
0
    SetSnapshot();
149
0
    if (notifier != nullptr) {
150
0
      notifier->SnapshotCreated(GetSnapshot());
151
0
    }
152
0
  }
153
0
}
154
155
Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
156
                                    const SliceParts& key, bool read_only,
157
                                    bool exclusive, const bool do_validate,
158
0
                                    const bool assume_tracked) {
159
0
  size_t key_size = 0;
160
0
  for (int i = 0; i < key.num_parts; ++i) {
161
0
    key_size += key.parts[i].size();
162
0
  }
163
164
0
  std::string str;
165
0
  str.reserve(key_size);
166
167
0
  for (int i = 0; i < key.num_parts; ++i) {
168
0
    str.append(key.parts[i].data(), key.parts[i].size());
169
0
  }
170
171
0
  return TryLock(column_family, str, read_only, exclusive, do_validate,
172
0
                 assume_tracked);
173
0
}
174
175
0
void TransactionBaseImpl::SetSavePoint() {
176
0
  if (save_points_ == nullptr) {
177
0
    save_points_.reset(
178
0
        new std::stack<TransactionBaseImpl::SavePoint,
179
0
                       autovector<TransactionBaseImpl::SavePoint>>());
180
0
  }
181
0
  save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
182
0
                        num_puts_, num_put_entities_, num_deletes_, num_merges_,
183
0
                        lock_tracker_factory_);
184
0
  write_batch_.SetSavePoint();
185
0
}
186
187
0
Status TransactionBaseImpl::RollbackToSavePoint() {
188
0
  if (save_points_ != nullptr && save_points_->size() > 0) {
189
    // Restore saved SavePoint
190
0
    TransactionBaseImpl::SavePoint& save_point = save_points_->top();
191
0
    snapshot_ = save_point.snapshot_;
192
0
    snapshot_needed_ = save_point.snapshot_needed_;
193
0
    snapshot_notifier_ = save_point.snapshot_notifier_;
194
0
    num_puts_ = save_point.num_puts_;
195
0
    num_put_entities_ = save_point.num_put_entities_;
196
0
    num_deletes_ = save_point.num_deletes_;
197
0
    num_merges_ = save_point.num_merges_;
198
199
    // Rollback batch
200
0
    Status s = write_batch_.RollbackToSavePoint();
201
0
    assert(s.ok());
202
203
    // Rollback any keys that were tracked since the last savepoint
204
0
    tracked_locks_->Subtract(*save_point.new_locks_);
205
206
0
    save_points_->pop();
207
208
0
    return s;
209
0
  } else {
210
0
    assert(write_batch_.RollbackToSavePoint().IsNotFound());
211
0
    return Status::NotFound();
212
0
  }
213
0
}
214
215
0
Status TransactionBaseImpl::PopSavePoint() {
216
0
  if (save_points_ == nullptr || save_points_->empty()) {
217
    // No SavePoint yet.
218
0
    assert(write_batch_.PopSavePoint().IsNotFound());
219
0
    return Status::NotFound();
220
0
  }
221
222
0
  assert(!save_points_->empty());
223
  // If there is another savepoint A below the current savepoint B, then A needs
224
  // to inherit tracked_keys in B so that if we rollback to savepoint A, we
225
  // remember to unlock keys in B. If there is no other savepoint below, then we
226
  // can safely discard savepoint info.
227
0
  if (save_points_->size() == 1) {
228
0
    save_points_->pop();
229
0
  } else {
230
0
    TransactionBaseImpl::SavePoint top(lock_tracker_factory_);
231
0
    std::swap(top, save_points_->top());
232
0
    save_points_->pop();
233
234
0
    save_points_->top().new_locks_->Merge(*top.new_locks_);
235
0
  }
236
237
0
  return write_batch_.PopSavePoint();
238
0
}
239
240
Status TransactionBaseImpl::Get(const ReadOptions& _read_options,
241
                                ColumnFamilyHandle* column_family,
242
0
                                const Slice& key, std::string* value) {
243
0
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
244
0
      _read_options.io_activity != Env::IOActivity::kGet) {
245
0
    return Status::InvalidArgument(
246
0
        "Can only call Get with `ReadOptions::io_activity` is "
247
0
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
248
0
  }
249
0
  ReadOptions read_options(_read_options);
250
0
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
251
0
    read_options.io_activity = Env::IOActivity::kGet;
252
0
  }
253
0
  auto s = GetImpl(read_options, column_family, key, value);
254
0
  return s;
255
0
}
256
257
Status TransactionBaseImpl::GetImpl(const ReadOptions& read_options,
258
                                    ColumnFamilyHandle* column_family,
259
0
                                    const Slice& key, std::string* value) {
260
0
  assert(value != nullptr);
261
0
  PinnableSlice pinnable_val(value);
262
0
  assert(!pinnable_val.IsPinned());
263
0
  auto s = GetImpl(read_options, column_family, key, &pinnable_val);
264
0
  if (s.ok() && pinnable_val.IsPinned()) {
265
0
    value->assign(pinnable_val.data(), pinnable_val.size());
266
0
  }  // else value is already assigned
267
0
  return s;
268
0
}
269
270
Status TransactionBaseImpl::Get(const ReadOptions& _read_options,
271
                                ColumnFamilyHandle* column_family,
272
0
                                const Slice& key, PinnableSlice* pinnable_val) {
273
0
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
274
0
      _read_options.io_activity != Env::IOActivity::kGet) {
275
0
    return Status::InvalidArgument(
276
0
        "Can only call Get with `ReadOptions::io_activity` is "
277
0
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
278
0
  }
279
0
  ReadOptions read_options(_read_options);
280
0
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
281
0
    read_options.io_activity = Env::IOActivity::kGet;
282
0
  }
283
0
  return GetImpl(read_options, column_family, key, pinnable_val);
284
0
}
285
286
Status TransactionBaseImpl::GetImpl(const ReadOptions& read_options,
287
                                    ColumnFamilyHandle* column_family,
288
                                    const Slice& key,
289
0
                                    PinnableSlice* pinnable_val) {
290
0
  return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
291
0
                                        pinnable_val);
292
0
}
293
294
Status TransactionBaseImpl::GetEntity(const ReadOptions& read_options,
295
                                      ColumnFamilyHandle* column_family,
296
                                      const Slice& key,
297
0
                                      PinnableWideColumns* columns) {
298
0
  return GetEntityImpl(read_options, column_family, key, columns);
299
0
}
300
301
Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
302
                                         ColumnFamilyHandle* column_family,
303
                                         const Slice& key, std::string* value,
304
                                         bool exclusive,
305
0
                                         const bool do_validate) {
306
0
  if (!do_validate && read_options.snapshot != nullptr) {
307
0
    return Status::InvalidArgument(
308
0
        "If do_validate is false then GetForUpdate with snapshot is not "
309
0
        "defined.");
310
0
  }
311
0
  if (read_options.io_activity != Env::IOActivity::kUnknown) {
312
0
    return Status::InvalidArgument(
313
0
        "Cannot call GetForUpdate with `ReadOptions::io_activity` != "
314
0
        "`Env::IOActivity::kUnknown`");
315
0
  }
316
0
  Status s =
317
0
      TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
318
319
0
  if (s.ok() && value != nullptr) {
320
0
    assert(value != nullptr);
321
0
    PinnableSlice pinnable_val(value);
322
0
    assert(!pinnable_val.IsPinned());
323
0
    s = GetImpl(read_options, column_family, key, &pinnable_val);
324
0
    if (s.ok() && pinnable_val.IsPinned()) {
325
0
      value->assign(pinnable_val.data(), pinnable_val.size());
326
0
    }  // else value is already assigned
327
0
  }
328
0
  return s;
329
0
}
330
331
Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
332
                                         ColumnFamilyHandle* column_family,
333
                                         const Slice& key,
334
                                         PinnableSlice* pinnable_val,
335
                                         bool exclusive,
336
0
                                         const bool do_validate) {
337
0
  if (!do_validate && read_options.snapshot != nullptr) {
338
0
    return Status::InvalidArgument(
339
0
        "If do_validate is false then GetForUpdate with snapshot is not "
340
0
        "defined.");
341
0
  }
342
0
  if (read_options.io_activity != Env::IOActivity::kUnknown) {
343
0
    return Status::InvalidArgument(
344
0
        "Cannot call GetForUpdate with `ReadOptions::io_activity` != "
345
0
        "`Env::IOActivity::kUnknown`");
346
0
  }
347
0
  Status s =
348
0
      TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
349
350
0
  if (s.ok() && pinnable_val != nullptr) {
351
0
    s = GetImpl(read_options, column_family, key, pinnable_val);
352
0
  }
353
0
  return s;
354
0
}
355
356
Status TransactionBaseImpl::GetEntityForUpdate(
357
    const ReadOptions& read_options, ColumnFamilyHandle* column_family,
358
    const Slice& key, PinnableWideColumns* columns, bool exclusive,
359
0
    bool do_validate) {
360
0
  if (!do_validate && read_options.snapshot != nullptr) {
361
0
    return Status::InvalidArgument(
362
0
        "Snapshot must not be set if validation is disabled");
363
0
  }
364
365
0
  const Status s =
366
0
      TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
367
0
  if (!s.ok()) {
368
0
    return s;
369
0
  }
370
371
0
  return GetEntityImpl(read_options, column_family, key, columns);
372
0
}
373
374
std::vector<Status> TransactionBaseImpl::MultiGet(
375
    const ReadOptions& _read_options,
376
    const std::vector<ColumnFamilyHandle*>& column_family,
377
0
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
378
0
  size_t num_keys = keys.size();
379
0
  std::vector<Status> stat_list(num_keys);
380
0
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
381
0
      _read_options.io_activity != Env::IOActivity::kMultiGet) {
382
0
    Status s = Status::InvalidArgument(
383
0
        "Can only call MultiGet with `ReadOptions::io_activity` is "
384
0
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
385
386
0
    for (size_t i = 0; i < num_keys; ++i) {
387
0
      stat_list[i] = s;
388
0
    }
389
0
    return stat_list;
390
0
  }
391
0
  ReadOptions read_options(_read_options);
392
0
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
393
0
    read_options.io_activity = Env::IOActivity::kMultiGet;
394
0
  }
395
396
0
  values->resize(num_keys);
397
0
  for (size_t i = 0; i < num_keys; ++i) {
398
0
    stat_list[i] =
399
0
        GetImpl(read_options, column_family[i], keys[i], &(*values)[i]);
400
0
  }
401
402
0
  return stat_list;
403
0
}
404
405
void TransactionBaseImpl::MultiGet(const ReadOptions& _read_options,
406
                                   ColumnFamilyHandle* column_family,
407
                                   const size_t num_keys, const Slice* keys,
408
                                   PinnableSlice* values, Status* statuses,
409
0
                                   const bool sorted_input) {
410
0
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
411
0
      _read_options.io_activity != Env::IOActivity::kMultiGet) {
412
0
    Status s = Status::InvalidArgument(
413
0
        "Can only call MultiGet with `ReadOptions::io_activity` is "
414
0
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
415
0
    for (size_t i = 0; i < num_keys; ++i) {
416
0
      if (statuses[i].ok()) {
417
0
        statuses[i] = s;
418
0
      }
419
0
    }
420
0
    return;
421
0
  }
422
0
  ReadOptions read_options(_read_options);
423
0
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
424
0
    read_options.io_activity = Env::IOActivity::kMultiGet;
425
0
  }
426
0
  write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
427
0
                                      num_keys, keys, values, statuses,
428
0
                                      sorted_input);
429
0
}
430
431
void TransactionBaseImpl::MultiGetEntity(const ReadOptions& read_options,
432
                                         ColumnFamilyHandle* column_family,
433
                                         size_t num_keys, const Slice* keys,
434
                                         PinnableWideColumns* results,
435
0
                                         Status* statuses, bool sorted_input) {
436
0
  MultiGetEntityImpl(read_options, column_family, num_keys, keys, results,
437
0
                     statuses, sorted_input);
438
0
}
439
440
std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
441
    const ReadOptions& read_options,
442
    const std::vector<ColumnFamilyHandle*>& column_family,
443
0
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
444
0
  size_t num_keys = keys.size();
445
0
  if (read_options.io_activity != Env::IOActivity::kUnknown) {
446
0
    Status s = Status::InvalidArgument(
447
0
        "Cannot call MultiGetForUpdate with `ReadOptions::io_activity` != "
448
0
        "`Env::IOActivity::kUnknown`");
449
0
    return std::vector<Status>(num_keys, s);
450
0
  }
451
  // Regardless of whether the MultiGet succeeded, track these keys.
452
0
  values->resize(num_keys);
453
454
  // Lock all keys
455
0
  for (size_t i = 0; i < num_keys; ++i) {
456
0
    Status s = TryLock(column_family[i], keys[i], true /* read_only */,
457
0
                       true /* exclusive */);
458
0
    if (!s.ok()) {
459
      // Fail entire multiget if we cannot lock all keys
460
0
      return std::vector<Status>(num_keys, s);
461
0
    }
462
0
  }
463
464
  // TODO(agiardullo): optimize multiget?
465
0
  std::vector<Status> stat_list(num_keys);
466
0
  for (size_t i = 0; i < num_keys; ++i) {
467
0
    stat_list[i] =
468
0
        GetImpl(read_options, column_family[i], keys[i], &(*values)[i]);
469
0
  }
470
471
0
  return stat_list;
472
0
}
473
474
0
Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
475
0
  Iterator* db_iter = db_->NewIterator(read_options);
476
0
  assert(db_iter);
477
478
0
  return write_batch_.NewIteratorWithBase(db_->DefaultColumnFamily(), db_iter,
479
0
                                          &read_options);
480
0
}
481
482
Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
483
0
                                           ColumnFamilyHandle* column_family) {
484
0
  Iterator* db_iter = db_->NewIterator(read_options, column_family);
485
0
  assert(db_iter);
486
487
0
  return write_batch_.NewIteratorWithBase(column_family, db_iter,
488
0
                                          &read_options);
489
0
}
490
491
template <typename IterType, typename ImplType, typename ErrorIteratorFuncType>
492
std::unique_ptr<IterType> TransactionBaseImpl::NewMultiCfIterator(
493
    const ReadOptions& read_options,
494
    const std::vector<ColumnFamilyHandle*>& column_families,
495
0
    ErrorIteratorFuncType error_iterator_func) {
496
0
  if (column_families.empty()) {
497
0
    return error_iterator_func(
498
0
        Status::InvalidArgument("No Column Family was provided"));
499
0
  }
500
501
0
  const Comparator* const first_comparator =
502
0
      column_families[0]->GetComparator();
503
0
  assert(first_comparator);
504
505
0
  for (size_t i = 1; i < column_families.size(); ++i) {
506
0
    const Comparator* cf_comparator = column_families[i]->GetComparator();
507
0
    assert(cf_comparator);
508
509
0
    if (first_comparator != cf_comparator &&
510
0
        first_comparator->GetId() != cf_comparator->GetId()) {
511
0
      return error_iterator_func(Status::InvalidArgument(
512
0
          "Different comparators are being used across CFs"));
513
0
    }
514
0
  }
515
516
0
  std::vector<Iterator*> child_iterators;
517
0
  const Status s =
518
0
      db_->NewIterators(read_options, column_families, &child_iterators);
519
0
  if (!s.ok()) {
520
0
    return error_iterator_func(s);
521
0
  }
522
523
0
  assert(column_families.size() == child_iterators.size());
524
525
0
  std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
526
0
      cfh_iter_pairs;
527
0
  cfh_iter_pairs.reserve(column_families.size());
528
0
  for (size_t i = 0; i < column_families.size(); ++i) {
529
0
    cfh_iter_pairs.emplace_back(
530
0
        column_families[i],
531
0
        write_batch_.NewIteratorWithBase(column_families[i], child_iterators[i],
532
0
                                         &read_options));
533
0
  }
534
535
0
  return std::make_unique<ImplType>(read_options,
536
0
                                    column_families[0]->GetComparator(),
537
0
                                    std::move(cfh_iter_pairs));
538
0
}
Unexecuted instantiation: transaction_base.cc:std::__1::unique_ptr<rocksdb::Iterator, std::__1::default_delete<rocksdb::Iterator> > rocksdb::TransactionBaseImpl::NewMultiCfIterator<rocksdb::Iterator, rocksdb::CoalescingIterator, rocksdb::TransactionBaseImpl::GetCoalescingIterator(rocksdb::ReadOptions const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&)::$_0>(rocksdb::ReadOptions const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&, rocksdb::TransactionBaseImpl::GetCoalescingIterator(rocksdb::ReadOptions const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&)::$_0)
Unexecuted instantiation: transaction_base.cc:std::__1::unique_ptr<rocksdb::AttributeGroupIterator, std::__1::default_delete<rocksdb::AttributeGroupIterator> > rocksdb::TransactionBaseImpl::NewMultiCfIterator<rocksdb::AttributeGroupIterator, rocksdb::AttributeGroupIteratorImpl, rocksdb::TransactionBaseImpl::GetAttributeGroupIterator(rocksdb::ReadOptions const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&)::$_0>(rocksdb::ReadOptions const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&, rocksdb::TransactionBaseImpl::GetAttributeGroupIterator(rocksdb::ReadOptions const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&)::$_0)
539
540
std::unique_ptr<Iterator> TransactionBaseImpl::GetCoalescingIterator(
541
    const ReadOptions& read_options,
542
0
    const std::vector<ColumnFamilyHandle*>& column_families) {
543
0
  return NewMultiCfIterator<Iterator, CoalescingIterator>(
544
0
      read_options, column_families, [](const Status& s) {
545
0
        return std::unique_ptr<Iterator>(NewErrorIterator(s));
546
0
      });
547
0
}
548
549
std::unique_ptr<AttributeGroupIterator>
550
TransactionBaseImpl::GetAttributeGroupIterator(
551
    const ReadOptions& read_options,
552
0
    const std::vector<ColumnFamilyHandle*>& column_families) {
553
0
  return NewMultiCfIterator<AttributeGroupIterator, AttributeGroupIteratorImpl>(
554
0
      read_options, column_families,
555
0
      [](const Status& s) { return NewAttributeGroupErrorIterator(s); });
556
0
}
557
558
Status TransactionBaseImpl::PutEntityImpl(ColumnFamilyHandle* column_family,
559
                                          const Slice& key,
560
                                          const WideColumns& columns,
561
                                          bool do_validate,
562
0
                                          bool assume_tracked) {
563
0
  {
564
0
    constexpr bool read_only = false;
565
0
    constexpr bool exclusive = true;
566
0
    const Status s = TryLock(column_family, key, read_only, exclusive,
567
0
                             do_validate, assume_tracked);
568
0
    if (!s.ok()) {
569
0
      return s;
570
0
    }
571
0
  }
572
573
0
  {
574
0
    const Status s = GetBatchForWrite()->PutEntity(column_family, key, columns);
575
0
    if (!s.ok()) {
576
0
      return s;
577
0
    }
578
0
  }
579
580
0
  ++num_put_entities_;
581
0
  return Status::OK();
582
0
}
583
584
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
585
                                const Slice& key, const Slice& value,
586
0
                                const bool assume_tracked) {
587
0
  const bool do_validate = !assume_tracked;
588
0
  Status s = TryLock(column_family, key, false /* read_only */,
589
0
                     true /* exclusive */, do_validate, assume_tracked);
590
591
0
  if (s.ok()) {
592
0
    s = GetBatchForWrite()->Put(column_family, key, value);
593
0
    if (s.ok()) {
594
0
      num_puts_++;
595
0
    }
596
0
  }
597
598
0
  return s;
599
0
}
600
601
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
602
                                const SliceParts& key, const SliceParts& value,
603
0
                                const bool assume_tracked) {
604
0
  const bool do_validate = !assume_tracked;
605
0
  Status s = TryLock(column_family, key, false /* read_only */,
606
0
                     true /* exclusive */, do_validate, assume_tracked);
607
608
0
  if (s.ok()) {
609
0
    s = GetBatchForWrite()->Put(column_family, key, value);
610
0
    if (s.ok()) {
611
0
      num_puts_++;
612
0
    }
613
0
  }
614
615
0
  return s;
616
0
}
617
618
Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
619
                                  const Slice& key, const Slice& value,
620
0
                                  const bool assume_tracked) {
621
0
  const bool do_validate = !assume_tracked;
622
0
  Status s = TryLock(column_family, key, false /* read_only */,
623
0
                     true /* exclusive */, do_validate, assume_tracked);
624
625
0
  if (s.ok()) {
626
0
    s = GetBatchForWrite()->Merge(column_family, key, value);
627
0
    if (s.ok()) {
628
0
      num_merges_++;
629
0
    }
630
0
  }
631
632
0
  return s;
633
0
}
634
635
Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
636
                                   const Slice& key,
637
0
                                   const bool assume_tracked) {
638
0
  const bool do_validate = !assume_tracked;
639
0
  Status s = TryLock(column_family, key, false /* read_only */,
640
0
                     true /* exclusive */, do_validate, assume_tracked);
641
642
0
  if (s.ok()) {
643
0
    s = GetBatchForWrite()->Delete(column_family, key);
644
0
    if (s.ok()) {
645
0
      num_deletes_++;
646
0
    }
647
0
  }
648
649
0
  return s;
650
0
}
651
652
Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
653
                                   const SliceParts& key,
654
0
                                   const bool assume_tracked) {
655
0
  const bool do_validate = !assume_tracked;
656
0
  Status s = TryLock(column_family, key, false /* read_only */,
657
0
                     true /* exclusive */, do_validate, assume_tracked);
658
659
0
  if (s.ok()) {
660
0
    s = GetBatchForWrite()->Delete(column_family, key);
661
0
    if (s.ok()) {
662
0
      num_deletes_++;
663
0
    }
664
0
  }
665
666
0
  return s;
667
0
}
668
669
Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
670
                                         const Slice& key,
671
0
                                         const bool assume_tracked) {
672
0
  const bool do_validate = !assume_tracked;
673
0
  Status s = TryLock(column_family, key, false /* read_only */,
674
0
                     true /* exclusive */, do_validate, assume_tracked);
675
676
0
  if (s.ok()) {
677
0
    s = GetBatchForWrite()->SingleDelete(column_family, key);
678
0
    if (s.ok()) {
679
0
      num_deletes_++;
680
0
    }
681
0
  }
682
683
0
  return s;
684
0
}
685
686
Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
687
                                         const SliceParts& key,
688
0
                                         const bool assume_tracked) {
689
0
  const bool do_validate = !assume_tracked;
690
0
  Status s = TryLock(column_family, key, false /* read_only */,
691
0
                     true /* exclusive */, do_validate, assume_tracked);
692
693
0
  if (s.ok()) {
694
0
    s = GetBatchForWrite()->SingleDelete(column_family, key);
695
0
    if (s.ok()) {
696
0
      num_deletes_++;
697
0
    }
698
0
  }
699
700
0
  return s;
701
0
}
702
703
Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
704
0
                                         const Slice& key, const Slice& value) {
705
0
  Status s = TryLock(column_family, key, false /* read_only */,
706
0
                     true /* exclusive */, false /* do_validate */);
707
708
0
  if (s.ok()) {
709
0
    s = GetBatchForWrite()->Put(column_family, key, value);
710
0
    if (s.ok()) {
711
0
      num_puts_++;
712
0
    }
713
0
  }
714
715
0
  return s;
716
0
}
717
718
Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
719
                                         const SliceParts& key,
720
0
                                         const SliceParts& value) {
721
0
  Status s = TryLock(column_family, key, false /* read_only */,
722
0
                     true /* exclusive */, false /* do_validate */);
723
724
0
  if (s.ok()) {
725
0
    s = GetBatchForWrite()->Put(column_family, key, value);
726
0
    if (s.ok()) {
727
0
      num_puts_++;
728
0
    }
729
0
  }
730
731
0
  return s;
732
0
}
733
734
Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
735
                                           const Slice& key,
736
0
                                           const Slice& value) {
737
0
  Status s = TryLock(column_family, key, false /* read_only */,
738
0
                     true /* exclusive */, false /* do_validate */);
739
740
0
  if (s.ok()) {
741
0
    s = GetBatchForWrite()->Merge(column_family, key, value);
742
0
    if (s.ok()) {
743
0
      num_merges_++;
744
0
    }
745
0
  }
746
747
0
  return s;
748
0
}
749
750
Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
751
0
                                            const Slice& key) {
752
0
  Status s = TryLock(column_family, key, false /* read_only */,
753
0
                     true /* exclusive */, false /* do_validate */);
754
755
0
  if (s.ok()) {
756
0
    s = GetBatchForWrite()->Delete(column_family, key);
757
0
    if (s.ok()) {
758
0
      num_deletes_++;
759
0
    }
760
0
  }
761
762
0
  return s;
763
0
}
764
765
Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
766
0
                                            const SliceParts& key) {
767
0
  Status s = TryLock(column_family, key, false /* read_only */,
768
0
                     true /* exclusive */, false /* do_validate */);
769
770
0
  if (s.ok()) {
771
0
    s = GetBatchForWrite()->Delete(column_family, key);
772
0
    if (s.ok()) {
773
0
      num_deletes_++;
774
0
    }
775
0
  }
776
777
0
  return s;
778
0
}
779
780
Status TransactionBaseImpl::SingleDeleteUntracked(
781
0
    ColumnFamilyHandle* column_family, const Slice& key) {
782
0
  Status s = TryLock(column_family, key, false /* read_only */,
783
0
                     true /* exclusive */, false /* do_validate */);
784
785
0
  if (s.ok()) {
786
0
    s = GetBatchForWrite()->SingleDelete(column_family, key);
787
0
    if (s.ok()) {
788
0
      num_deletes_++;
789
0
    }
790
0
  }
791
792
0
  return s;
793
0
}
794
795
0
void TransactionBaseImpl::PutLogData(const Slice& blob) {
796
0
  auto s = write_batch_.PutLogData(blob);
797
0
  (void)s;
798
0
  assert(s.ok());
799
0
}
800
801
0
WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {
802
0
  return &write_batch_;
803
0
}
804
805
0
uint64_t TransactionBaseImpl::GetElapsedTime() const {
806
0
  return (dbimpl_->GetSystemClock()->NowMicros() - start_time_) / 1000;
807
0
}
808
809
0
uint64_t TransactionBaseImpl::GetNumPuts() const { return num_puts_; }
810
811
0
uint64_t TransactionBaseImpl::GetNumPutEntities() const {
812
0
  return num_put_entities_;
813
0
}
814
815
0
uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_; }
816
817
0
uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; }
818
819
0
uint64_t TransactionBaseImpl::GetNumKeys() const {
820
0
  return tracked_locks_->GetNumPointLocks();
821
0
}
822
823
void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
824
                                   SequenceNumber seq, bool read_only,
825
0
                                   bool exclusive) {
826
0
  PointLockRequest r;
827
0
  r.column_family_id = cfh_id;
828
0
  r.key = key;
829
0
  r.seq = seq;
830
0
  r.read_only = read_only;
831
0
  r.exclusive = exclusive;
832
833
  // Update map of all tracked keys for this transaction
834
0
  tracked_locks_->Track(r);
835
836
0
  if (save_points_ != nullptr && !save_points_->empty()) {
837
    // Update map of tracked keys in this SavePoint
838
0
    save_points_->top().new_locks_->Track(r);
839
0
  }
840
0
}
841
842
// Gets the write batch that should be used for Put/PutEntity/Merge/Delete
843
// operations.
844
//
845
// Returns either a WriteBatch or WriteBatchWithIndex depending on whether
846
// DisableIndexing() has been called.
847
0
WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() {
848
0
  if (indexing_enabled_) {
849
    // Use WriteBatchWithIndex
850
0
    return &write_batch_;
851
0
  } else {
852
    // Don't use WriteBatchWithIndex. Return base WriteBatch.
853
0
    return write_batch_.GetWriteBatch();
854
0
  }
855
0
}
856
857
0
void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) {
858
0
  if (snapshot != nullptr) {
859
0
    ROCKS_LOG_DETAILS(dbimpl_->immutable_db_options().info_log,
860
0
                      "ReleaseSnapshot %" PRIu64 " Set",
861
0
                      snapshot->GetSequenceNumber());
862
0
    db->ReleaseSnapshot(snapshot);
863
0
  }
864
0
}
865
866
void TransactionBaseImpl::UndoGetForUpdate(ColumnFamilyHandle* column_family,
867
0
                                           const Slice& key) {
868
0
  PointLockRequest r;
869
0
  r.column_family_id = GetColumnFamilyID(column_family);
870
0
  r.key = key.ToString();
871
0
  r.read_only = true;
872
873
0
  bool can_untrack = false;
874
0
  if (save_points_ != nullptr && !save_points_->empty()) {
875
    // If there is no GetForUpdate of the key in this save point,
876
    // then cannot untrack from the global lock tracker.
877
0
    UntrackStatus s = save_points_->top().new_locks_->Untrack(r);
878
0
    can_untrack = (s != UntrackStatus::NOT_TRACKED);
879
0
  } else {
880
    // No save point, so can untrack from the global lock tracker.
881
0
    can_untrack = true;
882
0
  }
883
884
0
  if (can_untrack) {
885
    // If erased from the global tracker, then can unlock the key.
886
0
    UntrackStatus s = tracked_locks_->Untrack(r);
887
0
    bool can_unlock = (s == UntrackStatus::REMOVED);
888
0
    if (can_unlock) {
889
0
      UnlockGetForUpdate(column_family, key);
890
0
    }
891
0
  }
892
0
}
893
894
0
Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) {
895
0
  struct IndexedWriteBatchBuilder : public WriteBatch::Handler {
896
0
    Transaction* txn_;
897
0
    DBImpl* db_;
898
0
    IndexedWriteBatchBuilder(Transaction* txn, DBImpl* db)
899
0
        : txn_(txn), db_(db) {
900
0
      assert(dynamic_cast<TransactionBaseImpl*>(txn_) != nullptr);
901
0
    }
902
903
0
    Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {
904
0
      Slice user_key = GetUserKey(cf, key);
905
0
      return txn_->Put(db_->GetColumnFamilyHandle(cf), user_key, val);
906
0
    }
907
908
0
    Status PutEntityCF(uint32_t cf, const Slice& key,
909
0
                       const Slice& entity) override {
910
0
      Slice user_key = GetUserKey(cf, key);
911
0
      Slice entity_copy = entity;
912
0
      WideColumns columns;
913
0
      const Status s =
914
0
          WideColumnSerialization::Deserialize(entity_copy, columns);
915
0
      if (!s.ok()) {
916
0
        return s;
917
0
      }
918
919
0
      return txn_->PutEntity(db_->GetColumnFamilyHandle(cf), user_key, columns);
920
0
    }
921
922
0
    Status DeleteCF(uint32_t cf, const Slice& key) override {
923
0
      Slice user_key = GetUserKey(cf, key);
924
0
      return txn_->Delete(db_->GetColumnFamilyHandle(cf), user_key);
925
0
    }
926
927
0
    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
928
0
      Slice user_key = GetUserKey(cf, key);
929
0
      return txn_->SingleDelete(db_->GetColumnFamilyHandle(cf), user_key);
930
0
    }
931
932
0
    Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override {
933
0
      Slice user_key = GetUserKey(cf, key);
934
0
      return txn_->Merge(db_->GetColumnFamilyHandle(cf), user_key, val);
935
0
    }
936
937
    // this is used for reconstructing prepared transactions upon
938
    // recovery. there should not be any meta markers in the batches
939
    // we are processing.
940
0
    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
941
942
0
    Status MarkEndPrepare(const Slice&) override {
943
0
      return Status::InvalidArgument();
944
0
    }
945
946
0
    Status MarkCommit(const Slice&) override {
947
0
      return Status::InvalidArgument();
948
0
    }
949
950
0
    Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
951
0
      return Status::InvalidArgument();
952
0
    }
953
954
0
    Status MarkRollback(const Slice&) override {
955
0
      return Status::InvalidArgument();
956
0
    }
957
0
    size_t GetTimestampSize(uint32_t cf_id) {
958
0
      auto cfd = db_->versions_->GetColumnFamilySet()->GetColumnFamily(cf_id);
959
0
      const Comparator* ucmp = cfd->user_comparator();
960
0
      assert(ucmp);
961
0
      return ucmp->timestamp_size();
962
0
    }
963
964
0
    Slice GetUserKey(uint32_t cf_id, const Slice& key) {
965
0
      size_t ts_sz = GetTimestampSize(cf_id);
966
0
      if (ts_sz == 0) {
967
0
        return key;
968
0
      }
969
0
      assert(key.size() >= ts_sz);
970
0
      return Slice(key.data(), key.size() - ts_sz);
971
0
    }
972
0
  };
973
974
0
  IndexedWriteBatchBuilder copycat(this, dbimpl_);
975
0
  return src_batch->Iterate(&copycat);
976
0
}
977
978
0
WriteBatch* TransactionBaseImpl::GetCommitTimeWriteBatch() {
979
0
  return &commit_time_batch_;
980
0
}
981
}  // namespace ROCKSDB_NAMESPACE