Coverage Report

Created: 2025-11-15 07:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/duckdb/src/transaction/duck_transaction.cpp
Line
Count
Source
1
#include "duckdb/transaction/duck_transaction.hpp"
2
#include "duckdb/transaction/duck_transaction_manager.hpp"
3
#include "duckdb/main/client_context.hpp"
4
#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
5
#include "duckdb/common/exception.hpp"
6
#include "duckdb/parser/column_definition.hpp"
7
#include "duckdb/storage/data_table.hpp"
8
#include "duckdb/storage/write_ahead_log.hpp"
9
#include "duckdb/storage/storage_manager.hpp"
10
11
#include "duckdb/transaction/append_info.hpp"
12
#include "duckdb/transaction/delete_info.hpp"
13
#include "duckdb/transaction/update_info.hpp"
14
#include "duckdb/transaction/local_storage.hpp"
15
#include "duckdb/main/config.hpp"
16
#include "duckdb/storage/table/column_data.hpp"
17
#include "duckdb/main/client_data.hpp"
18
#include "duckdb/main/attached_database.hpp"
19
#include "duckdb/storage/storage_lock.hpp"
20
#include "duckdb/storage/table/data_table_info.hpp"
21
#include "duckdb/storage/table/scan_state.hpp"
22
23
namespace duckdb {
24
25
TransactionData::TransactionData(DuckTransaction &transaction_p) // NOLINT
26
124
    : transaction(&transaction_p), transaction_id(transaction_p.transaction_id), start_time(transaction_p.start_time) {
27
124
}
28
TransactionData::TransactionData(transaction_t transaction_id_p, transaction_t start_time_p)
29
381
    : transaction(nullptr), transaction_id(transaction_id_p), start_time(start_time_p) {
30
381
}
31
32
DuckTransaction::DuckTransaction(DuckTransactionManager &manager, ClientContext &context_p, transaction_t start_time,
33
                                 transaction_t transaction_id, idx_t catalog_version_p)
34
216k
    : Transaction(manager, context_p), start_time(start_time), transaction_id(transaction_id), commit_id(0),
35
216k
      catalog_version(catalog_version_p), awaiting_cleanup(false), transaction_manager(manager),
36
216k
      undo_buffer(*this, context_p), storage(make_uniq<LocalStorage>(context_p, *this)) {
37
216k
}
38
39
216k
DuckTransaction::~DuckTransaction() {
40
216k
}
41
42
9.66k
DuckTransaction &DuckTransaction::Get(ClientContext &context, AttachedDatabase &db) {
43
9.66k
  return DuckTransaction::Get(context, db.GetCatalog());
44
9.66k
}
45
46
9.66k
DuckTransaction &DuckTransaction::Get(ClientContext &context, Catalog &catalog) {
47
9.66k
  auto &transaction = Transaction::Get(context, catalog);
48
9.66k
  if (!transaction.IsDuckTransaction()) {
49
0
    throw InternalException("DuckTransaction::Get called on non-DuckDB transaction");
50
0
  }
51
9.66k
  return transaction.Cast<DuckTransaction>();
52
9.66k
}
53
54
257
LocalStorage &DuckTransaction::GetLocalStorage() {
55
257
  return *storage;
56
257
}
57
58
501
void DuckTransaction::PushCatalogEntry(CatalogEntry &entry, data_ptr_t extra_data, idx_t extra_data_size) {
59
501
  idx_t alloc_size = sizeof(CatalogEntry *);
60
501
  if (extra_data_size > 0) {
61
0
    alloc_size += extra_data_size + sizeof(idx_t);
62
0
  }
63
64
501
  auto undo_entry = undo_buffer.CreateEntry(UndoFlags::CATALOG_ENTRY, alloc_size);
65
501
  auto ptr = undo_entry.Ptr();
66
  // store the pointer to the catalog entry
67
501
  Store<CatalogEntry *>(&entry, ptr);
68
501
  if (extra_data_size > 0) {
69
    // copy the extra data behind the catalog entry pointer (if any)
70
0
    ptr += sizeof(CatalogEntry *);
71
    // first store the extra data size
72
0
    Store<idx_t>(extra_data_size, ptr);
73
0
    ptr += sizeof(idx_t);
74
    // then copy over the actual data
75
0
    memcpy(ptr, extra_data, extra_data_size);
76
0
  }
77
501
}
78
79
9.40k
void DuckTransaction::PushAttach(AttachedDatabase &db) {
80
9.40k
  auto undo_entry = undo_buffer.CreateEntry(UndoFlags::ATTACHED_DATABASE, sizeof(AttachedDatabase *));
81
9.40k
  auto ptr = undo_entry.Ptr();
82
  // store the pointer to the database
83
9.40k
  Store<CatalogEntry *>(&db, ptr);
84
9.40k
}
85
86
void DuckTransaction::PushDelete(DataTable &table, RowVersionManager &info, idx_t vector_idx, row_t rows[], idx_t count,
87
0
                                 idx_t base_row) {
88
0
  ModifyTable(table);
89
0
  bool is_consecutive = true;
90
  // check if the rows are consecutive
91
0
  for (idx_t i = 0; i < count; i++) {
92
0
    if (rows[i] != row_t(i)) {
93
0
      is_consecutive = false;
94
0
      break;
95
0
    }
96
0
  }
97
0
  idx_t alloc_size = sizeof(DeleteInfo);
98
0
  if (!is_consecutive) {
99
    // if rows are not consecutive we need to allocate row identifiers
100
0
    alloc_size += sizeof(uint16_t) * count;
101
0
  }
102
103
0
  auto undo_entry = undo_buffer.CreateEntry(UndoFlags::DELETE_TUPLE, alloc_size);
104
0
  auto delete_info = reinterpret_cast<DeleteInfo *>(undo_entry.Ptr());
105
0
  delete_info->version_info = &info;
106
0
  delete_info->vector_idx = vector_idx;
107
0
  delete_info->table = &table;
108
0
  delete_info->count = count;
109
0
  delete_info->base_row = base_row;
110
0
  delete_info->is_consecutive = is_consecutive;
111
0
  if (!is_consecutive) {
112
    // if rows are not consecutive
113
0
    auto delete_rows = delete_info->GetRows();
114
0
    for (idx_t i = 0; i < count; i++) {
115
0
      delete_rows[i] = NumericCast<uint16_t>(rows[i]);
116
0
    }
117
0
  }
118
0
}
119
120
118
void DuckTransaction::PushAppend(DataTable &table, idx_t start_row, idx_t row_count) {
121
118
  ModifyTable(table);
122
118
  auto undo_entry = undo_buffer.CreateEntry(UndoFlags::INSERT_TUPLE, sizeof(AppendInfo));
123
118
  auto append_info = reinterpret_cast<AppendInfo *>(undo_entry.Ptr());
124
118
  append_info->table = &table;
125
118
  append_info->start_row = start_row;
126
118
  append_info->count = row_count;
127
118
}
128
129
UndoBufferReference DuckTransaction::CreateUpdateInfo(idx_t type_size, DataTable &data_table, idx_t entries,
130
0
                                                      idx_t row_group_start) {
131
0
  idx_t alloc_size = UpdateInfo::GetAllocSize(type_size);
132
0
  auto undo_entry = undo_buffer.CreateEntry(UndoFlags::UPDATE_TUPLE, alloc_size);
133
0
  auto &update_info = UpdateInfo::Get(undo_entry);
134
0
  UpdateInfo::Initialize(update_info, data_table, transaction_id, row_group_start);
135
0
  return undo_entry;
136
0
}
137
138
0
void DuckTransaction::PushSequenceUsage(SequenceCatalogEntry &sequence, const SequenceData &data) {
139
0
  lock_guard<mutex> l(sequence_lock);
140
0
  auto entry = sequence_usage.find(sequence);
141
0
  if (entry == sequence_usage.end()) {
142
0
    auto undo_entry = undo_buffer.CreateEntry(UndoFlags::SEQUENCE_VALUE, sizeof(SequenceValue));
143
0
    auto sequence_info = reinterpret_cast<SequenceValue *>(undo_entry.Ptr());
144
0
    sequence_info->entry = &sequence;
145
0
    sequence_info->usage_count = data.usage_count;
146
0
    sequence_info->counter = data.counter;
147
0
    sequence_usage.emplace(sequence, *sequence_info);
148
0
  } else {
149
0
    auto &sequence_info = entry->second.get();
150
0
    D_ASSERT(RefersToSameObject(*sequence_info.entry, sequence));
151
0
    sequence_info.usage_count = data.usage_count;
152
0
    sequence_info.counter = data.counter;
153
0
  }
154
0
}
155
156
118
void DuckTransaction::ModifyTable(DataTable &tbl) {
157
118
  lock_guard<mutex> guard(modified_tables_lock);
158
118
  auto table_ref = reference<DataTable>(tbl);
159
118
  auto entry = modified_tables.find(table_ref);
160
118
  if (entry != modified_tables.end()) {
161
    // already exists
162
0
    return;
163
0
  }
164
118
  modified_tables.insert(make_pair(table_ref, tbl.shared_from_this()));
165
118
}
166
167
667k
bool DuckTransaction::ChangesMade() {
168
667k
  return undo_buffer.ChangesMade() || storage->ChangesMade();
169
667k
}
170
171
178k
UndoBufferProperties DuckTransaction::GetUndoProperties() {
172
178k
  auto properties = undo_buffer.GetProperties();
173
178k
  properties.estimated_size += storage->EstimatedSize();
174
178k
  return properties;
175
178k
}
176
177
169
bool DuckTransaction::AutomaticCheckpoint(AttachedDatabase &db, const UndoBufferProperties &properties) {
178
169
  if (!ChangesMade()) {
179
    // read-only transactions cannot trigger an automated checkpoint
180
1
    return false;
181
1
  }
182
168
  if (db.IsReadOnly()) {
183
    // when attaching a database in read-only mode we cannot checkpoint
184
    // note that attaching a database in read-only mode does NOT mean we never make changes
185
    // WAL replay can make changes to the database - but only in the in-memory copy of the
186
0
    return false;
187
0
  }
188
168
  auto &storage_manager = db.GetStorageManager();
189
168
  return storage_manager.AutomaticCheckpoint(properties.estimated_size);
190
168
}
191
192
178k
bool DuckTransaction::ShouldWriteToWAL(AttachedDatabase &db) {
193
178k
  if (!ChangesMade()) {
194
168k
    return false;
195
168k
  }
196
9.57k
  if (db.IsSystem()) {
197
9.40k
    return false;
198
9.40k
  }
199
168
  auto &storage_manager = db.GetStorageManager();
200
168
  auto log = storage_manager.GetWAL();
201
168
  if (!log) {
202
168
    return false;
203
168
  }
204
0
  return true;
205
168
}
206
207
0
ErrorData DuckTransaction::WriteToWAL(AttachedDatabase &db, unique_ptr<StorageCommitState> &commit_state) noexcept {
208
0
  ErrorData error_data;
209
0
  try {
210
0
    D_ASSERT(ShouldWriteToWAL(db));
211
0
    auto &storage_manager = db.GetStorageManager();
212
0
    auto wal = storage_manager.GetWAL();
213
0
    commit_state = storage_manager.GenStorageCommitState(*wal);
214
0
    storage->Commit(commit_state.get());
215
0
    undo_buffer.WriteToWAL(*wal, commit_state.get());
216
0
    if (commit_state->HasRowGroupData()) {
217
      // if we have optimistically written any data AND we are writing to the WAL, we have written references to
218
      // optimistically written blocks
219
      // hence we need to ensure those optimistically written blocks are persisted
220
0
      storage_manager.GetBlockManager().FileSync();
221
0
    }
222
0
  } catch (std::exception &ex) {
223
    // Call RevertCommit() outside this try-catch as it itself may throw
224
0
    error_data = ErrorData(ex);
225
0
  }
226
227
0
  if (commit_state && error_data.HasError()) {
228
0
    try {
229
0
      commit_state->RevertCommit();
230
0
      commit_state.reset();
231
0
    } catch (std::exception &) {
232
      // Ignore this error. If we fail to RevertCommit(), just return the original exception
233
0
    }
234
0
  }
235
236
0
  return error_data;
237
0
}
238
239
ErrorData DuckTransaction::Commit(AttachedDatabase &db, transaction_t new_commit_id,
240
178k
                                  unique_ptr<StorageCommitState> commit_state) noexcept {
241
  // "checkpoint" parameter indicates if the caller will checkpoint. If checkpoint ==
242
  //    true: Then this function will NOT write to the WAL or flush/persist.
243
  //          This method only makes commit in memory, expecting caller to checkpoint/flush.
244
  //    false: Then this function WILL write to the WAL and Flush/Persist it.
245
178k
  this->commit_id = new_commit_id;
246
178k
  if (!ChangesMade()) {
247
    // no need to flush anything if we made no changes
248
168k
    return ErrorData();
249
168k
  }
250
9.57k
  D_ASSERT(db.IsSystem() || db.IsTemporary() || !IsReadOnly());
251
252
9.57k
  UndoBuffer::IteratorState iterator_state;
253
9.57k
  try {
254
9.57k
    storage->Commit(commit_state.get());
255
9.57k
    undo_buffer.Commit(iterator_state, commit_id);
256
9.57k
    if (commit_state) {
257
      // if we have written to the WAL - flush after the commit has been successful
258
0
      commit_state->FlushCommit();
259
0
    }
260
9.57k
    return ErrorData();
261
9.57k
  } catch (std::exception &ex) {
262
0
    undo_buffer.RevertCommit(iterator_state, this->transaction_id);
263
0
    if (commit_state) {
264
      // if we have written to the WAL - truncate the WAL on failure
265
0
      commit_state->RevertCommit();
266
0
    }
267
0
    return ErrorData(ex);
268
0
  }
269
9.57k
}
270
271
38.3k
ErrorData DuckTransaction::Rollback() {
272
38.3k
  try {
273
38.3k
    storage->Rollback();
274
38.3k
    undo_buffer.Rollback();
275
38.3k
    return ErrorData();
276
38.3k
  } catch (std::exception &ex) {
277
0
    return ErrorData(ex);
278
0
  }
279
38.3k
}
280
281
9.57k
void DuckTransaction::Cleanup(transaction_t lowest_active_transaction) {
282
9.57k
  undo_buffer.Cleanup(lowest_active_transaction);
283
9.57k
}
284
285
200
void DuckTransaction::SetReadWrite() {
286
200
  Transaction::SetReadWrite();
287
  // obtain a shared checkpoint lock to prevent concurrent checkpoints while this transaction is running
288
200
  write_lock = transaction_manager.SharedCheckpointLock();
289
200
}
290
291
0
unique_ptr<StorageLockKey> DuckTransaction::TryGetCheckpointLock() {
292
0
  if (!write_lock) {
293
0
    throw InternalException("TryUpgradeCheckpointLock - but thread has no shared lock!?");
294
0
  }
295
0
  return transaction_manager.TryUpgradeCheckpointLock(*write_lock);
296
0
}
297
298
0
shared_ptr<CheckpointLock> DuckTransaction::SharedLockTable(DataTableInfo &info) {
299
0
  unique_lock<mutex> transaction_lock(active_locks_lock);
300
0
  auto entry = active_locks.find(info);
301
0
  if (entry == active_locks.end()) {
302
0
    entry = active_locks.insert(entry, make_pair(std::ref(info), make_uniq<ActiveTableLock>()));
303
0
  }
304
0
  auto &active_table_lock = *entry->second;
305
0
  transaction_lock.unlock(); // release transaction-level lock before acquiring table-level lock
306
0
  lock_guard<mutex> table_lock(active_table_lock.checkpoint_lock_mutex);
307
0
  auto checkpoint_lock = active_table_lock.checkpoint_lock.lock();
308
  // check if it is expired (or has never been acquired yet)
309
0
  if (checkpoint_lock) {
310
    // not expired - return it
311
0
    return checkpoint_lock;
312
0
  }
313
  // no existing lock - obtain it
314
0
  checkpoint_lock = make_shared_ptr<CheckpointLock>(info.GetSharedLock());
315
  // store it for future reference
316
0
  active_table_lock.checkpoint_lock = checkpoint_lock;
317
0
  return checkpoint_lock;
318
0
}
319
320
} // namespace duckdb