Coverage Report

Created: 2026-02-14 06:58

/src/rocksdb/utilities/transactions/write_prepared_txn_db.h
All instrumented lines in this file report an execution count of 0 (uncovered).

//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <cinttypes>
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "db/attribute_group_iterator_impl.h"
#include "db/db_iter.h"
#include "db/pre_release_callback.h"
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "logging/logging.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "util/set_comparator.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/write_prepared_txn.h"

namespace ROCKSDB_NAMESPACE {
enum SnapshotBackup : bool { kUnbackedByDBSnapshot, kBackedByDBSnapshot };

// A PessimisticTransactionDB that writes data to the DB after the prepare
// phase of 2PC. In this way some data in the DB might not be committed. The DB
// provides mechanisms to tell such data apart from committed data.
class WritePreparedTxnDB : public PessimisticTransactionDB {
 public:
  explicit WritePreparedTxnDB(DB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }

  explicit WritePreparedTxnDB(StackableDB* db,
                              const TransactionDBOptions& txn_db_options)
      : PessimisticTransactionDB(db, txn_db_options),
        SNAPSHOT_CACHE_BITS(txn_db_options.wp_snapshot_cache_bits),
        SNAPSHOT_CACHE_SIZE(static_cast<size_t>(1ull << SNAPSHOT_CACHE_BITS)),
        COMMIT_CACHE_BITS(txn_db_options.wp_commit_cache_bits),
        COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
        FORMAT(COMMIT_CACHE_BITS) {
    Init(txn_db_options);
  }

  virtual ~WritePreparedTxnDB();

  Status Initialize(const std::vector<size_t>& compaction_enabled_cf_indices,
                    const std::vector<ColumnFamilyHandle*>& handles) override;

  Transaction* BeginTransaction(const WriteOptions& write_options,
                                const TransactionOptions& txn_options,
                                Transaction* old_txn) override;

  using TransactionDB::Write;
  Status Write(const WriteOptions& opts, WriteBatch* updates) override;

  // Optimized version of ::Write that receives further optimization requests
  // such as skip_concurrency_control.
  using PessimisticTransactionDB::Write;
  Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&,
               WriteBatch* updates) override;

  // Write the batch to the underlying DB and mark it as committed. Could be
  // used both directly from the TxnDB or through a transaction.
  Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch,
                       size_t batch_cnt, WritePreparedTxn* txn);

  using DB::Get;
  Status Get(const ReadOptions& _read_options,
             ColumnFamilyHandle* column_family, const Slice& key,
             PinnableSlice* value, std::string* timestamp) override;

  using DB::MultiGet;
  void MultiGet(const ReadOptions& _read_options, const size_t num_keys,
                ColumnFamilyHandle** column_families, const Slice* keys,
                PinnableSlice* values, std::string* timestamps,
                Status* statuses, const bool sorted_input) override;

  using DB::NewIterator;
  Iterator* NewIterator(const ReadOptions& _read_options,
                        ColumnFamilyHandle* column_family) override;

  using DB::NewIterators;
  Status NewIterators(const ReadOptions& _read_options,
                      const std::vector<ColumnFamilyHandle*>& column_families,
                      std::vector<Iterator*>* iterators) override;

  using DB::NewCoalescingIterator;
  std::unique_ptr<Iterator> NewCoalescingIterator(
      const ReadOptions& /*options*/,
      const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
    return std::unique_ptr<Iterator>(
        NewErrorIterator(Status::NotSupported("Not supported yet")));
  }

  using DB::NewAttributeGroupIterator;
  std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator(
      const ReadOptions& /*options*/,
      const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
    return NewAttributeGroupErrorIterator(
        Status::NotSupported("Not supported yet"));
  }

  // Check whether the transaction that wrote the value with sequence number
  // prep_seq is visible to the snapshot with sequence number snapshot_seq.
  // Returns true if commit_seq <= snapshot_seq.
  // If the snapshot_seq is already released and snapshot_seq <= max, sets
  // *snap_released to true and returns true as well.
  inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
                           uint64_t min_uncommitted = kMinUnCommittedSeq,
                           bool* snap_released = nullptr) const {
    ROCKS_LOG_DETAILS(info_log_,
                      "IsInSnapshot %" PRIu64 " in %" PRIu64
                      " min_uncommitted %" PRIu64,
                      prep_seq, snapshot_seq, min_uncommitted);
    assert(min_uncommitted >= kMinUnCommittedSeq);
    // Caller is responsible for initializing snap_released.
    assert(snap_released == nullptr || *snap_released == false);
    // Here we try to infer the return value without looking into the prepare
    // list. This would help avoid synchronization over a shared map.
    // TODO(myabandeh): optimize this. This sequence of checks must be correct
    // but not necessarily efficient
    if (prep_seq == 0) {
      // Compaction will output keys to bottom-level with sequence number 0 if
      // it is visible to the earliest snapshot.
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    if (snapshot_seq < prep_seq) {
      // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 0);
      return false;
    }
    if (prep_seq < min_uncommitted) {
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32
                        " because of min_uncommitted %" PRIu64,
                        prep_seq, snapshot_seq, 1, min_uncommitted);
      return true;
    }
    // Commit of a delayed prepared has two non-atomic steps: add to commit
    // cache, remove from delayed_prepared_. Our reads of these two are also
    // non-atomic. Thus, by looking into the commit cache first, we might find
    // the prep_seq neither in the commit cache nor in delayed_prepared_. To
    // fix that i) we check if there was any delayed prepared BEFORE looking
    // into the commit cache, ii) if there was, the search steps become: i)
    // commit cache, ii) delayed prepared, iii) commit cache again. In this way
    // if the first query to the commit cache missed the commit, the 2nd will
    // catch it.
    bool was_empty;
    SequenceNumber max_evicted_seq_lb, max_evicted_seq_ub;
    CommitEntry64b dont_care;
    auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
    size_t repeats = 0;
    do {
      repeats++;
      assert(repeats < 100);
      if (UNLIKELY(repeats >= 100)) {
        throw std::runtime_error(
            "The read was interrupted 100 times by update to max_evicted_seq_. "
            "This is unexpected in all setups");
      }
      max_evicted_seq_lb = max_evicted_seq_.load(std::memory_order_acquire);
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume");
      was_empty = delayed_prepared_empty_.load(std::memory_order_acquire);
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume");
      CommitEntry cached;
      bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause");
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume");
      if (exist && prep_seq == cached.prep_seq) {
        // It is committed and also not evicted from commit cache
        ROCKS_LOG_DETAILS(
            info_log_,
            "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
            prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
        return cached.commit_seq <= snapshot_seq;
      }
      // else it could be committed but not inserted in the map which could
      // happen after recovery, or it could be committed and evicted by another
      // commit, or never committed.

      // At this point we don't know if it was committed or it is still prepared
      max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
      if (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub)) {
        continue;
      }
      // Note: max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq_ub
      if (max_evicted_seq_ub < prep_seq) {
        // Not evicted from cache and also not present, so must be still
        // prepared
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32,
                          prep_seq, snapshot_seq, 0);
        return false;
      }
      TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause");
      TEST_SYNC_POINT(
          "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume");
      if (!was_empty) {
        // We should not normally reach here
        WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
        ReadLock rl(&prepared_mutex_);
        ROCKS_LOG_WARN(
            info_log_, "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64,
            static_cast<uint64_t>(delayed_prepared_.size()), prep_seq);
        if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
          // This is the order: 1) delayed_prepared_commits_ update, 2) publish
          // 3) delayed_prepared_ clean up. So check if it is the case of a late
          // cleanup.
          auto it = delayed_prepared_commits_.find(prep_seq);
          if (it == delayed_prepared_commits_.end()) {
            // Then it is not committed yet
            ROCKS_LOG_DETAILS(info_log_,
                              "IsInSnapshot %" PRIu64 " in %" PRIu64
                              " returns %" PRId32,
                              prep_seq, snapshot_seq, 0);
            return false;
          } else {
            ROCKS_LOG_DETAILS(info_log_,
                              "IsInSnapshot %" PRIu64 " in %" PRIu64
                              " commit: %" PRIu64 " returns %" PRId32,
                              prep_seq, snapshot_seq, it->second,
                              it->second <= snapshot_seq);
            return it->second <= snapshot_seq;
          }
        } else {
          // 2nd query to commit cache. Refer to was_empty comment above.
          exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
          if (exist && prep_seq == cached.prep_seq) {
            ROCKS_LOG_DETAILS(
                info_log_,
                "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
                prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
            return cached.commit_seq <= snapshot_seq;
          }
          max_evicted_seq_ub = max_evicted_seq_.load(std::memory_order_acquire);
        }
      }
    } while (UNLIKELY(max_evicted_seq_lb != max_evicted_seq_ub));
    // When advancing max_evicted_seq_, we move older entries from prepared to
    // delayed_prepared_. Also we move evicted entries from commit cache to
    // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
    // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
    // old_commit_map_, iii) committed with no conflict with any snapshot. Case
    // (i) delayed_prepared_ is checked above
    if (max_evicted_seq_ub < snapshot_seq) {  // then (ii) cannot be the case
      // only (iii) is the case: committed
      // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
      // snapshot_seq
      ROCKS_LOG_DETAILS(
          info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
          prep_seq, snapshot_seq, 1);
      return true;
    }
    // else (ii) might be the case: check the commit data saved for this
    // snapshot. If there was no overlapping commit entry, then it is committed
    // with a commit_seq lower than any live snapshot, including snapshot_seq.
    if (old_commit_map_empty_.load(std::memory_order_acquire)) {
      ROCKS_LOG_DETAILS(info_log_,
                        "IsInSnapshot %" PRIu64 " in %" PRIu64
                        " returns %" PRId32 " released=1",
                        prep_seq, snapshot_seq, 0);
      assert(snap_released);
      // This snapshot is not valid anymore. We cannot tell if prep_seq is
      // committed before or after the snapshot. Return true but also set
      // snap_released to true.
      *snap_released = true;
      return true;
    }
    {
      // We should not normally reach here unless snapshot_seq is old. This is
      // a rare case and it is ok to pay the cost of mutex ReadLock for such
      // old, reading transactions.
      WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
      ReadLock rl(&old_commit_map_mutex_);
      auto prep_set_entry = old_commit_map_.find(snapshot_seq);
      bool found = prep_set_entry != old_commit_map_.end();
      if (found) {
        auto& vec = prep_set_entry->second;
        found = std::binary_search(vec.begin(), vec.end(), prep_seq);
      } else {
        // coming from compaction
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32 " released=1",
                          prep_seq, snapshot_seq, 0);
        // This snapshot is not valid anymore. We cannot tell if prep_seq is
        // committed before or after the snapshot. Return true but also set
        // snap_released to true.
        assert(snap_released);
        *snap_released = true;
        return true;
      }

      if (!found) {
        ROCKS_LOG_DETAILS(info_log_,
                          "IsInSnapshot %" PRIu64 " in %" PRIu64
                          " returns %" PRId32,
                          prep_seq, snapshot_seq, 1);
        return true;
      }
    }
    // (ii) is the case: it is committed but after the snapshot_seq
    ROCKS_LOG_DETAILS(
        info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
        prep_seq, snapshot_seq, 0);
    return false;
  }
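
  // A worked trace of the fast paths above (an illustrative sketch; the
  // numbers are hypothetical): with min_uncommitted = 10, max_evicted_seq_ = 0
  // and a snapshot at seq 50, IsInSnapshot(0, 50) returns true (compaction
  // output), IsInSnapshot(60, 50) returns false (prepared after the snapshot
  // was taken), IsInSnapshot(5, 50) returns true (below min_uncommitted, hence
  // committed), and IsInSnapshot(20, 50) consults the commit cache: if entry
  // (20, 30) is found there, it returns 30 <= 50 == true without ever taking
  // prepared_mutex_.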

  // Add the transaction with prepare sequence seq to the prepared list.
  // Note: must be called serially with increasing seq on each call.
  // locked is true if prepared_mutex_ is already locked.
  void AddPrepared(uint64_t seq, bool locked = false);
  // Check if any of the prepared txns are less than the new max_evicted_seq_.
  // Must be called with prepared_mutex_ write locked.
  void CheckPreparedAgainstMax(SequenceNumber new_max, bool locked);
  // Remove the transaction with prepare sequence seq from the prepared list
  void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
  // Add the transaction with prepare sequence prepare_seq and commit sequence
  // commit_seq to the commit map. loop_cnt is to detect infinite loops.
  // Note: must be called serially.
  void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                    uint8_t loop_cnt = 0);

  struct CommitEntry {
    uint64_t prep_seq;
    uint64_t commit_seq;
    CommitEntry() : prep_seq(0), commit_seq(0) {}
    CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
    bool operator==(const CommitEntry& rhs) const {
      return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq;
    }
  };

  struct CommitEntry64bFormat {
    explicit CommitEntry64bFormat(size_t index_bits)
        : INDEX_BITS(index_bits),
          PREP_BITS(static_cast<size_t>(64 - PAD_BITS - INDEX_BITS)),
          COMMIT_BITS(static_cast<size_t>(64 - PREP_BITS)),
          COMMIT_FILTER(static_cast<uint64_t>((1ull << COMMIT_BITS) - 1)),
          DELTA_UPPERBOUND(static_cast<uint64_t>((1ull << COMMIT_BITS))) {}
    // Number of higher bits of a sequence number that are not used. They are
    // used to encode the value type, ...
    const size_t PAD_BITS = static_cast<size_t>(8);
    // Number of lower bits from prepare seq that can be skipped as they are
    // implied by the index of the entry in the array
    const size_t INDEX_BITS;
    // Number of bits we use to encode the prepare seq
    const size_t PREP_BITS;
    // Number of bits we use to encode the commit seq.
    const size_t COMMIT_BITS;
    // Filter to encode/decode commit seq
    const uint64_t COMMIT_FILTER;
    // The value of commit_seq - prepare_seq + 1 must be less than this bound
    const uint64_t DELTA_UPPERBOUND;
  };

  // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... INDEX
  // Delta Seq   (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ... DELTA DELTA
  // Encoded Value         = PREP PREP ... PREP PREP DELTA DELTA ... DELTA DELTA
  // PAD: the first bits of a seq that are reserved for tagging and hence ignored
  // PREP/INDEX: the used bits in a prepare seq number
  // INDEX: the bits that do not have to be encoded (will be provided externally)
  // DELTA: commit seq - prepare seq + 1
  // The number of DELTA bits should be equal to the number of INDEX bits + PAD
  // bits
  struct CommitEntry64b {
    constexpr CommitEntry64b() noexcept : rep_(0) {}

    CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format)
        : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {}

    CommitEntry64b(const uint64_t ps, const uint64_t cs,
                   const CommitEntry64bFormat& format) {
      assert(ps < static_cast<uint64_t>(
                      (1ull << (format.PREP_BITS + format.INDEX_BITS))));
      assert(ps <= cs);
      uint64_t delta = cs - ps + 1;  // make initialized delta always >= 1
      // zero is reserved for uninitialized entries
      assert(0 < delta);
      assert(delta < format.DELTA_UPPERBOUND);
      if (delta >= format.DELTA_UPPERBOUND) {
        throw std::runtime_error(
            "commit_seq >> prepare_seq. The allowed distance is " +
            std::to_string(format.DELTA_UPPERBOUND) + " commit_seq is " +
            std::to_string(cs) + " prepare_seq is " + std::to_string(ps));
      }
      rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER;
      rep_ = rep_ | delta;
    }

    // Return false if the entry is empty
    bool Parse(const uint64_t indexed_seq, CommitEntry* entry,
               const CommitEntry64bFormat& format) {
      uint64_t delta = rep_ & format.COMMIT_FILTER;
      // zero is reserved for uninitialized entries
      assert(delta < static_cast<uint64_t>((1ull << format.COMMIT_BITS)));
      if (delta == 0) {
        return false;  // initialized entry would have non-zero delta
      }

      assert(indexed_seq < static_cast<uint64_t>((1ull << format.INDEX_BITS)));
      uint64_t prep_up = rep_ & ~format.COMMIT_FILTER;
      prep_up >>= format.PAD_BITS;
      const uint64_t& prep_low = indexed_seq;
      entry->prep_seq = prep_up | prep_low;

      entry->commit_seq = entry->prep_seq + delta - 1;
      return true;
    }

   private:
    uint64_t rep_;
  };
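
  // A worked example of the 64b encoding (a sketch assuming the default
  // INDEX_BITS = 23, so PREP_BITS = 64 - 8 - 23 = 33 and COMMIT_BITS = 31):
  // for prep_seq = 100 and commit_seq = 105, delta = 105 - 100 + 1 = 6. Since
  // 100 < 2^23 its upper PREP bits are all zero, so rep_ stores just 6.
  // Parse(100, &entry, format) reads delta = 6 from the low 31 bits, restores
  // prep_seq = 0 | 100 from the externally supplied index, and computes
  // commit_seq = 100 + 6 - 1 = 105. The INDEX bits are never stored; they are
  // implied by the entry's slot in commit_cache_.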

  // Struct to hold ownership of snapshot and read callback for cleanup.
  struct IteratorState;

  std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() {
    return cf_map_;
  }
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() {
    return handle_map_;
  }
  void UpdateCFComparatorMap(
      const std::vector<ColumnFamilyHandle*>& handles) override;
  void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;

  const Snapshot* GetSnapshot() override;
  SnapshotImpl* GetSnapshotInternal(bool for_ww_conflict_check);

 protected:
  Status VerifyCFOptions(const ColumnFamilyOptions& cf_options) override;
  // Assign the min and max sequence numbers for reading from the db. A seq >
  // max is not valid, and a seq < min is valid, and a min <= seq < max requires
  // further checking. Normally max is defined by the snapshot and min is by
  // minimum uncommitted seq.
  inline SnapshotBackup AssignMinMaxSeqs(const Snapshot* snapshot,
                                         SequenceNumber* min,
                                         SequenceNumber* max);
  // Validate whether a snapshot sequence number is still valid based on the
  // latest db status. backed_by_snapshot specifies if the number is backed by
  // an actual snapshot object. order specifies the memory order with which we
  // load the atomic variables: relaxed is enough for the default since we care
  // about the last value seen by the same thread.
  inline bool ValidateSnapshot(
      const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
      std::memory_order order = std::memory_order_relaxed);
  // Get a dummy snapshot that refers to kMaxSequenceNumber
  Snapshot* GetMaxSnapshot() { return &dummy_max_snapshot_; }

  bool ShouldRollbackWithSingleDelete(ColumnFamilyHandle* column_family,
                                      const Slice& key) {
    return rollback_deletion_type_callback_
               ? rollback_deletion_type_callback_(this, column_family, key)
               : false;
  }

  std::function<bool(TransactionDB*, ColumnFamilyHandle*, const Slice&)>
      rollback_deletion_type_callback_;

 private:
  friend class AddPreparedCallback;
  friend class PreparedHeap_BasicsTest_Test;
  friend class PreparedHeap_Concurrent_Test;
  friend class PreparedHeap_EmptyAtTheEnd_Test;
  friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccess_Test;
  friend class WritePreparedCommitEntryPreReleaseCallback;
  friend class WritePreparedTransactionTestBase;
  friend class WritePreparedTxn;
  friend class WritePreparedTxnDBMock;
  friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
  friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasic_Test;
  friend class
      WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicates_Test;
  friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
  friend class WritePreparedTransactionTest_BasicRecovery_Test;
  friend class WritePreparedTransactionTest_CheckAgainstSnapshots_Test;
  friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
  friend class WritePreparedTransactionTest_ConflictDetectionAfterRecovery_Test;
  friend class WritePreparedTransactionTest_CommitMap_Test;
  friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotEmptyMap_Test;
  friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
  friend class WritePreparedTransactionTest_IsInSnapshot_Test;
  friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
  friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
  friend class WritePreparedTransactionTest_MaxCatchupWithUnbackedSnapshot_Test;
  friend class
      WritePreparedTransactionTest_NonAtomicCommitOfDelayedPrepared_Test;
  friend class
      WritePreparedTransactionTest_NonAtomicUpdateOfDelayedPrepared_Test;
  friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test;
  friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
  friend class WritePreparedTransactionTest_Rollback_Test;
  friend class WritePreparedTransactionTest_SmallestUnCommittedSeq_Test;
  friend class WriteUnpreparedTxn;
  friend class WriteUnpreparedTxnDB;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  friend class MultiOpsTxnsStressTest;

  void Init(const TransactionDBOptions& txn_db_opts);

  void WPRecordTick(uint32_t ticker_type) const {
    RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
  }

  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
                 const Slice& key, std::string* value) {
    assert(value != nullptr);
    PinnableSlice pinnable_val(value);
    assert(!pinnable_val.IsPinned());
    auto s = GetImpl(options, column_family, key, &pinnable_val);
    if (s.ok() && pinnable_val.IsPinned()) {
      value->assign(pinnable_val.data(), pinnable_val.size());
    }  // else value is already assigned
    return s;
  }

  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
                 const Slice& key, PinnableSlice* value);

  // A heap with the amortized O(1) complexity for erase. It uses one extra heap
552
  // to keep track of erased entries that are not yet on top of the main heap.
553
  class PreparedHeap {
554
    // The mutex is required for push and pop from PreparedHeap. ::erase will
555
    // use external synchronization via prepared_mutex_.
556
    port::Mutex push_pop_mutex_;
557
    std::deque<uint64_t> heap_;
558
    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
559
        erased_heap_;
560
    std::atomic<uint64_t> heap_top_ = {kMaxSequenceNumber};
561
    // True when testing crash recovery
562
    bool TEST_CRASH_ = false;
563
    friend class WritePreparedTxnDB;
564
565
   public:
566
0
    ~PreparedHeap() {
567
0
      if (!TEST_CRASH_) {
568
0
        assert(heap_.empty());
569
0
        assert(erased_heap_.empty());
570
0
      }
571
0
    }
572
0
    port::Mutex* push_pop_mutex() { return &push_pop_mutex_; }
573
574
0
    inline bool empty() { return top() == kMaxSequenceNumber; }
575
    // Returns kMaxSequenceNumber if empty() and the smallest otherwise.
576
0
    inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); }
577
0
    inline void push(uint64_t v) {
578
0
      push_pop_mutex_.AssertHeld();
579
0
      if (heap_.empty()) {
580
0
        heap_top_.store(v, std::memory_order_release);
581
0
      } else {
582
0
        assert(heap_top_.load() < v);
583
0
      }
584
0
      heap_.push_back(v);
585
0
    }
586
0
    void pop(bool locked = false) {
587
0
      if (!locked) {
588
0
        push_pop_mutex()->Lock();
589
0
      }
590
0
      push_pop_mutex_.AssertHeld();
591
0
      heap_.pop_front();
592
0
      while (!heap_.empty() && !erased_heap_.empty() &&
593
             // heap_.top() > erased_heap_.top() could happen if we have erased
594
             // a non-existent entry. Ideally the user should not do that but we
595
             // should be resilient against it.
596
0
             heap_.front() >= erased_heap_.top()) {
597
0
        if (heap_.front() == erased_heap_.top()) {
598
0
          heap_.pop_front();
599
0
        }
600
0
        uint64_t erased __attribute__((__unused__));
601
0
        erased = erased_heap_.top();
602
0
        erased_heap_.pop();
603
        // No duplicate prepare sequence numbers
604
0
        assert(erased_heap_.empty() || erased_heap_.top() != erased);
605
0
      }
606
0
      while (heap_.empty() && !erased_heap_.empty()) {
607
0
        erased_heap_.pop();
608
0
      }
609
0
      heap_top_.store(!heap_.empty() ? heap_.front() : kMaxSequenceNumber,
610
0
                      std::memory_order_release);
611
0
      if (!locked) {
612
0
        push_pop_mutex()->Unlock();
613
0
      }
614
0
    }
615
    // Concurrrent calls needs external synchronization. It is safe to be called
616
    // concurrent to push and pop though.
617
0
    void erase(uint64_t seq) {
618
0
      if (!empty()) {
619
0
        auto top_seq = top();
620
0
        if (seq < top_seq) {
621
          // Already popped, ignore it.
622
0
        } else if (top_seq == seq) {
623
0
          pop();
624
#ifndef NDEBUG
625
          MutexLock ml(push_pop_mutex());
626
          assert(heap_.empty() || heap_.front() != seq);
627
#endif
628
0
        } else {  // top() > seq
629
          // Down the heap, remember to pop it later
630
0
          erased_heap_.push(seq);
631
0
        }
632
0
      }
633
0
    }
634
  };
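
  // A short trace of the two-heap erase (an illustrative sketch): after
  // push(1), push(2), push(3), heap_ = {1, 2, 3} and top() == 1. erase(2)
  // cannot remove from the middle of heap_, so 2 is parked in erased_heap_.
  // A later pop() drops 1 and then, seeing heap_.front() == erased_heap_.top()
  // == 2, lazily discards 2 from both heaps, leaving top() == 3. Each entry is
  // thus erased in amortized O(1) time without rebuilding the main heap.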

  void TEST_Crash() override { prepared_txns_.TEST_CRASH_ = true; }

  // Get the commit entry with index indexed_seq from the commit table. It
  // returns true if such entry exists.
  bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
                      CommitEntry* entry) const;

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry <prep_seq, commit_seq>. If the rewrite results in eviction,
  // sets the evicted_entry and returns true.
  bool AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry,
                      CommitEntry* evicted_entry);

  // Rewrite the entry with the index indexed_seq in the commit table with the
  // commit entry new_entry only if the existing entry matches the
  // expected_entry. Returns false otherwise.
  bool ExchangeCommitEntry(const uint64_t indexed_seq,
                           CommitEntry64b& expected_entry,
                           const CommitEntry& new_entry);

  // Increase max_evicted_seq_ from the previous value prev_max to the new
  // value. This also involves taking care of prepared txns that are not
  // committed before new_max, as well as updating the list of live snapshots at
  // the time of updating the max. Thread-safety: this function can be called
  // concurrently. The concurrent invocations of this function are equivalent to
  // a serial invocation in which the last invocation is the one with the
  // largest new_max value.
  void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
                            const SequenceNumber& new_max);

  inline SequenceNumber SmallestUnCommittedSeq() {
    // Note: We have two lists to look into, but for performance reasons they
    // are not read atomically. Since CheckPreparedAgainstMax copies the entry
    // to delayed_prepared_ before removing it from prepared_txns_, to ensure
    // that a prepared entry will not be missed, we look into them in opposite
    // order: first read prepared_txns_ and then delayed_prepared_.

    // This must be called before calling ::top. This is because the concurrent
    // thread would call ::RemovePrepared before updating
    // GetLatestSequenceNumber(). Reading them in the opposite order here
    // guarantees that the ::top we read would be lower than the ::top we would
    // have read had we otherwise updated/read them atomically.
    auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
    auto min_prepare = prepared_txns_.top();
    // Since we update the prepare_heap always from the main write queue via
    // PreReleaseCallback, the prepared_txns_.top() indicates the smallest
    // prepared data in 2pc transactions. For non-2pc transactions that are
    // written in two steps, we also update prepared_txns_ at the first step
    // (via the same mechanism) so that their uncommitted data is reflected in
    // SmallestUnCommittedSeq.
    if (!delayed_prepared_empty_.load()) {
      ReadLock rl(&prepared_mutex_);
      if (!delayed_prepared_.empty()) {
        return *delayed_prepared_.begin();
      }
    }
    bool empty = min_prepare == kMaxSequenceNumber;
    if (empty) {
      // Since GetLatestSequenceNumber is updated
      // after prepared_txns_ are, the value of GetLatestSequenceNumber would
      // reflect any uncommitted data that is not added to prepared_txns_ yet.
      // Otherwise, if there is no concurrent txn, this value simply reflects
      // the latest value in the memtable.
      return next_prepare;
    } else {
      return std::min(min_prepare, next_prepare);
    }
  }

  // Enhance the snapshot object by recording in it the smallest uncommitted seq
  inline void EnhanceSnapshot(SnapshotImpl* snapshot,
                              SequenceNumber min_uncommitted) {
    assert(snapshot);
    assert(min_uncommitted <= snapshot->number_ + 1);
    snapshot->min_uncommitted_ = min_uncommitted;
  }

  virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
      SequenceNumber max);

  // Will be called by the public ReleaseSnapshot method. Does the maintenance
  // internal to WritePreparedTxnDB
  void ReleaseSnapshotInternal(const SequenceNumber snap_seq);

  // Update the list of snapshots corresponding to the soon-to-be-updated
  // max_evicted_seq_. Thread-safety: this function can be called concurrently.
  // The concurrent invocations of this function are equivalent to a serial
  // invocation in which the last invocation is the one with the largest
  // version value.
  void UpdateSnapshots(const std::vector<SequenceNumber>& snapshots,
                       const SequenceNumber& version);
  // Check the new list of snapshots against the old one to see if any of
  // the snapshots are released and to do the cleanup for the released snapshot.
  void CleanupReleasedSnapshots(
      const std::vector<SequenceNumber>& new_snapshots,
      const std::vector<SequenceNumber>& old_snapshots);

  // Check an evicted entry against live snapshots to see if it should be kept
  // around or it can be safely discarded (and hence assumed committed for all
  // snapshots). Thread-safety: this function can be called concurrently. If it
  // is called concurrently with multiple UpdateSnapshots, the result is the
  // same as checking the intersection of the snapshot list before updates with
  // the snapshot list of all the concurrent updates.
  void CheckAgainstSnapshots(const CommitEntry& evicted);

  // Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
  // commit_seq. Return false if checking the next snapshot(s) is not needed.
  // This is the case if none of the next snapshots could satisfy the condition.
  // next_is_larger: the next snapshot will be a larger value
  bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
                               const uint64_t& commit_seq,
                               const uint64_t& snapshot_seq,
                               const bool next_is_larger);

  // A trick to increase the last visible sequence number by one and also wait
  // for the in-flight commits to be visible.
  void AdvanceSeqByOne();

  // The list of live snapshots at the last time that max_evicted_seq_ advanced.
  // The list is stored in two data structures: in snapshot_cache_ that is
  // efficient for concurrent reads, and in snapshots_ if the data does not fit
  // into snapshot_cache_. The total number of snapshots in the two lists is
  // kept in snapshots_total_.
  std::atomic<size_t> snapshots_total_ = {};
  // The list sorted in ascending order. Thread-safety for writes is provided
  // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
  // each entry. In x86_64 architecture such reads are compiled to simple read
  // instructions.
  const size_t SNAPSHOT_CACHE_BITS;
  const size_t SNAPSHOT_CACHE_SIZE;
  std::unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
  // 2nd list for storing snapshots. The list sorted in ascending order.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_;
  // The list of all snapshots: snapshots_ + snapshot_cache_. This list,
  // although redundant, simplifies the CleanupOldSnapshots implementation.
  // Thread-safety is provided with snapshots_mutex_.
  std::vector<SequenceNumber> snapshots_all_;
  // The version of the latest list of snapshots. This can be used to avoid
  // rewriting a list that is concurrently updated with a more recent version.
  SequenceNumber snapshots_version_ = 0;

  // A heap of prepared transactions. Thread-safety is provided with
  // prepared_mutex_.
  PreparedHeap prepared_txns_;
  const size_t COMMIT_CACHE_BITS;
  const size_t COMMIT_CACHE_SIZE;
  const CommitEntry64bFormat FORMAT;
  // commit_cache_ must be initialized to zero to tell apart an empty index from
  // a filled one. Thread-safety is provided with commit_cache_mutex_.
  std::unique_ptr<std::atomic<CommitEntry64b>[]> commit_cache_;
  // The largest evicted *commit* sequence number from the commit_cache_. If a
  // seq is smaller than max_evicted_seq_ it might or might not be present in
  // commit_cache_. So commit_cache_ must first be checked before consulting
  // max_evicted_seq_.
  std::atomic<uint64_t> max_evicted_seq_ = {};
  // Order: 1) update future_max_evicted_seq_ = new_max, 2)
  // GetSnapshotListFromDB(new_max), 3) max_evicted_seq_ = new_max. Since
  // GetSnapshotInternal guarantees that the snapshot seq is larger than
  // future_max_evicted_seq_, this guarantees that any snapshot not larger than
  // max has already been looked at via a GetSnapshotListFromDB(new_max).
  std::atomic<uint64_t> future_max_evicted_seq_ = {};
  // Advance max_evicted_seq_ by this value each time it needs an update. The
  // larger the value, the less frequent advances we would have. We do not want
  // it to be too large either as it would cause stalls by doing too much
  // maintenance work under the lock.
  size_t INC_STEP_FOR_MAX_EVICTED = 1;
  // A map from old snapshots (expected to be used by a few read-only txns) to
  // prepared sequence numbers of the evicted entries from commit_cache_ that
  // overlap with such snapshot. These are the prepared sequence numbers that
  // the snapshot, to which they are mapped, cannot assume to be committed just
  // because they are no longer in the commit_cache_. The vector must be sorted
  // after each update.
  // Thread-safety is provided with old_commit_map_mutex_.
  std::map<SequenceNumber, std::vector<SequenceNumber>> old_commit_map_;
  // A set of long-running prepared transactions that are not finished by the
  // time max_evicted_seq_ advances their sequence number. This is expected to
  // be empty normally. Thread-safety is provided with prepared_mutex_.
  std::set<uint64_t> delayed_prepared_;
  // Commit of a delayed prepared: 1) update commit cache, 2) update
  // delayed_prepared_commits_, 3) publish seq, 4) clean up delayed_prepared_.
  // delayed_prepared_commits_ will help us tell apart the unprepared txns from
  // the ones that are committed but not cleaned up yet.
  std::unordered_map<SequenceNumber, SequenceNumber> delayed_prepared_commits_;
  // Updated when delayed_prepared_.empty() changes. Expected to be true
  // normally.
  std::atomic<bool> delayed_prepared_empty_ = {true};
  // Updated when old_commit_map_.empty() changes. Expected to be true normally.
  std::atomic<bool> old_commit_map_empty_ = {true};
  mutable port::RWMutex prepared_mutex_;
  mutable port::RWMutex old_commit_map_mutex_;
  mutable port::RWMutex commit_cache_mutex_;
  mutable port::RWMutex snapshots_mutex_;
  // A cache of the cf comparators
  // Thread safety: since it is a const it is safe to read it concurrently
  std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
  // A cache of the cf handles
  // Thread safety: since the handle is a read-only object and the map is
  // const, it is safe to read it concurrently
  std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
  // A dummy snapshot object that refers to kMaxSequenceNumber
  SnapshotImpl dummy_max_snapshot_;
};

class WritePreparedTxnReadCallback : public ReadCallback {
 public:
  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
      : ReadCallback(snapshot),
        db_(db),
        backed_by_snapshot_(kBackedByDBSnapshot) {}
  WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
                               SequenceNumber min_uncommitted,
                               SnapshotBackup backed_by_snapshot)
      : ReadCallback(snapshot, min_uncommitted),
        db_(db),
        backed_by_snapshot_(backed_by_snapshot) {
    (void)backed_by_snapshot_;  // to silence unused private field warning
  }

  virtual ~WritePreparedTxnReadCallback() {
    // If it is not backed by snapshot, the caller must check validity
    assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
  }

  // Will be called to see if the seq number is visible; if not it moves on to
  // the next seq number.
  inline bool IsVisibleFullCheck(SequenceNumber seq) override {
    auto snapshot = max_visible_seq_;
    bool snap_released = false;
    auto ret =
        db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &snap_released);
    assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
    snap_released_ |= snap_released;
    return ret;
  }

  inline bool valid() {
    valid_checked_ = true;
    return snap_released_ == false;
  }

  // TODO(myabandeh): override Refresh when Iterator::Refresh is supported
 private:
  WritePreparedTxnDB* db_;
  // Whether max_visible_seq_ is backed by a snapshot
  const SnapshotBackup backed_by_snapshot_;
  bool snap_released_ = false;
  // Safety check to ensure that the caller has checked invalid statuses
  bool valid_checked_ = false;
};
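
// Typical use of the callback above (a sketch; the details of the read path
// are assumptions): the transaction layer takes a snapshot at seq snap_seq,
// derives min_uncommitted from it, and constructs
// WritePreparedTxnReadCallback(db, snap_seq, min_uncommitted,
// kBackedByDBSnapshot). The read path then treats any version with
// seq < min_uncommitted as visible and any version with seq > snap_seq as
// invisible, calling IsVisibleFullCheck() only for seqs in between, where the
// commit status is genuinely ambiguous.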

class AddPreparedCallback : public PreReleaseCallback {
 public:
  AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
                      size_t sub_batch_cnt, bool two_write_queues,
                      bool first_prepare_batch)
      : db_(db),
        db_impl_(db_impl),
        sub_batch_cnt_(sub_batch_cnt),
        two_write_queues_(two_write_queues),
        first_prepare_batch_(first_prepare_batch) {
    (void)two_write_queues_;  // to silence unused private field warning
  }
  Status Callback(SequenceNumber prepare_seq,
                  bool is_mem_disabled __attribute__((__unused__)),
                  uint64_t log_number, size_t index, size_t total) override {
    assert(index < total);
    // To reduce the cost of lock acquisition competing with the concurrent
    // prepare requests, lock on the first callback and unlock on the last.
    const bool do_lock = !two_write_queues_ || index == 0;
    const bool do_unlock = !two_write_queues_ || index + 1 == total;
    // Always Prepare from the main queue
    assert(!two_write_queues_ || !is_mem_disabled);  // implies the 1st queue
    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:pause");
    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:resume");
    if (do_lock) {
      db_->prepared_txns_.push_pop_mutex()->Lock();
    }
    const bool kLocked = true;
    for (size_t i = 0; i < sub_batch_cnt_; i++) {
      db_->AddPrepared(prepare_seq + i, kLocked);
    }
    if (do_unlock) {
      db_->prepared_txns_.push_pop_mutex()->Unlock();
    }
    TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::end");
    if (first_prepare_batch_) {
      assert(log_number != 0);
      db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
          log_number);
    }
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  size_t sub_batch_cnt_;
  bool two_write_queues_;
  // It is 2PC and this is the first prepare batch. Always the case in 2PC
  // unless it is WriteUnPrepared.
  bool first_prepare_batch_;
};
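
// An example of the lock batching above (a sketch): with two_write_queues set
// and a write group of total = 3 prepare callbacks, only the callback with
// index == 0 acquires push_pop_mutex() and only the one with index + 1 ==
// total releases it, so the whole group pays for a single lock acquisition.
// Without two_write_queues every callback locks and unlocks individually.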

class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
 public:
  // includes_data indicates that the commit also writes a non-empty
  // CommitTimeWriteBatch to memtable, which needs to be committed separately.
  WritePreparedCommitEntryPreReleaseCallback(
      WritePreparedTxnDB* db, DBImpl* db_impl, SequenceNumber prep_seq,
      size_t prep_batch_cnt, size_t data_batch_cnt = 0,
      SequenceNumber aux_seq = kMaxSequenceNumber, size_t aux_batch_cnt = 0)
      : db_(db),
        db_impl_(db_impl),
        prep_seq_(prep_seq),
        prep_batch_cnt_(prep_batch_cnt),
        data_batch_cnt_(data_batch_cnt),
        includes_data_(data_batch_cnt_ > 0),
        aux_seq_(aux_seq),
        aux_batch_cnt_(aux_batch_cnt),
        includes_aux_batch_(aux_batch_cnt > 0) {
    assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
    assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
    assert((aux_batch_cnt_ > 0) != (aux_seq == kMaxSequenceNumber));  // xor
  }

  Status Callback(SequenceNumber commit_seq,
                  bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                  size_t /*index*/, size_t /*total*/) override {
    // Always commit from the 2nd queue
    assert(!db_impl_->immutable_db_options().two_write_queues ||
           is_mem_disabled);
    assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
    // The data batch is the one accompanying the commit marker and affects the
    // last seq in the commit batch.
    const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
                                         ? commit_seq
                                         : commit_seq + data_batch_cnt_ - 1;
    if (prep_seq_ != kMaxSequenceNumber) {
      for (size_t i = 0; i < prep_batch_cnt_; i++) {
        db_->AddCommitted(prep_seq_ + i, last_commit_seq);
      }
    }  // else there was no prepare phase
    if (includes_aux_batch_) {
      for (size_t i = 0; i < aux_batch_cnt_; i++) {
        db_->AddCommitted(aux_seq_ + i, last_commit_seq);
      }
    }
    if (includes_data_) {
      assert(data_batch_cnt_);
      // Commit the data that is accompanied with the commit request
      for (size_t i = 0; i < data_batch_cnt_; i++) {
        // For the commit seq of each batch use the commit seq of the last
        // batch. This would make debugging easier by having all the batches
        // carry the same sequence number.
        db_->AddCommitted(commit_seq + i, last_commit_seq);
      }
    }
    if (db_impl_->immutable_db_options().two_write_queues) {
      assert(is_mem_disabled);  // implies the 2nd queue
      // Publish the sequence number. We can do that here assuming the callback
      // is invoked only from one write queue, which would guarantee that the
      // publish sequence numbers will be in order, i.e., once a seq is
      // published all the seqs prior to that are also publishable.
      db_impl_->SetLastPublishedSequence(last_commit_seq);
      // Note RemovePrepared should be called after publishing the seq.
      // Otherwise the SmallestUnCommittedSeq optimization breaks.
      if (prep_seq_ != kMaxSequenceNumber) {
        db_->RemovePrepared(prep_seq_, prep_batch_cnt_);
      }  // else there was no prepare phase
      if (includes_aux_batch_) {
        db_->RemovePrepared(aux_seq_, aux_batch_cnt_);
      }
    }
    // else the SequenceNumber that is updated as part of the write already
    // does the publishing
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  // kMaxSequenceNumber if there was no prepare phase
  SequenceNumber prep_seq_;
  size_t prep_batch_cnt_;
  size_t data_batch_cnt_;
  // Data here is the batch that is written with the commit marker, either
  // because it is a commit without prepare or because the commit has a
  // CommitTimeWriteBatch.
  bool includes_data_;
  // Auxiliary batch (if there is any) is a batch that is written before, but
  // gets the same commit seq as prepare batch or data batch. This is used in
  // two write queues where the CommitTimeWriteBatch becomes the aux batch and
  // we do a separate write to actually commit everything.
  SequenceNumber aux_seq_;
  size_t aux_batch_cnt_;
  bool includes_aux_batch_;
};
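
// A worked example of the seq arithmetic above (a sketch with hypothetical
// numbers): a commit batch written at commit_seq = 100 with data_batch_cnt_ =
// 3 occupies seqs 100..102, so last_commit_seq = 102. A prepared batch at
// prep_seq_ = 90 with prep_batch_cnt_ = 2 is then marked committed as
// (90, 102) and (91, 102), and the data batches as (100, 102), (101, 102) and
// (102, 102), all sharing the same commit seq to simplify debugging.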

// For two_write_queues, commit both the aborted batch and the cleanup batch
// and then publish the seq
class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
 public:
  WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
                                          DBImpl* db_impl,
                                          SequenceNumber prep_seq,
                                          SequenceNumber rollback_seq,
                                          size_t prep_batch_cnt)
      : db_(db),
        db_impl_(db_impl),
        prep_seq_(prep_seq),
        rollback_seq_(rollback_seq),
        prep_batch_cnt_(prep_batch_cnt) {
    assert(prep_seq != kMaxSequenceNumber);
    assert(rollback_seq != kMaxSequenceNumber);
    assert(prep_batch_cnt_ > 0);
  }

  Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, uint64_t,
                  size_t /*index*/, size_t /*total*/) override {
    // Always commit from the 2nd queue
    assert(is_mem_disabled);  // implies the 2nd queue
    assert(db_impl_->immutable_db_options().two_write_queues);
#ifdef NDEBUG
    (void)is_mem_disabled;
#endif
    const uint64_t last_commit_seq = commit_seq;
    db_->AddCommitted(rollback_seq_, last_commit_seq);
    for (size_t i = 0; i < prep_batch_cnt_; i++) {
      db_->AddCommitted(prep_seq_ + i, last_commit_seq);
    }
    db_impl_->SetLastPublishedSequence(last_commit_seq);
    return Status::OK();
  }

 private:
  WritePreparedTxnDB* db_;
  DBImpl* db_impl_;
  SequenceNumber prep_seq_;
  SequenceNumber rollback_seq_;
  size_t prep_batch_cnt_;
};

// Count the number of sub-batches inside a batch. A sub-batch does not have
// duplicate keys.
struct SubBatchCounter : public WriteBatch::Handler {
  explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
      : comparators_(comparators), batches_(1) {}
  std::map<uint32_t, const Comparator*>& comparators_;
  using CFKeys = std::set<Slice, SetComparator>;
  std::map<uint32_t, CFKeys> keys_;
  size_t batches_;
  size_t BatchCount() { return batches_; }
  void AddKey(const uint32_t cf, const Slice& key);
  void InitWithComp(const uint32_t cf);
  Status MarkNoop(bool) override { return Status::OK(); }
  Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
  Status MarkCommit(const Slice&) override { return Status::OK(); }
  Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status DeleteCF(uint32_t cf, const Slice& key) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
    AddKey(cf, key);
    return Status::OK();
  }
  Status MarkBeginPrepare(bool) override { return Status::OK(); }
  Status MarkRollback(const Slice&) override { return Status::OK(); }
  Handler::OptionState WriteAfterCommit() const override {
    return Handler::OptionState::kDisabled;
  }
};
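
// An example of sub-batch counting (a sketch; it assumes AddKey, whose body is
// not shown here, starts a new sub-batch when it encounters a key already in
// keys_[cf]): a batch with Put(cf, "a"), Put(cf, "b"), Put(cf, "a") yields
// BatchCount() == 2, since the duplicate "a" cannot share a sub-batch with the
// first one, while a batch with no duplicate keys counts as a single
// sub-batch.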

SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
                                                    SequenceNumber* min,
                                                    SequenceNumber* max) {
  if (snapshot != nullptr) {
    *min =
        static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
    *max = static_cast_with_check<const SnapshotImpl>(snapshot)->number_;
    // A duplicate of the check in EnhanceSnapshot().
    assert(*min <= *max + 1);
    return kBackedByDBSnapshot;
  } else {
    *min = SmallestUnCommittedSeq();
    *max = 0;  // to be assigned later after sv is referenced.
    return kUnbackedByDBSnapshot;
  }
}

bool WritePreparedTxnDB::ValidateSnapshot(
    const SequenceNumber snap_seq, const SnapshotBackup backed_by_snapshot,
    std::memory_order order) {
  if (backed_by_snapshot == kBackedByDBSnapshot) {
    return true;
  } else {
    SequenceNumber max = max_evicted_seq_.load(order);
    // Validate that max has not advanced past the snapshot seq that is not
    // backed by a real snapshot. This is a very rare case that should not
    // happen in real workloads.
    if (UNLIKELY(snap_seq <= max && snap_seq != 0)) {
      return false;
    }
  }
  return true;
}
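
// An example (a sketch with hypothetical numbers): a reader holding an
// unbacked snapshot number 80 calls
// ValidateSnapshot(80, kUnbackedByDBSnapshot). If max_evicted_seq_ has
// meanwhile advanced to 100, the call returns false and the read must be
// retried, since commit entries at or below 80 may have been evicted from the
// commit cache while this reader was never registered as a live snapshot.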

}  // namespace ROCKSDB_NAMESPACE