/src/duckdb/src/transaction/duck_transaction.cpp
Line | Count | Source |
1 | | #include "duckdb/transaction/duck_transaction.hpp" |
2 | | #include "duckdb/transaction/duck_transaction_manager.hpp" |
3 | | #include "duckdb/main/client_context.hpp" |
4 | | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
5 | | #include "duckdb/common/exception.hpp" |
6 | | #include "duckdb/parser/column_definition.hpp" |
7 | | #include "duckdb/storage/data_table.hpp" |
8 | | #include "duckdb/storage/write_ahead_log.hpp" |
9 | | #include "duckdb/storage/storage_manager.hpp" |
10 | | |
11 | | #include "duckdb/transaction/append_info.hpp" |
12 | | #include "duckdb/transaction/delete_info.hpp" |
13 | | #include "duckdb/transaction/update_info.hpp" |
14 | | #include "duckdb/transaction/local_storage.hpp" |
15 | | #include "duckdb/main/config.hpp" |
16 | | #include "duckdb/storage/table/column_data.hpp" |
17 | | #include "duckdb/main/client_data.hpp" |
18 | | #include "duckdb/main/attached_database.hpp" |
19 | | #include "duckdb/storage/storage_lock.hpp" |
20 | | #include "duckdb/storage/table/data_table_info.hpp" |
21 | | #include "duckdb/storage/table/scan_state.hpp" |
22 | | |
23 | | namespace duckdb { |
24 | | |
25 | | TransactionData::TransactionData(DuckTransaction &transaction_p) // NOLINT |
26 | 124 | : transaction(&transaction_p), transaction_id(transaction_p.transaction_id), start_time(transaction_p.start_time) { |
27 | 124 | } |
28 | | TransactionData::TransactionData(transaction_t transaction_id_p, transaction_t start_time_p) |
29 | 381 | : transaction(nullptr), transaction_id(transaction_id_p), start_time(start_time_p) { |
30 | 381 | } |
31 | | |
32 | | DuckTransaction::DuckTransaction(DuckTransactionManager &manager, ClientContext &context_p, transaction_t start_time, |
33 | | transaction_t transaction_id, idx_t catalog_version_p) |
34 | 216k | : Transaction(manager, context_p), start_time(start_time), transaction_id(transaction_id), commit_id(0), |
35 | 216k | catalog_version(catalog_version_p), awaiting_cleanup(false), transaction_manager(manager), |
36 | 216k | undo_buffer(*this, context_p), storage(make_uniq<LocalStorage>(context_p, *this)) { |
37 | 216k | } |
38 | | |
39 | 216k | DuckTransaction::~DuckTransaction() { |
40 | 216k | } |
41 | | |
42 | 9.66k | DuckTransaction &DuckTransaction::Get(ClientContext &context, AttachedDatabase &db) { |
43 | 9.66k | return DuckTransaction::Get(context, db.GetCatalog()); |
44 | 9.66k | } |
45 | | |
46 | 9.66k | DuckTransaction &DuckTransaction::Get(ClientContext &context, Catalog &catalog) { |
47 | 9.66k | auto &transaction = Transaction::Get(context, catalog); |
48 | 9.66k | if (!transaction.IsDuckTransaction()) { |
49 | 0 | throw InternalException("DuckTransaction::Get called on non-DuckDB transaction"); |
50 | 0 | } |
51 | 9.66k | return transaction.Cast<DuckTransaction>(); |
52 | 9.66k | } |
53 | | |
54 | 257 | LocalStorage &DuckTransaction::GetLocalStorage() { |
55 | 257 | return *storage; |
56 | 257 | } |
57 | | |
58 | 501 | void DuckTransaction::PushCatalogEntry(CatalogEntry &entry, data_ptr_t extra_data, idx_t extra_data_size) { |
59 | 501 | idx_t alloc_size = sizeof(CatalogEntry *); |
60 | 501 | if (extra_data_size > 0) { |
61 | 0 | alloc_size += extra_data_size + sizeof(idx_t); |
62 | 0 | } |
63 | | |
64 | 501 | auto undo_entry = undo_buffer.CreateEntry(UndoFlags::CATALOG_ENTRY, alloc_size); |
65 | 501 | auto ptr = undo_entry.Ptr(); |
66 | | // store the pointer to the catalog entry |
67 | 501 | Store<CatalogEntry *>(&entry, ptr); |
68 | 501 | if (extra_data_size > 0) { |
69 | | // copy the extra data behind the catalog entry pointer (if any) |
70 | 0 | ptr += sizeof(CatalogEntry *); |
71 | | // first store the extra data size |
72 | 0 | Store<idx_t>(extra_data_size, ptr); |
73 | 0 | ptr += sizeof(idx_t); |
74 | | // then copy over the actual data |
75 | 0 | memcpy(ptr, extra_data, extra_data_size); |
76 | 0 | } |
77 | 501 | } |
78 | | |
79 | 9.40k | void DuckTransaction::PushAttach(AttachedDatabase &db) { |
80 | 9.40k | auto undo_entry = undo_buffer.CreateEntry(UndoFlags::ATTACHED_DATABASE, sizeof(AttachedDatabase *)); |
81 | 9.40k | auto ptr = undo_entry.Ptr(); |
82 | | // store the pointer to the database |
83 | 9.40k | Store<CatalogEntry *>(&db, ptr); |
84 | 9.40k | } |
85 | | |
86 | | void DuckTransaction::PushDelete(DataTable &table, RowVersionManager &info, idx_t vector_idx, row_t rows[], idx_t count, |
87 | 0 | idx_t base_row) { |
88 | 0 | ModifyTable(table); |
89 | 0 | bool is_consecutive = true; |
90 | | // check if the rows are consecutive |
91 | 0 | for (idx_t i = 0; i < count; i++) { |
92 | 0 | if (rows[i] != row_t(i)) { |
93 | 0 | is_consecutive = false; |
94 | 0 | break; |
95 | 0 | } |
96 | 0 | } |
97 | 0 | idx_t alloc_size = sizeof(DeleteInfo); |
98 | 0 | if (!is_consecutive) { |
99 | | // if rows are not consecutive we need to allocate row identifiers |
100 | 0 | alloc_size += sizeof(uint16_t) * count; |
101 | 0 | } |
102 | |
|
103 | 0 | auto undo_entry = undo_buffer.CreateEntry(UndoFlags::DELETE_TUPLE, alloc_size); |
104 | 0 | auto delete_info = reinterpret_cast<DeleteInfo *>(undo_entry.Ptr()); |
105 | 0 | delete_info->version_info = &info; |
106 | 0 | delete_info->vector_idx = vector_idx; |
107 | 0 | delete_info->table = &table; |
108 | 0 | delete_info->count = count; |
109 | 0 | delete_info->base_row = base_row; |
110 | 0 | delete_info->is_consecutive = is_consecutive; |
111 | 0 | if (!is_consecutive) { |
112 | | // if rows are not consecutive |
113 | 0 | auto delete_rows = delete_info->GetRows(); |
114 | 0 | for (idx_t i = 0; i < count; i++) { |
115 | 0 | delete_rows[i] = NumericCast<uint16_t>(rows[i]); |
116 | 0 | } |
117 | 0 | } |
118 | 0 | } |
119 | | |
120 | 118 | void DuckTransaction::PushAppend(DataTable &table, idx_t start_row, idx_t row_count) { |
121 | 118 | ModifyTable(table); |
122 | 118 | auto undo_entry = undo_buffer.CreateEntry(UndoFlags::INSERT_TUPLE, sizeof(AppendInfo)); |
123 | 118 | auto append_info = reinterpret_cast<AppendInfo *>(undo_entry.Ptr()); |
124 | 118 | append_info->table = &table; |
125 | 118 | append_info->start_row = start_row; |
126 | 118 | append_info->count = row_count; |
127 | 118 | } |
128 | | |
129 | | UndoBufferReference DuckTransaction::CreateUpdateInfo(idx_t type_size, DataTable &data_table, idx_t entries, |
130 | 0 | idx_t row_group_start) { |
131 | 0 | idx_t alloc_size = UpdateInfo::GetAllocSize(type_size); |
132 | 0 | auto undo_entry = undo_buffer.CreateEntry(UndoFlags::UPDATE_TUPLE, alloc_size); |
133 | 0 | auto &update_info = UpdateInfo::Get(undo_entry); |
134 | 0 | UpdateInfo::Initialize(update_info, data_table, transaction_id, row_group_start); |
135 | 0 | return undo_entry; |
136 | 0 | } |
137 | | |
138 | 0 | void DuckTransaction::PushSequenceUsage(SequenceCatalogEntry &sequence, const SequenceData &data) { |
139 | 0 | lock_guard<mutex> l(sequence_lock); |
140 | 0 | auto entry = sequence_usage.find(sequence); |
141 | 0 | if (entry == sequence_usage.end()) { |
142 | 0 | auto undo_entry = undo_buffer.CreateEntry(UndoFlags::SEQUENCE_VALUE, sizeof(SequenceValue)); |
143 | 0 | auto sequence_info = reinterpret_cast<SequenceValue *>(undo_entry.Ptr()); |
144 | 0 | sequence_info->entry = &sequence; |
145 | 0 | sequence_info->usage_count = data.usage_count; |
146 | 0 | sequence_info->counter = data.counter; |
147 | 0 | sequence_usage.emplace(sequence, *sequence_info); |
148 | 0 | } else { |
149 | 0 | auto &sequence_info = entry->second.get(); |
150 | 0 | D_ASSERT(RefersToSameObject(*sequence_info.entry, sequence)); |
151 | 0 | sequence_info.usage_count = data.usage_count; |
152 | 0 | sequence_info.counter = data.counter; |
153 | 0 | } |
154 | 0 | } |
155 | | |
156 | 118 | void DuckTransaction::ModifyTable(DataTable &tbl) { |
157 | 118 | lock_guard<mutex> guard(modified_tables_lock); |
158 | 118 | auto table_ref = reference<DataTable>(tbl); |
159 | 118 | auto entry = modified_tables.find(table_ref); |
160 | 118 | if (entry != modified_tables.end()) { |
161 | | // already exists |
162 | 0 | return; |
163 | 0 | } |
164 | 118 | modified_tables.insert(make_pair(table_ref, tbl.shared_from_this())); |
165 | 118 | } |
166 | | |
167 | 667k | bool DuckTransaction::ChangesMade() { |
168 | 667k | return undo_buffer.ChangesMade() || storage->ChangesMade(); |
169 | 667k | } |
170 | | |
171 | 178k | UndoBufferProperties DuckTransaction::GetUndoProperties() { |
172 | 178k | auto properties = undo_buffer.GetProperties(); |
173 | 178k | properties.estimated_size += storage->EstimatedSize(); |
174 | 178k | return properties; |
175 | 178k | } |
176 | | |
177 | 169 | bool DuckTransaction::AutomaticCheckpoint(AttachedDatabase &db, const UndoBufferProperties &properties) { |
178 | 169 | if (!ChangesMade()) { |
179 | | // read-only transactions cannot trigger an automated checkpoint |
180 | 1 | return false; |
181 | 1 | } |
182 | 168 | if (db.IsReadOnly()) { |
183 | | // when attaching a database in read-only mode we cannot checkpoint |
184 | | // note that attaching a database in read-only mode does NOT mean we never make changes |
185 | | // WAL replay can make changes to the database - but only in the in-memory copy of the |
186 | 0 | return false; |
187 | 0 | } |
188 | 168 | auto &storage_manager = db.GetStorageManager(); |
189 | 168 | return storage_manager.AutomaticCheckpoint(properties.estimated_size); |
190 | 168 | } |
191 | | |
192 | 178k | bool DuckTransaction::ShouldWriteToWAL(AttachedDatabase &db) { |
193 | 178k | if (!ChangesMade()) { |
194 | 168k | return false; |
195 | 168k | } |
196 | 9.57k | if (db.IsSystem()) { |
197 | 9.40k | return false; |
198 | 9.40k | } |
199 | 168 | auto &storage_manager = db.GetStorageManager(); |
200 | 168 | auto log = storage_manager.GetWAL(); |
201 | 168 | if (!log) { |
202 | 168 | return false; |
203 | 168 | } |
204 | 0 | return true; |
205 | 168 | } |
206 | | |
207 | 0 | ErrorData DuckTransaction::WriteToWAL(AttachedDatabase &db, unique_ptr<StorageCommitState> &commit_state) noexcept { |
208 | 0 | ErrorData error_data; |
209 | 0 | try { |
210 | 0 | D_ASSERT(ShouldWriteToWAL(db)); |
211 | 0 | auto &storage_manager = db.GetStorageManager(); |
212 | 0 | auto wal = storage_manager.GetWAL(); |
213 | 0 | commit_state = storage_manager.GenStorageCommitState(*wal); |
214 | 0 | storage->Commit(commit_state.get()); |
215 | 0 | undo_buffer.WriteToWAL(*wal, commit_state.get()); |
216 | 0 | if (commit_state->HasRowGroupData()) { |
217 | | // if we have optimistically written any data AND we are writing to the WAL, we have written references to |
218 | | // optimistically written blocks |
219 | | // hence we need to ensure those optimistically written blocks are persisted |
220 | 0 | storage_manager.GetBlockManager().FileSync(); |
221 | 0 | } |
222 | 0 | } catch (std::exception &ex) { |
223 | | // Call RevertCommit() outside this try-catch as it itself may throw |
224 | 0 | error_data = ErrorData(ex); |
225 | 0 | } |
226 | |
|
227 | 0 | if (commit_state && error_data.HasError()) { |
228 | 0 | try { |
229 | 0 | commit_state->RevertCommit(); |
230 | 0 | commit_state.reset(); |
231 | 0 | } catch (std::exception &) { |
232 | | // Ignore this error. If we fail to RevertCommit(), just return the original exception |
233 | 0 | } |
234 | 0 | } |
235 | |
|
236 | 0 | return error_data; |
237 | 0 | } |
238 | | |
239 | | ErrorData DuckTransaction::Commit(AttachedDatabase &db, transaction_t new_commit_id, |
240 | 178k | unique_ptr<StorageCommitState> commit_state) noexcept { |
241 | | // "checkpoint" parameter indicates if the caller will checkpoint. If checkpoint == |
242 | | // true: Then this function will NOT write to the WAL or flush/persist. |
243 | | // This method only makes commit in memory, expecting caller to checkpoint/flush. |
244 | | // false: Then this function WILL write to the WAL and Flush/Persist it. |
245 | 178k | this->commit_id = new_commit_id; |
246 | 178k | if (!ChangesMade()) { |
247 | | // no need to flush anything if we made no changes |
248 | 168k | return ErrorData(); |
249 | 168k | } |
250 | 9.57k | D_ASSERT(db.IsSystem() || db.IsTemporary() || !IsReadOnly()); |
251 | | |
252 | 9.57k | UndoBuffer::IteratorState iterator_state; |
253 | 9.57k | try { |
254 | 9.57k | storage->Commit(commit_state.get()); |
255 | 9.57k | undo_buffer.Commit(iterator_state, commit_id); |
256 | 9.57k | if (commit_state) { |
257 | | // if we have written to the WAL - flush after the commit has been successful |
258 | 0 | commit_state->FlushCommit(); |
259 | 0 | } |
260 | 9.57k | return ErrorData(); |
261 | 9.57k | } catch (std::exception &ex) { |
262 | 0 | undo_buffer.RevertCommit(iterator_state, this->transaction_id); |
263 | 0 | if (commit_state) { |
264 | | // if we have written to the WAL - truncate the WAL on failure |
265 | 0 | commit_state->RevertCommit(); |
266 | 0 | } |
267 | 0 | return ErrorData(ex); |
268 | 0 | } |
269 | 9.57k | } |
270 | | |
271 | 38.3k | ErrorData DuckTransaction::Rollback() { |
272 | 38.3k | try { |
273 | 38.3k | storage->Rollback(); |
274 | 38.3k | undo_buffer.Rollback(); |
275 | 38.3k | return ErrorData(); |
276 | 38.3k | } catch (std::exception &ex) { |
277 | 0 | return ErrorData(ex); |
278 | 0 | } |
279 | 38.3k | } |
280 | | |
281 | 9.57k | void DuckTransaction::Cleanup(transaction_t lowest_active_transaction) { |
282 | 9.57k | undo_buffer.Cleanup(lowest_active_transaction); |
283 | 9.57k | } |
284 | | |
285 | 200 | void DuckTransaction::SetReadWrite() { |
286 | 200 | Transaction::SetReadWrite(); |
287 | | // obtain a shared checkpoint lock to prevent concurrent checkpoints while this transaction is running |
288 | 200 | write_lock = transaction_manager.SharedCheckpointLock(); |
289 | 200 | } |
290 | | |
291 | 0 | unique_ptr<StorageLockKey> DuckTransaction::TryGetCheckpointLock() { |
292 | 0 | if (!write_lock) { |
293 | 0 | throw InternalException("TryUpgradeCheckpointLock - but thread has no shared lock!?"); |
294 | 0 | } |
295 | 0 | return transaction_manager.TryUpgradeCheckpointLock(*write_lock); |
296 | 0 | } |
297 | | |
298 | 0 | shared_ptr<CheckpointLock> DuckTransaction::SharedLockTable(DataTableInfo &info) { |
299 | 0 | unique_lock<mutex> transaction_lock(active_locks_lock); |
300 | 0 | auto entry = active_locks.find(info); |
301 | 0 | if (entry == active_locks.end()) { |
302 | 0 | entry = active_locks.insert(entry, make_pair(std::ref(info), make_uniq<ActiveTableLock>())); |
303 | 0 | } |
304 | 0 | auto &active_table_lock = *entry->second; |
305 | 0 | transaction_lock.unlock(); // release transaction-level lock before acquiring table-level lock |
306 | 0 | lock_guard<mutex> table_lock(active_table_lock.checkpoint_lock_mutex); |
307 | 0 | auto checkpoint_lock = active_table_lock.checkpoint_lock.lock(); |
308 | | // check if it is expired (or has never been acquired yet) |
309 | 0 | if (checkpoint_lock) { |
310 | | // not expired - return it |
311 | 0 | return checkpoint_lock; |
312 | 0 | } |
313 | | // no existing lock - obtain it |
314 | 0 | checkpoint_lock = make_shared_ptr<CheckpointLock>(info.GetSharedLock()); |
315 | | // store it for future reference |
316 | 0 | active_table_lock.checkpoint_lock = checkpoint_lock; |
317 | 0 | return checkpoint_lock; |
318 | 0 | } |
319 | | |
320 | | } // namespace duckdb |