Coverage Report

Created: 2026-03-31 07:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/utilities/transactions/transaction_util.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "utilities/transactions/transaction_util.h"
7
8
#include <cinttypes>
9
#include <string>
10
#include <vector>
11
12
#include "db/db_impl/db_impl.h"
13
#include "rocksdb/status.h"
14
#include "rocksdb/utilities/write_batch_with_index.h"
15
#include "util/cast_util.h"
16
#include "util/string_util.h"
17
18
namespace ROCKSDB_NAMESPACE {
19
20
// Checks a single `key` in `column_family` for write conflicts.
//
// Takes a reference on the column family's current SuperVersion, reads the
// earliest memtable sequence number from it, and forwards all remaining
// arguments to CheckKey(). The SuperVersion reference is released before
// returning.
//
// Returns InvalidArgument if no SuperVersion could be obtained for the column
// family; otherwise returns whatever CheckKey() returned.
Status TransactionUtil::CheckKeyForConflicts(
    DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key,
    SequenceNumber snap_seq, const std::string* const read_ts, bool cache_only,
    ReadCallback* snap_checker, SequenceNumber min_uncommitted,
    bool enable_udt_validation) {
  auto* const handle =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  ColumnFamilyData* const cf_data = handle->cfd();

  SuperVersion* const super_version = db_impl->GetAndRefSuperVersion(cf_data);
  if (super_version == nullptr) {
    // Nothing was referenced, so there is nothing to release.
    return Status::InvalidArgument("Could not access column family " +
                                   handle->GetName());
  }

  // NOTE(review): the second argument is passed as `true` exactly as before;
  // presumably it asks to include memtable history -- confirm in DBImpl.
  const SequenceNumber memtable_floor =
      db_impl->GetEarliestMemTableSequenceNumber(super_version, true);

  Status check_status =
      CheckKey(db_impl, super_version, memtable_floor, snap_seq, key, read_ts,
               cache_only, snap_checker, min_uncommitted,
               enable_udt_validation);

  db_impl->ReturnAndCleanupSuperVersion(cf_data, super_version);
  return check_status;
}
49
50
// Checks whether `key` was written after the point the transaction read it.
// Uses the SuperVersion `sv` that the caller has already referenced; this
// function does not release it.
//
// `earliest_seq` is the earliest sequence number covered by the memtables in
// `sv` (kMaxSequenceNumber when it is unknown). If the memtables cannot cover
// `snap_seq` (or `min_uncommitted`), SST files must be consulted as well.
// When `cache_only` is set, that situation is reported as TryAgain instead.
//
// Returns:
//   - OK if no conflicting write was found;
//   - Busy if a conflicting write was found. With `enable_udt_validation` and
//     a non-null `read_ts`, a record whose timestamp is newer than `*read_ts`
//     also counts as a conflict;
//   - TryAgain if `cache_only` is set and memtable history is insufficient;
//   - any status from DBImpl::GetLatestSequenceForKey other than OK,
//     NotFound or MergeInProgress.
Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
                                 SequenceNumber earliest_seq,
                                 SequenceNumber snap_seq,
                                 const std::string& key,
                                 const std::string* const read_ts,
                                 bool cache_only, ReadCallback* snap_checker,
                                 SequenceNumber min_uncommitted,
                                 bool enable_udt_validation) {
  // When `min_uncommitted` is provided, keys are not always committed
  // in sequence number order, and `snap_checker` is used to check whether
  // a specific sequence number in the database is visible to the transaction.
  // So `snap_checker` must be provided.
  assert(min_uncommitted == kMaxSequenceNumber || snap_checker != nullptr);

  Status result;
  bool need_to_read_sst = false;

  // Since it would be too slow to check the SST files, we will only use
  // the memtables to check whether there have been any recent writes
  // to this key after it was accessed in this transaction.  But if the
  // memtables do not contain a long enough history, we must fail the
  // transaction.
  if (earliest_seq == kMaxSequenceNumber) {
    // The age of this memtable is unknown.  Cannot rely on it to check
    // for recent writes.  This error shouldn't happen often in practice as
    // the memtable should have a valid earliest sequence number except in
    // some corner cases (such as error cases during recovery).
    need_to_read_sst = true;

    if (cache_only) {
      // The two-part Status factory joins its parts with ": ", so the first
      // part must not end with ": " itself. It previously did, which rendered
      // as "...SequenceNumber: : <n>".
      result = Status::TryAgain(
          "Transaction could not check for conflicts as the MemTable does not "
          "contain a long enough history to check write at SequenceNumber",
          std::to_string(snap_seq));
    }
  } else if (snap_seq < earliest_seq || min_uncommitted <= earliest_seq) {
    // Use <= for min_uncommitted since earliest_seq is actually the largest
    // seq before this memtable was created.
    need_to_read_sst = true;

    if (cache_only) {
      // The age of this memtable is too new to use to check for recent
      // writes.  The buffer fits the message even with two 20-digit numbers.
      char msg[300];
      snprintf(msg, sizeof(msg),
               "Transaction could not check for conflicts for operation at "
               "SequenceNumber %" PRIu64
               " as the MemTable only contains changes newer than "
               "SequenceNumber %" PRIu64
               ".  Increasing the value of the "
               "max_write_buffer_size_to_maintain option could reduce the "
               "frequency "
               "of this error.",
               snap_seq, earliest_seq);
      result = Status::TryAgain(msg);
    }
  }

  if (result.ok()) {
    SequenceNumber seq = kMaxSequenceNumber;
    std::string timestamp;
    bool found_record_for_key = false;

    // When min_uncommitted == kMaxSequenceNumber, writes are committed in
    // sequence number order, so only keys larger than `snap_seq` can cause
    // a conflict.
    // When min_uncommitted != kMaxSequenceNumber, keys lower than
    // min_uncommitted cannot trigger conflicts, while keys larger than
    // min_uncommitted might, so we need to read them out from the DB and ask
    // `snap_checker` to decide. So only keys lower than min_uncommitted can
    // be skipped.
    SequenceNumber lower_bound_seq =
        (min_uncommitted == kMaxSequenceNumber) ? snap_seq : min_uncommitted;
    // The timestamp is only fetched when the caller supplied `read_ts`.
    Status s = db_impl->GetLatestSequenceForKey(
        sv, key, !need_to_read_sst, lower_bound_seq, &seq,
        !read_ts ? nullptr : &timestamp, &found_record_for_key,
        /*is_blob_index=*/nullptr);

    if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
      result = s;
    } else if (found_record_for_key) {
      bool write_conflict = snap_checker == nullptr
                                ? snap_seq < seq
                                : !snap_checker->IsVisible(seq);
      // Perform conflict checking based on timestamp if applicable.
      if (enable_udt_validation && !write_conflict && read_ts != nullptr) {
        ColumnFamilyData* cfd = sv->cfd;
        assert(cfd);
        const Comparator* const ucmp = cfd->user_comparator();
        assert(ucmp);
        assert(read_ts->size() == ucmp->timestamp_size());
        assert(read_ts->size() == timestamp.size());
        // Write conflict if *read_ts < timestamp.
        write_conflict = ucmp->CompareTimestamp(*read_ts, timestamp) < 0;
      }
      if (write_conflict) {
        result = Status::Busy();
      }
    }
  }

  return result;
}
153
154
// Runs conflict checking for every point key recorded in `tracker`.
//
// Keys are processed column family by column family. For each column family:
//   - a SuperVersion is referenced once;
//   - every tracked key is checked against it with CheckKey(), using the
//     sequence number the tracker recorded for that key;
//   - the SuperVersion is released before moving on.
// Processing stops at the first non-OK status, which is returned.
// InvalidArgument is returned if a column family's SuperVersion cannot be
// obtained.
Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl,
                                              const LockTracker& tracker,
                                              bool cache_only) {
  std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_iter(
      tracker.GetColumnFamilyIterator());
  assert(cf_iter != nullptr);

  Status outcome;
  // Checking `outcome` first ensures no further iteration happens once a
  // conflict or error has been recorded.
  while (outcome.ok() && cf_iter->HasNext()) {
    const ColumnFamilyId cf_id = cf_iter->Next();

    SuperVersion* const super_version = db_impl->GetAndRefSuperVersion(cf_id);
    if (super_version == nullptr) {
      return Status::InvalidArgument("Could not access column family " +
                                     std::to_string(cf_id));
    }

    const SequenceNumber memtable_floor =
        db_impl->GetEarliestMemTableSequenceNumber(super_version, true);

    // For each tracked key in this column family, check whether someone has
    // written to it since it was recorded by this transaction.
    std::unique_ptr<LockTracker::KeyIterator> key_iter(
        tracker.GetKeyIterator(cf_id));
    assert(key_iter != nullptr);
    while (outcome.ok() && key_iter->HasNext()) {
      const std::string& tracked_key = key_iter->Next();
      const SequenceNumber tracked_seq =
          tracker.GetPointLockStatus(cf_id, tracked_key).seq;

      // TODO: support timestamp-based conflict checking.
      // CheckKeysForConflicts() is currently used only by optimistic
      // transactions.
      outcome = CheckKey(db_impl, super_version, memtable_floor, tracked_seq,
                         tracked_key, /*read_ts=*/nullptr, cache_only);
    }

    db_impl->ReturnAndCleanupSuperVersion(cf_id, super_version);
  }

  return outcome;
}
204
205
}  // namespace ROCKSDB_NAMESPACE