/src/duckdb/src/storage/table/row_group.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | #include "duckdb/storage/table/row_group.hpp" |
2 | | #include "duckdb/common/types/vector.hpp" |
3 | | #include "duckdb/common/exception.hpp" |
4 | | #include "duckdb/storage/table/column_data.hpp" |
5 | | #include "duckdb/storage/table/column_checkpoint_state.hpp" |
6 | | #include "duckdb/storage/table/update_segment.hpp" |
7 | | #include "duckdb/storage/table_storage_info.hpp" |
8 | | #include "duckdb/planner/table_filter.hpp" |
9 | | #include "duckdb/execution/expression_executor.hpp" |
10 | | #include "duckdb/storage/checkpoint/table_data_writer.hpp" |
11 | | #include "duckdb/storage/metadata/metadata_reader.hpp" |
12 | | #include "duckdb/transaction/duck_transaction_manager.hpp" |
13 | | #include "duckdb/main/database.hpp" |
14 | | #include "duckdb/main/attached_database.hpp" |
15 | | #include "duckdb/transaction/duck_transaction.hpp" |
16 | | #include "duckdb/storage/table/append_state.hpp" |
17 | | #include "duckdb/storage/table/scan_state.hpp" |
18 | | #include "duckdb/storage/table/row_version_manager.hpp" |
19 | | #include "duckdb/common/serializer/serializer.hpp" |
20 | | #include "duckdb/common/serializer/deserializer.hpp" |
21 | | #include "duckdb/common/serializer/binary_serializer.hpp" |
22 | | #include "duckdb/planner/filter/conjunction_filter.hpp" |
23 | | #include "duckdb/planner/filter/struct_filter.hpp" |
24 | | #include "duckdb/planner/filter/optional_filter.hpp" |
25 | | #include "duckdb/execution/adaptive_filter.hpp" |
26 | | |
27 | | namespace duckdb { |
28 | | |
//! Construct a transient (in-memory) row group covering rows [start, start + count)
//! within the given collection. Column data is not created here; callers populate it
//! afterwards (e.g. via InitializeEmpty).
RowGroup::RowGroup(RowGroupCollection &collection_p, idx_t start, idx_t count)
    : SegmentBase<RowGroup>(start, count), collection(collection_p), version_info(nullptr), allocation_size(0) {
	Verify();
}
33 | | |
//! Construct a row group from an on-disk RowGroupPointer.
//! Column data is NOT deserialized eagerly: only the metadata pointers are stored, and
//! each column is materialized on first access in GetColumn (see the is_loaded flags).
RowGroup::RowGroup(RowGroupCollection &collection_p, RowGroupPointer pointer)
    : SegmentBase<RowGroup>(pointer.row_start, pointer.tuple_count), collection(collection_p), version_info(nullptr),
      allocation_size(0) {
	// deserialize the columns
	// sanity check: the pointer must describe exactly one column per table column
	if (pointer.data_pointers.size() != collection_p.GetTypes().size()) {
		throw IOException("Row group column count is unaligned with table column count. Corrupt file?");
	}
	this->column_pointers = std::move(pointer.data_pointers);
	this->columns.resize(column_pointers.size());
	// one atomic "loaded" flag per column, all initially false (lazy loading)
	this->is_loaded = unique_ptr<atomic<bool>[]>(new atomic<bool>[columns.size()]);
	for (idx_t c = 0; c < columns.size(); c++) {
		this->is_loaded[c] = false;
	}
	// delete (version) information is likewise loaded lazily
	this->deletes_pointers = std::move(pointer.deletes_pointers);
	this->deletes_is_loaded = false;

	Verify();
}
52 | | |
//! Construct a row group from already-deserialized persistent row group data.
//! Unlike the RowGroupPointer constructor, all columns are created and initialized
//! eagerly from data.column_data.
RowGroup::RowGroup(RowGroupCollection &collection_p, PersistentRowGroupData &data)
    : SegmentBase<RowGroup>(data.start, data.count), collection(collection_p), version_info(nullptr),
      allocation_size(0) {
	auto &block_manager = GetBlockManager();
	auto &info = GetTableInfo();
	auto &types = collection.get().GetTypes();
	columns.reserve(types.size());
	for (idx_t c = 0; c < types.size(); c++) {
		auto entry = ColumnData::CreateColumn(block_manager, info, c, data.start, types[c], nullptr);
		entry->InitializeColumn(data.column_data[c]);
		columns.push_back(std::move(entry));
	}

	Verify();
}
68 | | |
//! Re-home this row group into a different collection at a new starting row.
//! Updates the start offset of every column and, if the delete information is already
//! loaded, of the version info as well. GetColumns() forces all columns to be loaded
//! before their start is updated.
void RowGroup::MoveToCollection(RowGroupCollection &collection_p, idx_t new_start) {
	this->collection = collection_p;
	this->start = new_start;
	for (auto &column : GetColumns()) {
		column->SetStart(new_start);
	}
	if (!HasUnloadedDeletes()) {
		// NOTE(review): unloaded deletes are presumably rewritten relative to the new
		// start when they are eventually loaded - confirm against RowVersionManager
		auto vinfo = GetVersionInfo();
		if (vinfo) {
			vinfo->SetStart(new_start);
		}
	}
}
82 | | |
//! Out-of-line destructor; member destruction (columns, version info) is sufficient.
RowGroup::~RowGroup() {
}
85 | | |
//! Return the full column vector, forcing any lazily-loaded columns to be
//! materialized first so every entry is valid.
vector<shared_ptr<ColumnData>> &RowGroup::GetColumns() {
	// ensure all columns are loaded
	for (idx_t c = 0; c < GetColumnCount(); c++) {
		GetColumn(c);
	}
	return columns;
}
93 | | |
//! Number of columns in this row group (loaded or not).
idx_t RowGroup::GetColumnCount() const {
	return columns.size();
}
97 | | |
//! The configured row group size, taken from the owning collection.
idx_t RowGroup::GetRowGroupSize() const {
	return collection.get().GetRowGroupSize();
}
101 | | |
//! Convenience overload: resolve a StorageIndex to its primary column index.
ColumnData &RowGroup::GetColumn(const StorageIndex &c) {
	return GetColumn(c.GetPrimaryIndex());
}
105 | | |
//! Fetch column c, lazily deserializing it from disk on first access.
//! Fast paths avoid the lock: either the whole row group is non-lazy (no is_loaded
//! array), or the column's atomic loaded flag is already set. Otherwise the column is
//! deserialized under row_group_lock.
ColumnData &RowGroup::GetColumn(storage_t c) {
	D_ASSERT(c < columns.size());
	if (!is_loaded) {
		// not being lazy loaded
		D_ASSERT(columns[c]);
		return *columns[c];
	}
	if (is_loaded[c]) {
		// already materialized - lock-free fast path
		D_ASSERT(columns[c]);
		return *columns[c];
	}
	lock_guard<mutex> l(row_group_lock);
	if (columns[c]) {
		// another thread loaded the column while we were waiting for the lock
		D_ASSERT(is_loaded[c]);
		return *columns[c];
	}
	if (column_pointers.size() != columns.size()) {
		throw InternalException("Lazy loading a column but the pointer was not set");
	}
	// deserialize the column from its metadata pointer
	auto &metadata_manager = GetCollection().GetMetadataManager();
	auto &types = GetCollection().GetTypes();
	auto &block_pointer = column_pointers[c];
	MetadataReader column_data_reader(metadata_manager, block_pointer);
	this->columns[c] =
	    ColumnData::Deserialize(GetBlockManager(), GetTableInfo(), c, start, column_data_reader, types[c]);
	// publish: flag is set only after columns[c] is assigned
	is_loaded[c] = true;
	if (this->columns[c]->count != this->count) {
		throw InternalException("Corrupted database - loaded column with index %llu at row start %llu, count %llu did "
		                        "not match count of row group %llu",
		                        c, start, this->columns[c]->count.load(), this->count.load());
	}
	return *columns[c];
}
139 | | |
//! Block manager of the owning collection.
BlockManager &RowGroup::GetBlockManager() {
	return GetCollection().GetBlockManager();
}
//! Table info of the owning collection.
DataTableInfo &RowGroup::GetTableInfo() {
	return GetCollection().GetTableInfo();
}
146 | | |
147 | 201 | void RowGroup::InitializeEmpty(const vector<LogicalType> &types) { |
148 | | // set up the segment trees for the column segments |
149 | 201 | D_ASSERT(columns.empty()); |
150 | 2.85k | for (idx_t i = 0; i < types.size(); i++) { |
151 | 2.65k | auto column_data = ColumnData::CreateColumn(GetBlockManager(), GetTableInfo(), i, start, types[i]); |
152 | 2.65k | columns.push_back(std::move(column_data)); |
153 | 2.65k | } |
154 | 201 | } |
155 | | |
//! Initialize a column scan state for the given type, recursively setting up child
//! states for nested types. Layout convention: child_states[0] is always the validity
//! state; struct/list/array children follow at offset +1. `children` optionally
//! restricts which struct children are scanned (empty means all).
void ColumnScanState::Initialize(const LogicalType &type, const vector<StorageIndex> &children,
                                 optional_ptr<TableScanOptions> options) {
	// Register the options in the state
	scan_options = options;

	if (type.id() == LogicalTypeId::VALIDITY) {
		// validity - nothing to initialize
		return;
	}
	if (type.InternalType() == PhysicalType::STRUCT) {
		// validity + struct children
		auto &struct_children = StructType::GetChildTypes(type);
		child_states.resize(struct_children.size() + 1);

		if (children.empty()) {
			// scan all struct children
			scan_child_column.resize(struct_children.size(), true);
			for (idx_t i = 0; i < struct_children.size(); i++) {
				child_states[i + 1].Initialize(struct_children[i].second, options);
			}
		} else {
			// only scan the specified subset of columns
			scan_child_column.resize(struct_children.size(), false);
			for (idx_t i = 0; i < children.size(); i++) {
				auto &child = children[i];
				auto index = child.GetPrimaryIndex();
				auto &child_indexes = child.GetChildIndexes();
				scan_child_column[index] = true;
				// recurse with the child's own sub-selection
				child_states[index + 1].Initialize(struct_children[index].second, child_indexes, options);
			}
		}
		child_states[0].scan_options = options;
	} else if (type.InternalType() == PhysicalType::LIST) {
		// validity + list child
		child_states.resize(2);
		child_states[1].Initialize(ListType::GetChildType(type), options);
		child_states[0].scan_options = options;
	} else if (type.InternalType() == PhysicalType::ARRAY) {
		// validity + array child
		child_states.resize(2);
		child_states[0].scan_options = options;
		child_states[1].Initialize(ArrayType::GetChildType(type), options);
	} else {
		// primitive type: validity only
		child_states.resize(1);
		child_states[0].scan_options = options;
	}
}
204 | | |
205 | 0 | void ColumnScanState::Initialize(const LogicalType &type, optional_ptr<TableScanOptions> options) { |
206 | 0 | vector<StorageIndex> children; |
207 | 0 | Initialize(type, children, options); |
208 | 0 | } |
209 | | |
//! Allocate and initialize one ColumnScanState per scanned column.
//! Row-id columns have no backing column data and are skipped (their state stays
//! default-constructed).
void CollectionScanState::Initialize(const vector<LogicalType> &types) {
	auto &column_ids = GetColumnIds();
	column_scans = make_unsafe_uniq_array<ColumnScanState>(column_ids.size());
	for (idx_t i = 0; i < column_ids.size(); i++) {
		if (column_ids[i].IsRowIdColumn()) {
			continue;
		}
		auto col_id = column_ids[i].GetPrimaryIndex();
		column_scans[i].Initialize(types[col_id], column_ids[i].GetChildIndexes(), &GetOptions());
	}
}
221 | | |
//! Initialize a scan that starts at the given vector offset within this row group.
//! Returns false when the row group can be skipped entirely (zonemap prunes it, or
//! the scan's max_row ends before this row group).
bool RowGroup::InitializeScanWithOffset(CollectionScanState &state, idx_t vector_offset) {
	auto &column_ids = state.GetColumnIds();
	auto &filters = state.GetFilterInfo();
	if (!CheckZonemap(filters)) {
		// zonemap pruning: no row in this group can pass the filters
		return false;
	}

	state.row_group = this;
	state.vector_index = vector_offset;
	// clamp the number of scannable rows to the scan's global max_row
	state.max_row_group_row =
	    this->start > state.max_row ? 0 : MinValue<idx_t>(this->count, state.max_row - this->start);
	auto row_number = start + vector_offset * STANDARD_VECTOR_SIZE;
	if (state.max_row_group_row == 0) {
		// exceeded row groups to scan
		return false;
	}
	D_ASSERT(state.column_scans);
	for (idx_t i = 0; i < column_ids.size(); i++) {
		const auto &column = column_ids[i];
		if (!column.IsRowIdColumn()) {
			auto &column_data = GetColumn(column);
			// position each column scan at the requested row
			column_data.InitializeScanWithOffset(state.column_scans[i], row_number);
			state.column_scans[i].scan_options = &state.GetOptions();
		} else {
			// row-id column: no segment to scan
			state.column_scans[i].current = nullptr;
		}
	}
	return true;
}
251 | | |
//! Initialize a scan from the beginning of this row group.
//! Same contract as InitializeScanWithOffset with vector_offset == 0: returns false
//! when the group is pruned by the zonemap or lies past the scan's max_row.
bool RowGroup::InitializeScan(CollectionScanState &state) {
	auto &column_ids = state.GetColumnIds();
	auto &filters = state.GetFilterInfo();
	if (!CheckZonemap(filters)) {
		return false;
	}
	state.row_group = this;
	state.vector_index = 0;
	// clamp the number of scannable rows to the scan's global max_row
	state.max_row_group_row =
	    this->start > state.max_row ? 0 : MinValue<idx_t>(this->count, state.max_row - this->start);
	if (state.max_row_group_row == 0) {
		return false;
	}
	D_ASSERT(state.column_scans);
	for (idx_t i = 0; i < column_ids.size(); i++) {
		auto column = column_ids[i];
		if (!column.IsRowIdColumn()) {
			auto &column_data = GetColumn(column);
			column_data.InitializeScan(state.column_scans[i]);
			state.column_scans[i].scan_options = &state.GetOptions();
		} else {
			// row-id column: no segment to scan
			state.column_scans[i].current = nullptr;
		}
	}
	return true;
}
278 | | |
//! Produce a copy of this row group (for ALTER TYPE) in which column `changed_idx`
//! has been re-materialized as `target_type` by evaluating `executor` over the old
//! data. All other columns are shared with this row group; version info is shared too.
unique_ptr<RowGroup> RowGroup::AlterType(RowGroupCollection &new_collection, const LogicalType &target_type,
                                         idx_t changed_idx, ExpressionExecutor &executor,
                                         CollectionScanState &scan_state, DataChunk &scan_chunk) {
	Verify();

	// construct a new column data for this type
	auto column_data = ColumnData::CreateColumn(GetBlockManager(), GetTableInfo(), changed_idx, start, target_type);

	ColumnAppendState append_state;
	column_data->InitializeAppend(append_state);

	// scan the original table, and fill the new column with the transformed value
	scan_state.Initialize(GetCollection().GetTypes());
	InitializeScan(scan_state);

	DataChunk append_chunk;
	vector<LogicalType> append_types;
	append_types.push_back(target_type);
	append_chunk.Initialize(Allocator::DefaultAllocator(), append_types);
	auto &append_vector = append_chunk.data[0];
	while (true) {
		// scan the table
		scan_chunk.Reset();
		ScanCommitted(scan_state, scan_chunk, TableScanType::TABLE_SCAN_COMMITTED_ROWS);
		if (scan_chunk.size() == 0) {
			break;
		}
		// execute the expression (the cast/transform) and append the result
		append_chunk.Reset();
		executor.ExecuteExpression(scan_chunk, append_vector);
		column_data->Append(append_state, append_vector, scan_chunk.size());
	}

	// set up the row_group based on this row_group
	auto row_group = make_uniq<RowGroup>(new_collection, this->start, this->count);
	row_group->SetVersionInfo(GetOrCreateVersionInfoPtr());
	auto &cols = GetColumns();
	for (idx_t i = 0; i < cols.size(); i++) {
		if (i == changed_idx) {
			// this is the altered column: use the new column
			row_group->columns.push_back(std::move(column_data));
			column_data.reset();
		} else {
			// this column was not altered: use the data directly
			row_group->columns.push_back(cols[i]);
		}
	}
	row_group->Verify();
	return row_group;
}
329 | | |
//! Produce a copy of this row group (for ADD COLUMN) with one extra column appended.
//! The new column's values come from evaluating `executor` (e.g. the DEFAULT
//! expression) against an empty dummy chunk, one vector at a time; existing columns
//! and version info are shared with this row group.
unique_ptr<RowGroup> RowGroup::AddColumn(RowGroupCollection &new_collection, ColumnDefinition &new_column,
                                         ExpressionExecutor &executor, Vector &result) {
	Verify();

	// construct a new column data for the new column
	auto added_column =
	    ColumnData::CreateColumn(GetBlockManager(), GetTableInfo(), GetColumnCount(), start, new_column.Type());

	idx_t rows_to_write = this->count;
	if (rows_to_write > 0) {
		// the executor needs no input columns - only a cardinality to produce
		DataChunk dummy_chunk;

		ColumnAppendState state;
		added_column->InitializeAppend(state);
		for (idx_t i = 0; i < rows_to_write; i += STANDARD_VECTOR_SIZE) {
			idx_t rows_in_this_vector = MinValue<idx_t>(rows_to_write - i, STANDARD_VECTOR_SIZE);
			dummy_chunk.SetCardinality(rows_in_this_vector);
			executor.ExecuteExpression(dummy_chunk, result);
			added_column->Append(state, result, rows_in_this_vector);
		}
	}

	// set up the row_group based on this row_group
	auto row_group = make_uniq<RowGroup>(new_collection, this->start, this->count);
	row_group->SetVersionInfo(GetOrCreateVersionInfoPtr());
	row_group->columns = GetColumns();
	// now add the new column
	row_group->columns.push_back(std::move(added_column));

	row_group->Verify();
	return row_group;
}
362 | | |
//! Produce a copy of this row group (for DROP COLUMN) without the removed column.
//! Remaining columns and version info are shared with this row group.
unique_ptr<RowGroup> RowGroup::RemoveColumn(RowGroupCollection &new_collection, idx_t removed_column) {
	Verify();

	D_ASSERT(removed_column < columns.size());

	auto row_group = make_uniq<RowGroup>(new_collection, this->start, this->count);
	row_group->SetVersionInfo(GetOrCreateVersionInfoPtr());
	// copy over all columns except for the removed one
	auto &cols = GetColumns();
	for (idx_t i = 0; i < cols.size(); i++) {
		if (i != removed_column) {
			row_group->columns.push_back(cols[i]);
		}
	}

	row_group->Verify();
	return row_group;
}
381 | | |
382 | 2 | void RowGroup::CommitDrop() { |
383 | 11 | for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) { |
384 | 9 | CommitDropColumn(column_idx); |
385 | 9 | } |
386 | 2 | } |
387 | | |
//! Commit the drop of a single column; delegates to the column data itself.
void RowGroup::CommitDropColumn(const idx_t column_index) {
	auto &column = GetColumn(column_index);
	column.CommitDropColumn();
}
392 | | |
//! Advance the scan to the next vector without producing output:
//! bump the vector index and skip one vector's worth of rows in every column scan
//! (row-id columns have nothing to skip).
void RowGroup::NextVector(CollectionScanState &state) {
	state.vector_index++;
	const auto &column_ids = state.GetColumnIds();
	for (idx_t i = 0; i < column_ids.size(); i++) {
		const auto &column = column_ids[i];
		if (column.IsRowIdColumn()) {
			continue;
		}
		GetColumn(column).Skip(state.column_scans[i]);
	}
}
404 | | |
//! Evaluate a table filter against a row-id range [beg_row, end_row].
//! Row-id columns have no zonemap, so we fabricate min/max statistics for the range
//! and let the filter's own statistics check do the pruning.
FilterPropagateResult RowGroup::CheckRowIdFilter(const TableFilter &filter, idx_t beg_row, idx_t end_row) {
	// RowId columns dont have a zonemap, but we can trivially create stats to check the filter against.
	BaseStatistics dummy_stats = NumericStats::CreateEmpty(LogicalType::ROW_TYPE);
	NumericStats::SetMin(dummy_stats, UnsafeNumericCast<row_t>(beg_row));
	NumericStats::SetMax(dummy_stats, UnsafeNumericCast<row_t>(end_row));

	return filter.CheckStatistics(dummy_stats);
}
413 | | |
//! Row-group-level zonemap check. Returns false if any filter proves no row in this
//! group can match (the whole group is skipped). Filters that are provably always
//! true for this group - and optional filters, which only exist for group-level
//! pruning - are flagged so per-row evaluation skips them.
bool RowGroup::CheckZonemap(ScanFilterInfo &filters) {
	auto &filter_list = filters.GetFilterList();
	// new row group - label all filters as up for grabs again
	filters.CheckAllFilters();
	for (idx_t i = 0; i < filter_list.size(); i++) {
		auto &entry = filter_list[i];
		auto &filter = entry.filter;
		auto base_column_index = entry.table_column_index;

		FilterPropagateResult prune_result;

		if (base_column_index == COLUMN_IDENTIFIER_ROW_ID) {
			// row-id filter: check against this group's row range
			prune_result = CheckRowIdFilter(filter, this->start, this->start + this->count);
		} else {
			// regular column: check against the column's zonemap statistics
			prune_result = GetColumn(base_column_index).CheckZonemap(filter);
		}

		if (prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) {
			return false;
		}
		if (filter.filter_type == TableFilterType::OPTIONAL_FILTER) {
			// these are only for row group checking, set as always true so we don't check it
			filters.SetFilterAlwaysTrue(i);
		} else if (prune_result == FilterPropagateResult::FILTER_ALWAYS_TRUE) {
			// filter is always true - no need to check it
			// label the filter as always true so we don't need to check it anymore
			filters.SetFilterAlwaysTrue(i);
		}
	}
	return true;
}
445 | | |
//! Segment-level zonemap check during an active scan. If a filter proves the current
//! segment of its column contains no matches, skip forward over as many full vectors
//! as that segment covers. Returns true when the current vector should be scanned,
//! false when the scan position was advanced and the caller should re-check.
bool RowGroup::CheckZonemapSegments(CollectionScanState &state) {
	auto &filters = state.GetFilterInfo();
	for (auto &entry : filters.GetFilterList()) {
		if (entry.IsAlwaysTrue()) {
			// filter is always true - avoid checking
			continue;
		}
		auto column_idx = entry.scan_column_index;
		auto base_column_idx = entry.table_column_index;
		auto &filter = entry.filter;

		FilterPropagateResult prune_result;
		if (base_column_idx == COLUMN_IDENTIFIER_ROW_ID) {
			prune_result = CheckRowIdFilter(filter, this->start, this->start + this->count);
		} else {
			// check the zonemap of the segment the scan is currently positioned on
			prune_result = GetColumn(base_column_idx).CheckZonemap(state.column_scans[column_idx], filter);
		}

		if (prune_result != FilterPropagateResult::FILTER_ALWAYS_FALSE) {
			continue;
		}

		// check zone map segment.
		auto &column_scan_state = state.column_scans[column_idx];
		auto current_segment = column_scan_state.current;
		if (!current_segment) {
			// no segment to skip
			continue;
		}
		// skip to the first row past the pruned segment (clamped to the scan's max_row)
		idx_t target_row = current_segment->start + current_segment->count;
		if (target_row >= state.max_row) {
			target_row = state.max_row;
		}

		D_ASSERT(target_row >= this->start);
		D_ASSERT(target_row <= this->start + this->count);
		idx_t target_vector_index = (target_row - this->start) / STANDARD_VECTOR_SIZE;
		if (state.vector_index == target_vector_index) {
			// we can't skip any full vectors because this segment contains less than a full vector
			// for now we just bail-out
			// FIXME: we could check if we can ALSO skip the next segments, in which case skipping a full vector
			// might be possible
			// we don't care that much though, since a single segment that fits less than a full vector is
			// exceedingly rare
			return true;
		}
		while (state.vector_index < target_vector_index) {
			NextVector(state);
		}
		return false;
	}

	return true;
}
500 | | |
//! Core scan loop of a row group, templated on the scan type (regular transactional
//! scan vs. the committed-rows variants). Produces at most one vector of output per
//! call into `result`; returns with an empty result when the row group is exhausted.
//! The fast path scans whole vectors; the slow path builds a selection vector from
//! deletions and/or table filters and fetches only the surviving rows.
template <TableScanType TYPE>
void RowGroup::TemplatedScan(TransactionData transaction, CollectionScanState &state, DataChunk &result) {
	// updates may be merged into the scan for all scan types except the two
	// "committed rows" variants that explicitly exclude them
	const bool ALLOW_UPDATES = TYPE != TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES &&
	                           TYPE != TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED;
	const auto &column_ids = state.GetColumnIds();
	auto &filter_info = state.GetFilterInfo();
	while (true) {
		if (state.vector_index * STANDARD_VECTOR_SIZE >= state.max_row_group_row) {
			// exceeded the amount of rows to scan
			return;
		}
		idx_t current_row = state.vector_index * STANDARD_VECTOR_SIZE;
		auto max_count = MinValue<idx_t>(STANDARD_VECTOR_SIZE, state.max_row_group_row - current_row);

		// check the sampling info if we have to sample this chunk
		if (state.GetSamplingInfo().do_system_sample &&
		    state.random.NextRandom() > state.GetSamplingInfo().sample_rate) {
			// vector not selected by the sample - skip it entirely
			NextVector(state);
			continue;
		}

		//! first check the zonemap if we have to scan this partition
		if (!CheckZonemapSegments(state)) {
			continue;
		}

		// second, scan the version chunk manager to figure out which tuples to load for this transaction
		idx_t count;
		if (TYPE == TableScanType::TABLE_SCAN_REGULAR) {
			count = state.row_group->GetSelVector(transaction, state.vector_index, state.valid_sel, max_count);
			if (count == 0) {
				// nothing to scan for this vector, skip the entire vector
				NextVector(state);
				continue;
			}
		} else if (TYPE == TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED) {
			count = state.row_group->GetCommittedSelVector(transaction.start_time, transaction.transaction_id,
			                                               state.vector_index, state.valid_sel, max_count);
			if (count == 0) {
				// nothing to scan for this vector, skip the entire vector
				NextVector(state);
				continue;
			}
		} else {
			// committed-rows scans that include deleted rows: take the full vector
			count = max_count;
		}
		auto &block_manager = GetBlockManager();
#ifndef DUCKDB_ALTERNATIVE_VERIFY
		// // in regular operation we only prefetch from remote file systems
		// // when alternative verify is set, we always prefetch for testing purposes
		if (block_manager.IsRemote())
#else
		if (!block_manager.InMemory())
#endif
		{
			// register the blocks of every scanned column and prefetch them in one batch
			PrefetchState prefetch_state;
			for (idx_t i = 0; i < column_ids.size(); i++) {
				const auto &column = column_ids[i];
				if (!column.IsRowIdColumn()) {
					GetColumn(column).InitializePrefetch(prefetch_state, state.column_scans[i], max_count);
				}
			}
			auto &buffer_manager = block_manager.buffer_manager;
			buffer_manager.Prefetch(prefetch_state.blocks);
		}

		bool has_filters = filter_info.HasFilters();
		if (count == max_count && !has_filters) {
			// scan all vectors completely: full scan without deletions or table filters
			for (idx_t i = 0; i < column_ids.size(); i++) {
				const auto &column = column_ids[i];
				if (column.IsRowIdColumn()) {
					// scan row id
					D_ASSERT(result.data[i].GetType().InternalType() == ROW_TYPE);
					result.data[i].Sequence(UnsafeNumericCast<int64_t>(this->start + current_row), 1, count);
				} else {
					auto &col_data = GetColumn(column);
					if (TYPE != TableScanType::TABLE_SCAN_REGULAR) {
						col_data.ScanCommitted(state.vector_index, state.column_scans[i], result.data[i],
						                       ALLOW_UPDATES);
					} else {
						col_data.Scan(transaction, state.vector_index, state.column_scans[i], result.data[i]);
					}
				}
			}
		} else {
			// partial scan: we have deletions or table filters
			idx_t approved_tuple_count = count;
			SelectionVector sel;
			if (count != max_count) {
				// deletions present: start from the version manager's selection
				sel.Initialize(state.valid_sel);
			} else {
				// no deletions: start from the identity selection
				sel.Initialize(nullptr);
			}
			//! first, we scan the columns with filters, fetch their data and generate a selection vector.
			//! get runtime statistics
			auto adaptive_filter = filter_info.GetAdaptiveFilter();
			auto filter_state = filter_info.BeginFilter();
			if (has_filters) {
				D_ASSERT(ALLOW_UPDATES);
				auto &filter_list = filter_info.GetFilterList();
				for (idx_t i = 0; i < filter_list.size(); i++) {
					// evaluate filters in the adaptive filter's current (cost-based) order
					auto filter_idx = adaptive_filter->permutation[i];
					auto &filter = filter_list[filter_idx];
					if (filter.IsAlwaysTrue()) {
						// this filter is always true - skip it
						continue;
					}
					auto &table_filter_state = *filter.filter_state;

					const auto scan_idx = filter.scan_column_index;
					const auto column_idx = filter.table_column_index;

					auto &result_vector = result.data[scan_idx];
					if (column_idx == COLUMN_IDENTIFIER_ROW_ID) {
						// We do another quick statistics scan for row ids here
						const auto rowid_start = this->start + current_row;
						const auto rowid_end = this->start + current_row + max_count;
						const auto prune_result = CheckRowIdFilter(filter.filter, rowid_start, rowid_end);
						if (prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) {
							// no row id in this vector can match: zero the count and move on
							// (remaining filters run on an empty selection; the
							// approved_tuple_count == 0 branch below handles the skip)
							approved_tuple_count = 0;
							continue;
						}

						// Generate row ids
						// Create sequence for row ids
						D_ASSERT(result_vector.GetType().InternalType() == ROW_TYPE);
						result_vector.SetVectorType(VectorType::FLAT_VECTOR);
						auto result_data = FlatVector::GetData<int64_t>(result_vector);
						for (size_t sel_idx = 0; sel_idx < approved_tuple_count; sel_idx++) {
							result_data[sel.get_index(sel_idx)] =
							    UnsafeNumericCast<int64_t>(this->start + current_row + sel.get_index(sel_idx));
						}

						// Was this filter always true? If so, we dont need to apply it
						if (prune_result == FilterPropagateResult::FILTER_ALWAYS_TRUE) {
							continue;
						}

						// Now apply the filter
						UnifiedVectorFormat vdata;
						result_vector.ToUnifiedFormat(approved_tuple_count, vdata);
						ColumnSegment::FilterSelection(sel, result_vector, vdata, filter.filter, table_filter_state,
						                               approved_tuple_count, approved_tuple_count);

					} else {
						// regular column: scan + filter in one pass, shrinking sel/count
						auto &col_data = GetColumn(filter.table_column_index);
						col_data.Filter(transaction, state.vector_index, state.column_scans[scan_idx], result_vector,
						                sel, approved_tuple_count, filter.filter, table_filter_state);
					}
				}
				// compact the already-scanned filter columns down to the surviving rows
				for (auto &table_filter : filter_list) {
					if (table_filter.IsAlwaysTrue()) {
						continue;
					}
					result.data[table_filter.scan_column_index].Slice(sel, approved_tuple_count);
				}
			}
			if (approved_tuple_count == 0) {
				// all rows were filtered out by the table filters
				D_ASSERT(has_filters);
				result.Reset();
				// skip this vector in all the scans that were not scanned yet
				for (idx_t i = 0; i < column_ids.size(); i++) {
					auto &col_idx = column_ids[i];
					if (col_idx.IsRowIdColumn()) {
						continue;
					}
					if (has_filters && filter_info.ColumnHasFilters(i)) {
						// filter columns were already advanced by the Filter() call above
						continue;
					}
					auto &col_data = GetColumn(col_idx);
					col_data.Skip(state.column_scans[i]);
				}
				state.vector_index++;
				continue;
			}
			//! Now we use the selection vector to fetch data for the other columns.
			for (idx_t i = 0; i < column_ids.size(); i++) {
				if (has_filters && filter_info.ColumnHasFilters(i)) {
					// column has already been scanned as part of the filtering process
					continue;
				}
				auto &column = column_ids[i];
				if (column.IsRowIdColumn()) {
					// materialize row ids for the surviving rows only
					D_ASSERT(result.data[i].GetType().InternalType() == ROW_TYPE);
					result.data[i].SetVectorType(VectorType::FLAT_VECTOR);
					auto result_data = FlatVector::GetData<int64_t>(result.data[i]);
					for (size_t sel_idx = 0; sel_idx < approved_tuple_count; sel_idx++) {
						result_data[sel_idx] =
						    UnsafeNumericCast<int64_t>(this->start + current_row + sel.get_index(sel_idx));
					}
				} else {
					auto &col_data = GetColumn(column);
					if (TYPE == TableScanType::TABLE_SCAN_REGULAR) {
						col_data.Select(transaction, state.vector_index, state.column_scans[i], result.data[i], sel,
						                approved_tuple_count);
					} else {
						col_data.SelectCommitted(state.vector_index, state.column_scans[i], result.data[i], sel,
						                         approved_tuple_count, ALLOW_UPDATES);
					}
				}
			}
			// report timing back to the adaptive filter so it can reorder filters
			filter_info.EndFilter(filter_state);

			D_ASSERT(approved_tuple_count > 0);
			count = approved_tuple_count;
		}
		// one vector of output produced - hand it back to the caller
		result.SetCardinality(count);
		state.vector_index++;
		break;
	}
}
first check the zonemap if we have to scan this partition | 523 | 7 | if (!CheckZonemapSegments(state)) { | 524 | 0 | continue; | 525 | 0 | } | 526 | | | 527 | | // second, scan the version chunk manager to figure out which tuples to load for this transaction | 528 | 7 | idx_t count; | 529 | 7 | if (TYPE == TableScanType::TABLE_SCAN_REGULAR) { | 530 | 7 | count = state.row_group->GetSelVector(transaction, state.vector_index, state.valid_sel, max_count); | 531 | 7 | if (count == 0) { | 532 | | // nothing to scan for this vector, skip the entire vector | 533 | 0 | NextVector(state); | 534 | 0 | continue; | 535 | 0 | } | 536 | 7 | } else if (TYPE == TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED) { | 537 | 0 | count = state.row_group->GetCommittedSelVector(transaction.start_time, transaction.transaction_id, | 538 | 0 | state.vector_index, state.valid_sel, max_count); | 539 | 0 | if (count == 0) { | 540 | | // nothing to scan for this vector, skip the entire vector | 541 | 0 | NextVector(state); | 542 | 0 | continue; | 543 | 0 | } | 544 | 0 | } else { | 545 | 0 | count = max_count; | 546 | 0 | } | 547 | 7 | auto &block_manager = GetBlockManager(); | 548 | 7 | #ifndef DUCKDB_ALTERNATIVE_VERIFY | 549 | | // // in regular operation we only prefetch from remote file systems | 550 | | // // when alternative verify is set, we always prefetch for testing purposes | 551 | 7 | if (block_manager.IsRemote()) | 552 | | #else | 553 | | if (!block_manager.InMemory()) | 554 | | #endif | 555 | 0 | { | 556 | 0 | PrefetchState prefetch_state; | 557 | 0 | for (idx_t i = 0; i < column_ids.size(); i++) { | 558 | 0 | const auto &column = column_ids[i]; | 559 | 0 | if (!column.IsRowIdColumn()) { | 560 | 0 | GetColumn(column).InitializePrefetch(prefetch_state, state.column_scans[i], max_count); | 561 | 0 | } | 562 | 0 | } | 563 | 0 | auto &buffer_manager = block_manager.buffer_manager; | 564 | 0 | buffer_manager.Prefetch(prefetch_state.blocks); | 565 | 0 | } | 566 | | | 567 
| 7 | bool has_filters = filter_info.HasFilters(); | 568 | 7 | if (count == max_count && !has_filters) { | 569 | | // scan all vectors completely: full scan without deletions or table filters | 570 | 34 | for (idx_t i = 0; i < column_ids.size(); i++) { | 571 | 27 | const auto &column = column_ids[i]; | 572 | 27 | if (column.IsRowIdColumn()) { | 573 | | // scan row id | 574 | 0 | D_ASSERT(result.data[i].GetType().InternalType() == ROW_TYPE); | 575 | 0 | result.data[i].Sequence(UnsafeNumericCast<int64_t>(this->start + current_row), 1, count); | 576 | 27 | } else { | 577 | 27 | auto &col_data = GetColumn(column); | 578 | 27 | if (TYPE != TableScanType::TABLE_SCAN_REGULAR) { | 579 | 0 | col_data.ScanCommitted(state.vector_index, state.column_scans[i], result.data[i], | 580 | 0 | ALLOW_UPDATES); | 581 | 27 | } else { | 582 | 27 | col_data.Scan(transaction, state.vector_index, state.column_scans[i], result.data[i]); | 583 | 27 | } | 584 | 27 | } | 585 | 27 | } | 586 | 7 | } else { | 587 | | // partial scan: we have deletions or table filters | 588 | 0 | idx_t approved_tuple_count = count; | 589 | 0 | SelectionVector sel; | 590 | 0 | if (count != max_count) { | 591 | 0 | sel.Initialize(state.valid_sel); | 592 | 0 | } else { | 593 | 0 | sel.Initialize(nullptr); | 594 | 0 | } | 595 | | //! first, we scan the columns with filters, fetch their data and generate a selection vector. | 596 | | //! 
get runtime statistics | 597 | 0 | auto adaptive_filter = filter_info.GetAdaptiveFilter(); | 598 | 0 | auto filter_state = filter_info.BeginFilter(); | 599 | 0 | if (has_filters) { | 600 | 0 | D_ASSERT(ALLOW_UPDATES); | 601 | 0 | auto &filter_list = filter_info.GetFilterList(); | 602 | 0 | for (idx_t i = 0; i < filter_list.size(); i++) { | 603 | 0 | auto filter_idx = adaptive_filter->permutation[i]; | 604 | 0 | auto &filter = filter_list[filter_idx]; | 605 | 0 | if (filter.IsAlwaysTrue()) { | 606 | | // this filter is always true - skip it | 607 | 0 | continue; | 608 | 0 | } | 609 | 0 | auto &table_filter_state = *filter.filter_state; | 610 | |
| 611 | 0 | const auto scan_idx = filter.scan_column_index; | 612 | 0 | const auto column_idx = filter.table_column_index; | 613 | |
| 614 | 0 | auto &result_vector = result.data[scan_idx]; | 615 | 0 | if (column_idx == COLUMN_IDENTIFIER_ROW_ID) { | 616 | | // We do another quick statistics scan for row ids here | 617 | 0 | const auto rowid_start = this->start + current_row; | 618 | 0 | const auto rowid_end = this->start + current_row + max_count; | 619 | 0 | const auto prune_result = CheckRowIdFilter(filter.filter, rowid_start, rowid_end); | 620 | 0 | if (prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) { | 621 | | // We can just break out of the loop here. | 622 | 0 | approved_tuple_count = 0; | 623 | 0 | continue; | 624 | 0 | } | 625 | | | 626 | | // Generate row ids | 627 | | // Create sequence for row ids | 628 | 0 | D_ASSERT(result_vector.GetType().InternalType() == ROW_TYPE); | 629 | 0 | result_vector.SetVectorType(VectorType::FLAT_VECTOR); | 630 | 0 | auto result_data = FlatVector::GetData<int64_t>(result_vector); | 631 | 0 | for (size_t sel_idx = 0; sel_idx < approved_tuple_count; sel_idx++) { | 632 | 0 | result_data[sel.get_index(sel_idx)] = | 633 | 0 | UnsafeNumericCast<int64_t>(this->start + current_row + sel.get_index(sel_idx)); | 634 | 0 | } | 635 | | | 636 | | // Was this filter always true? If so, we dont need to apply it | 637 | 0 | if (prune_result == FilterPropagateResult::FILTER_ALWAYS_TRUE) { | 638 | 0 | continue; | 639 | 0 | } | 640 | | | 641 | | // Now apply the filter | 642 | 0 | UnifiedVectorFormat vdata; | 643 | 0 | result_vector.ToUnifiedFormat(approved_tuple_count, vdata); | 644 | 0 | ColumnSegment::FilterSelection(sel, result_vector, vdata, filter.filter, table_filter_state, | 645 | 0 | approved_tuple_count, approved_tuple_count); | 646 | |
| 647 | 0 | } else { | 648 | 0 | auto &col_data = GetColumn(filter.table_column_index); | 649 | 0 | col_data.Filter(transaction, state.vector_index, state.column_scans[scan_idx], result_vector, | 650 | 0 | sel, approved_tuple_count, filter.filter, table_filter_state); | 651 | 0 | } | 652 | 0 | } | 653 | 0 | for (auto &table_filter : filter_list) { | 654 | 0 | if (table_filter.IsAlwaysTrue()) { | 655 | 0 | continue; | 656 | 0 | } | 657 | 0 | result.data[table_filter.scan_column_index].Slice(sel, approved_tuple_count); | 658 | 0 | } | 659 | 0 | } | 660 | 0 | if (approved_tuple_count == 0) { | 661 | | // all rows were filtered out by the table filters | 662 | 0 | D_ASSERT(has_filters); | 663 | 0 | result.Reset(); | 664 | | // skip this vector in all the scans that were not scanned yet | 665 | 0 | for (idx_t i = 0; i < column_ids.size(); i++) { | 666 | 0 | auto &col_idx = column_ids[i]; | 667 | 0 | if (col_idx.IsRowIdColumn()) { | 668 | 0 | continue; | 669 | 0 | } | 670 | 0 | if (has_filters && filter_info.ColumnHasFilters(i)) { | 671 | 0 | continue; | 672 | 0 | } | 673 | 0 | auto &col_data = GetColumn(col_idx); | 674 | 0 | col_data.Skip(state.column_scans[i]); | 675 | 0 | } | 676 | 0 | state.vector_index++; | 677 | 0 | continue; | 678 | 0 | } | 679 | | //! Now we use the selection vector to fetch data for the other columns. 
| 680 | 0 | for (idx_t i = 0; i < column_ids.size(); i++) { | 681 | 0 | if (has_filters && filter_info.ColumnHasFilters(i)) { | 682 | | // column has already been scanned as part of the filtering process | 683 | 0 | continue; | 684 | 0 | } | 685 | 0 | auto &column = column_ids[i]; | 686 | 0 | if (column.IsRowIdColumn()) { | 687 | 0 | D_ASSERT(result.data[i].GetType().InternalType() == ROW_TYPE); | 688 | 0 | result.data[i].SetVectorType(VectorType::FLAT_VECTOR); | 689 | 0 | auto result_data = FlatVector::GetData<int64_t>(result.data[i]); | 690 | 0 | for (size_t sel_idx = 0; sel_idx < approved_tuple_count; sel_idx++) { | 691 | 0 | result_data[sel_idx] = | 692 | 0 | UnsafeNumericCast<int64_t>(this->start + current_row + sel.get_index(sel_idx)); | 693 | 0 | } | 694 | 0 | } else { | 695 | 0 | auto &col_data = GetColumn(column); | 696 | 0 | if (TYPE == TableScanType::TABLE_SCAN_REGULAR) { | 697 | 0 | col_data.Select(transaction, state.vector_index, state.column_scans[i], result.data[i], sel, | 698 | 0 | approved_tuple_count); | 699 | 0 | } else { | 700 | 0 | col_data.SelectCommitted(state.vector_index, state.column_scans[i], result.data[i], sel, | 701 | 0 | approved_tuple_count, ALLOW_UPDATES); | 702 | 0 | } | 703 | 0 | } | 704 | 0 | } | 705 | 0 | filter_info.EndFilter(filter_state); | 706 | |
| 707 | 0 | D_ASSERT(approved_tuple_count > 0); | 708 | 0 | count = approved_tuple_count; | 709 | 0 | } | 710 | 7 | result.SetCardinality(count); | 711 | 7 | state.vector_index++; | 712 | 7 | break; | 713 | 7 | } | 714 | 14 | } |
Unexecuted instantiation: void duckdb::RowGroup::TemplatedScan<(duckdb::TableScanType)1>(duckdb::TransactionData, duckdb::CollectionScanState&, duckdb::DataChunk&) Unexecuted instantiation: void duckdb::RowGroup::TemplatedScan<(duckdb::TableScanType)2>(duckdb::TransactionData, duckdb::CollectionScanState&, duckdb::DataChunk&) Unexecuted instantiation: void duckdb::RowGroup::TemplatedScan<(duckdb::TableScanType)3>(duckdb::TransactionData, duckdb::CollectionScanState&, duckdb::DataChunk&) |
715 | | |
//! Scan this row group with full transaction (MVCC) visibility checks.
//! Thin wrapper that dispatches to the templated scan with the regular scan type.
void RowGroup::Scan(TransactionData transaction, CollectionScanState &state, DataChunk &result) {
	TemplatedScan<TableScanType::TABLE_SCAN_REGULAR>(transaction, state, result);
}
719 | | |
//! Scan only committed rows (no caller transaction) - synthesizes a TransactionData
//! from the transaction manager's state and dispatches to the templated scan.
void RowGroup::ScanCommitted(CollectionScanState &state, DataChunk &result, TableScanType type) {
	auto &transaction_manager = DuckTransactionManager::Get(GetCollection().GetAttached());

	transaction_t start_ts;
	transaction_t transaction_id;
	if (type == TableScanType::TABLE_SCAN_LATEST_COMMITTED_ROWS) {
		// latest committed rows: see everything committed up to (and including) the last commit
		start_ts = transaction_manager.GetLastCommit() + 1;
		transaction_id = MAX_TRANSACTION_ID;
	} else {
		// otherwise: see only what the lowest active transaction can see
		start_ts = transaction_manager.LowestActiveStart();
		transaction_id = transaction_manager.LowestActiveId();
	}
	TransactionData data(transaction_id, start_ts);
	switch (type) {
	case TableScanType::TABLE_SCAN_COMMITTED_ROWS:
		TemplatedScan<TableScanType::TABLE_SCAN_COMMITTED_ROWS>(data, state, result);
		break;
	case TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES:
		TemplatedScan<TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES>(data, state, result);
		break;
	case TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED:
	case TableScanType::TABLE_SCAN_LATEST_COMMITTED_ROWS:
		// LATEST_COMMITTED_ROWS re-uses the OMIT_PERMANENTLY_DELETED scan instantiation
		TemplatedScan<TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED>(data, state, result);
		break;
	default:
		throw InternalException("Unrecognized table scan type");
	}
}
748 | | |
//! Get the version info for this row group, lazily deserializing stored deletes
//! from disk on first access (double-checked under row_group_lock).
optional_ptr<RowVersionManager> RowGroup::GetVersionInfo() {
	if (!HasUnloadedDeletes()) {
		// deletes are loaded - return the version info
		return version_info;
	}
	lock_guard<mutex> lock(row_group_lock);
	// double-check after obtaining the lock whether or not deletes are still not loaded to avoid double load
	if (!HasUnloadedDeletes()) {
		return version_info;
	}
	// deletes are not loaded - reload
	auto root_delete = deletes_pointers[0];
	auto loaded_info = RowVersionManager::Deserialize(root_delete, GetBlockManager().GetMetadataManager(), start);
	SetVersionInfo(std::move(loaded_info));
	deletes_is_loaded = true;
	return version_info;
}
766 | | |
//! Take ownership of the given version manager and expose a raw pointer to it
//! through the (atomic) version_info member.
void RowGroup::SetVersionInfo(shared_ptr<RowVersionManager> version) {
	owned_version_info = std::move(version);
	version_info = owned_version_info.get();
}
771 | | |
//! Create the version info if it does not exist yet; returns the owned pointer.
//! Protected by row_group_lock so concurrent callers create it only once.
shared_ptr<RowVersionManager> RowGroup::GetOrCreateVersionInfoInternal() {
	// version info does not exist - need to create it
	lock_guard<mutex> lock(row_group_lock);
	if (!owned_version_info) {
		auto new_info = make_shared_ptr<RowVersionManager>(start);
		SetVersionInfo(std::move(new_info));
	}
	return owned_version_info;
}
781 | | |
782 | 0 | shared_ptr<RowVersionManager> RowGroup::GetOrCreateVersionInfoPtr() { |
783 | 0 | auto vinfo = GetVersionInfo(); |
784 | 0 | if (vinfo) { |
785 | | // version info exists - return it directly |
786 | 0 | return owned_version_info; |
787 | 0 | } |
788 | 0 | return GetOrCreateVersionInfoInternal(); |
789 | 0 | } |
790 | | |
791 | 585 | RowVersionManager &RowGroup::GetOrCreateVersionInfo() { |
792 | 585 | auto vinfo = GetVersionInfo(); |
793 | 585 | if (vinfo) { |
794 | | // version info exists - return it directly |
795 | 384 | return *vinfo; |
796 | 384 | } |
797 | 201 | return *GetOrCreateVersionInfoInternal(); |
798 | 585 | } |
799 | | |
800 | | idx_t RowGroup::GetSelVector(TransactionData transaction, idx_t vector_idx, SelectionVector &sel_vector, |
801 | 7 | idx_t max_count) { |
802 | 7 | auto vinfo = GetVersionInfo(); |
803 | 7 | if (!vinfo) { |
804 | 0 | return max_count; |
805 | 0 | } |
806 | 7 | return vinfo->GetSelVector(transaction, vector_idx, sel_vector, max_count); |
807 | 7 | } |
808 | | |
809 | | idx_t RowGroup::GetCommittedSelVector(transaction_t start_time, transaction_t transaction_id, idx_t vector_idx, |
810 | 0 | SelectionVector &sel_vector, idx_t max_count) { |
811 | 0 | auto vinfo = GetVersionInfo(); |
812 | 0 | if (!vinfo) { |
813 | 0 | return max_count; |
814 | 0 | } |
815 | 0 | return vinfo->GetCommittedSelVector(start_time, transaction_id, vector_idx, sel_vector, max_count); |
816 | 0 | } |
817 | | |
818 | 0 | bool RowGroup::Fetch(TransactionData transaction, idx_t row) { |
819 | 0 | D_ASSERT(row < this->count); |
820 | 0 | auto vinfo = GetVersionInfo(); |
821 | 0 | if (!vinfo) { |
822 | 0 | return true; |
823 | 0 | } |
824 | 0 | return vinfo->Fetch(transaction, row); |
825 | 0 | } |
826 | | |
827 | | void RowGroup::FetchRow(TransactionData transaction, ColumnFetchState &state, const vector<StorageIndex> &column_ids, |
828 | 0 | row_t row_id, DataChunk &result, idx_t result_idx) { |
829 | 0 | for (idx_t col_idx = 0; col_idx < column_ids.size(); col_idx++) { |
830 | 0 | auto &column = column_ids[col_idx]; |
831 | 0 | auto &result_vector = result.data[col_idx]; |
832 | 0 | D_ASSERT(result_vector.GetVectorType() == VectorType::FLAT_VECTOR); |
833 | 0 | D_ASSERT(!FlatVector::IsNull(result_vector, result_idx)); |
834 | 0 | if (column.IsRowIdColumn()) { |
835 | | // row id column: fill in the row ids |
836 | 0 | D_ASSERT(result_vector.GetType().InternalType() == PhysicalType::INT64); |
837 | 0 | result_vector.SetVectorType(VectorType::FLAT_VECTOR); |
838 | 0 | auto data = FlatVector::GetData<row_t>(result_vector); |
839 | 0 | data[result_idx] = row_id; |
840 | 0 | } else { |
841 | | // regular column: fetch data from the base column |
842 | 0 | auto &col_data = GetColumn(column); |
843 | 0 | col_data.FetchRow(transaction, state, row_id, result_vector, result_idx); |
844 | 0 | } |
845 | 0 | } |
846 | 0 | } |
847 | | |
848 | 201 | void RowGroup::AppendVersionInfo(TransactionData transaction, idx_t count) { |
849 | 201 | const idx_t row_group_size = GetRowGroupSize(); |
850 | 201 | idx_t row_group_start = this->count.load(); |
851 | 201 | idx_t row_group_end = row_group_start + count; |
852 | 201 | if (row_group_end > row_group_size) { |
853 | 0 | row_group_end = row_group_size; |
854 | 0 | } |
855 | | // create the version_info if it doesn't exist yet |
856 | 201 | auto &vinfo = GetOrCreateVersionInfo(); |
857 | 201 | vinfo.AppendVersionInfo(transaction, count, row_group_start, row_group_end); |
858 | 201 | this->count = row_group_end; |
859 | 201 | } |
860 | | |
//! Mark a previously-appended row range as committed with the given commit id.
void RowGroup::CommitAppend(transaction_t commit_id, idx_t row_group_start, idx_t count) {
	auto &vinfo = GetOrCreateVersionInfo();
	vinfo.CommitAppend(commit_id, row_group_start, count);
}
865 | | |
//! Undo an append: truncate version info and every column back to "row_group_start".
//! Note: row_group_start is a GLOBAL row number; version info and count use
//! row-group-relative offsets (hence the "- this->start").
void RowGroup::RevertAppend(idx_t row_group_start) {
	auto &vinfo = GetOrCreateVersionInfo();
	vinfo.RevertAppend(row_group_start - this->start);
	for (auto &column : columns) {
		column->RevertAppend(UnsafeNumericCast<row_t>(row_group_start));
	}
	this->count = MinValue<idx_t>(row_group_start - this->start, this->count);
	Verify();
}
875 | | |
876 | 201 | void RowGroup::InitializeAppend(RowGroupAppendState &append_state) { |
877 | 201 | append_state.row_group = this; |
878 | 201 | append_state.offset_in_row_group = this->count; |
879 | | // for each column, initialize the append state |
880 | 201 | append_state.states = make_unsafe_uniq_array<ColumnAppendState>(GetColumnCount()); |
881 | 2.85k | for (idx_t i = 0; i < GetColumnCount(); i++) { |
882 | 2.65k | auto &col_data = GetColumn(i); |
883 | 2.65k | col_data.InitializeAppend(append_state.states[i]); |
884 | 2.65k | } |
885 | 201 | } |
886 | | |
887 | 201 | void RowGroup::Append(RowGroupAppendState &state, DataChunk &chunk, idx_t append_count) { |
888 | | // append to the current row_group |
889 | 201 | D_ASSERT(chunk.ColumnCount() == GetColumnCount()); |
890 | 2.85k | for (idx_t i = 0; i < GetColumnCount(); i++) { |
891 | 2.65k | auto &col_data = GetColumn(i); |
892 | 2.65k | auto prev_allocation_size = col_data.GetAllocationSize(); |
893 | 2.65k | col_data.Append(state.states[i], chunk.data[i], append_count); |
894 | 2.65k | allocation_size += col_data.GetAllocationSize() - prev_allocation_size; |
895 | 2.65k | } |
896 | 201 | state.offset_in_row_group += append_count; |
897 | 201 | } |
898 | | |
//! Clean up append version information that is no longer needed because no
//! active transaction (below "lowest_transaction") can still see it.
void RowGroup::CleanupAppend(transaction_t lowest_transaction, idx_t start, idx_t count) {
	auto &vinfo = GetOrCreateVersionInfo();
	vinfo.CleanupAppend(lowest_transaction, start, count);
}
903 | | |
//! Apply in-place updates to the given columns of this row group.
//! "ids" are global row ids; "offset"/"count" select the slice of update_chunk
//! (and of ids) that falls inside this row group.
void RowGroup::Update(TransactionData transaction, DataChunk &update_chunk, row_t *ids, idx_t offset, idx_t count,
                      const vector<PhysicalIndex> &column_ids) {
#ifdef DEBUG
	// verify that all row ids to be updated fall inside this row group
	for (size_t i = offset; i < offset + count; i++) {
		D_ASSERT(ids[i] >= row_t(this->start) && ids[i] < row_t(this->start + this->count));
	}
#endif
	for (idx_t i = 0; i < column_ids.size(); i++) {
		auto column = column_ids[i];
		D_ASSERT(column.index != COLUMN_IDENTIFIER_ROW_ID);
		auto &col_data = GetColumn(column.index);
		D_ASSERT(col_data.type.id() == update_chunk.data[i].GetType().id());
		if (offset > 0) {
			// the rows for this row group do not start at the beginning of the
			// chunk - slice out and flatten the relevant rows first
			Vector sliced_vector(update_chunk.data[i], offset, offset + count);
			sliced_vector.Flatten(count);
			col_data.Update(transaction, column.index, sliced_vector, ids + offset, count);
		} else {
			col_data.Update(transaction, column.index, update_chunk.data[i], ids, count);
		}
		// fold the updated values into the column statistics
		MergeStatistics(column.index, *col_data.GetUpdateStatistics());
	}
}
926 | | |
//! Update a (possibly nested) column identified by "column_path"; the first
//! path entry selects the top-level column, the rest descends into it.
void RowGroup::UpdateColumn(TransactionData transaction, DataChunk &updates, Vector &row_ids,
                            const vector<column_t> &column_path) {
	D_ASSERT(updates.ColumnCount() == 1);
	auto ids = FlatVector::GetData<row_t>(row_ids);

	auto primary_column_idx = column_path[0];
	D_ASSERT(primary_column_idx != COLUMN_IDENTIFIER_ROW_ID);
	D_ASSERT(primary_column_idx < columns.size());
	auto &col_data = GetColumn(primary_column_idx);
	// depth 1: the top-level entry of column_path has already been consumed
	col_data.UpdateColumn(transaction, column_path, updates.data[0], ids, updates.size(), 1);
	// fold the updated values into the column statistics
	MergeStatistics(primary_column_idx, *col_data.GetUpdateStatistics());
}
939 | | |
940 | 0 | unique_ptr<BaseStatistics> RowGroup::GetStatistics(idx_t column_idx) { |
941 | 0 | auto &col_data = GetColumn(column_idx); |
942 | 0 | return col_data.GetStatistics(); |
943 | 0 | } |
944 | | |
945 | 0 | void RowGroup::MergeStatistics(idx_t column_idx, const BaseStatistics &other) { |
946 | 0 | auto &col_data = GetColumn(column_idx); |
947 | 0 | col_data.MergeStatistics(other); |
948 | 0 | } |
949 | | |
950 | 2.65k | void RowGroup::MergeIntoStatistics(idx_t column_idx, BaseStatistics &other) { |
951 | 2.65k | auto &col_data = GetColumn(column_idx); |
952 | 2.65k | col_data.MergeIntoStatistics(other); |
953 | 2.65k | } |
954 | | |
955 | 201 | void RowGroup::MergeIntoStatistics(TableStatistics &other) { |
956 | 201 | auto stats_lock = other.GetLock(); |
957 | 2.85k | for (idx_t i = 0; i < columns.size(); i++) { |
958 | 2.65k | MergeIntoStatistics(i, other.GetStats(*stats_lock, i).Statistics()); |
959 | 2.65k | } |
960 | 201 | } |
961 | | |
//! Return the compression type configured for this checkpoint's column.
CompressionType ColumnCheckpointInfo::GetCompressionType() {
	return info.compression_types[column_idx];
}
965 | | |
//! Checkpoint every column of this row group, collecting the resulting
//! checkpoint states and a copy of the per-column statistics.
RowGroupWriteData RowGroup::WriteToDisk(RowGroupWriteInfo &info) {
	RowGroupWriteData result;
	result.states.reserve(columns.size());
	result.statistics.reserve(columns.size());

	// Checkpoint the individual columns of the row group
	// Here we're iterating over columns. Each column can have multiple segments.
	// (Some columns will be wider than others, and require different numbers
	// of blocks to encode.) Segments cannot span blocks.
	//
	// Some of these columns are composite (list, struct). The data is written
	// first sequentially, and the pointers are written later, so that the
	// pointers all end up densely packed, and thus more cache-friendly.
	for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) {
		auto &column = GetColumn(column_idx);
		ColumnCheckpointInfo checkpoint_info(info, column_idx);
		auto checkpoint_state = column.Checkpoint(*this, checkpoint_info);
		D_ASSERT(checkpoint_state);

		auto stats = checkpoint_state->GetStatistics();
		D_ASSERT(stats);

		// statistics and states are kept in lockstep (verified below)
		result.statistics.push_back(stats->Copy());
		result.states.push_back(std::move(checkpoint_state));
	}
	D_ASSERT(result.states.size() == result.statistics.size());
	return result;
}
994 | | |
995 | 0 | idx_t RowGroup::GetCommittedRowCount() { |
996 | 0 | auto vinfo = GetVersionInfo(); |
997 | 0 | if (!vinfo) { |
998 | 0 | return count; |
999 | 0 | } |
1000 | 0 | return count - vinfo->GetCommittedDeletedCount(count); |
1001 | 0 | } |
1002 | | |
1003 | 976 | bool RowGroup::HasUnloadedDeletes() const { |
1004 | 976 | if (deletes_pointers.empty()) { |
1005 | | // no stored deletes at all |
1006 | 976 | return false; |
1007 | 976 | } |
1008 | | // return whether or not the deletes have been loaded |
1009 | 0 | return !deletes_is_loaded; |
1010 | 976 | } |
1011 | | |
//! Validate the in-memory columns, gather per-column compression choices from
//! the writer, and delegate to the RowGroupWriteInfo-based WriteToDisk.
RowGroupWriteData RowGroup::WriteToDisk(RowGroupWriter &writer) {
	vector<CompressionType> compression_types;
	compression_types.reserve(columns.size());

	for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) {
		auto &column = GetColumn(column_idx);
		// every column must have exactly as many rows as the row group itself
		if (column.count != this->count) {
			throw InternalException("Corrupted in-memory column - column with index %llu has misaligned count (row "
			                        "group has %llu rows, column has %llu)",
			                        column_idx, this->count.load(), column.count.load());
		}
		auto compression_type = writer.GetColumnCompressionType(column_idx);
		compression_types.push_back(compression_type);
	}

	RowGroupWriteInfo info(writer.GetPartialBlockManager(), compression_types, writer.GetCheckpointType());
	return WriteToDisk(info);
}
1030 | | |
//! Finalize a checkpoint: merge the written statistics into the global table
//! statistics, serialize the column data pointers, and build the row group pointer.
RowGroupPointer RowGroup::Checkpoint(RowGroupWriteData write_data, RowGroupWriter &writer,
                                     TableStatistics &global_stats) {
	RowGroupPointer row_group_pointer;

	// merge the per-column checkpoint statistics into the global statistics
	auto lock = global_stats.GetLock();
	for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) {
		global_stats.GetStats(*lock, column_idx).Statistics().Merge(write_data.statistics[column_idx]);
	}

	// construct the row group pointer and write the column meta data to disk
	D_ASSERT(write_data.states.size() == columns.size());
	row_group_pointer.row_start = start;
	row_group_pointer.tuple_count = count;
	for (auto &state : write_data.states) {
		// get the current position of the table data writer
		auto &data_writer = writer.GetPayloadWriter();
		auto pointer = data_writer.GetMetaBlockPointer();

		// store the stats and the data pointers in the row group pointers
		row_group_pointer.data_pointers.push_back(pointer);

		// Write pointers to the column segments.
		//
		// Just as above, the state can refer to many other states, so this
		// can cascade recursively into more pointer writes.
		auto persistent_data = state->ToPersistentData();
		BinarySerializer serializer(data_writer);
		serializer.Begin();
		persistent_data.Serialize(serializer);
		serializer.End();
	}
	// write out (or re-use) the delete information
	row_group_pointer.deletes_pointers = CheckpointDeletes(writer.GetPayloadWriter().GetManager());
	Verify();
	return row_group_pointer;
}
1066 | | |
1067 | 0 | bool RowGroup::IsPersistent() const { |
1068 | 0 | for (auto &column : columns) { |
1069 | 0 | if (!column->IsPersistent()) { |
1070 | | // column is not persistent |
1071 | 0 | return false; |
1072 | 0 | } |
1073 | 0 | } |
1074 | 0 | return true; |
1075 | 0 | } |
1076 | | |
1077 | 0 | PersistentRowGroupData RowGroup::SerializeRowGroupInfo() const { |
1078 | | // all columns are persistent - serialize |
1079 | 0 | PersistentRowGroupData result; |
1080 | 0 | for (auto &col : columns) { |
1081 | 0 | result.column_data.push_back(col->Serialize()); |
1082 | 0 | } |
1083 | 0 | result.start = start; |
1084 | 0 | result.count = count; |
1085 | 0 | return result; |
1086 | 0 | } |
1087 | | |
//! Write the delete information of this row group to disk, or re-use the
//! existing on-disk pointers when the deletes were never loaded (and therefore
//! cannot have changed).
vector<MetaBlockPointer> RowGroup::CheckpointDeletes(MetadataManager &manager) {
	if (HasUnloadedDeletes()) {
		// deletes were not loaded so they cannot be changed
		// re-use them as-is
		manager.ClearModifiedBlocks(deletes_pointers);
		return deletes_pointers;
	}
	auto vinfo = GetVersionInfo();
	if (!vinfo) {
		// no version information: write nothing
		return vector<MetaBlockPointer>();
	}
	return vinfo->Checkpoint(manager);
}
1102 | | |
//! Serialize a row group pointer. The numeric field ids (100-103) are part of
//! the on-disk format and must stay in sync with Deserialize below.
void RowGroup::Serialize(RowGroupPointer &pointer, Serializer &serializer) {
	serializer.WriteProperty(100, "row_start", pointer.row_start);
	serializer.WriteProperty(101, "tuple_count", pointer.tuple_count);
	serializer.WriteProperty(102, "data_pointers", pointer.data_pointers);
	serializer.WriteProperty(103, "delete_pointers", pointer.deletes_pointers);
}
1109 | | |
//! Deserialize a row group pointer; mirror of Serialize above (same field ids).
RowGroupPointer RowGroup::Deserialize(Deserializer &deserializer) {
	RowGroupPointer result;
	result.row_start = deserializer.ReadProperty<uint64_t>(100, "row_start");
	result.tuple_count = deserializer.ReadProperty<uint64_t>(101, "tuple_count");
	result.data_pointers = deserializer.ReadProperty<vector<MetaBlockPointer>>(102, "data_pointers");
	result.deletes_pointers = deserializer.ReadProperty<vector<MetaBlockPointer>>(103, "delete_pointers");
	return result;
}
1118 | | |
1119 | | //===--------------------------------------------------------------------===// |
1120 | | // GetPartitionStats |
1121 | | //===--------------------------------------------------------------------===// |
1122 | 0 | PartitionStatistics RowGroup::GetPartitionStats() const { |
1123 | 0 | PartitionStatistics result; |
1124 | 0 | result.row_start = start; |
1125 | 0 | result.count = count; |
1126 | 0 | if (HasUnloadedDeletes() || version_info.load().get()) { |
1127 | | // we have version info - approx count |
1128 | 0 | result.count_type = CountType::COUNT_APPROXIMATE; |
1129 | 0 | } else { |
1130 | 0 | result.count_type = CountType::COUNT_EXACT; |
1131 | 0 | } |
1132 | 0 | return result; |
1133 | 0 | } |
1134 | | |
1135 | | //===--------------------------------------------------------------------===// |
1136 | | // GetColumnSegmentInfo |
1137 | | //===--------------------------------------------------------------------===// |
1138 | 0 | void RowGroup::GetColumnSegmentInfo(idx_t row_group_index, vector<ColumnSegmentInfo> &result) { |
1139 | 0 | for (idx_t col_idx = 0; col_idx < GetColumnCount(); col_idx++) { |
1140 | 0 | auto &col_data = GetColumn(col_idx); |
1141 | 0 | col_data.GetColumnSegmentInfo(row_group_index, {col_idx}, result); |
1142 | 0 | } |
1143 | 0 | } |
1144 | | |
1145 | | //===--------------------------------------------------------------------===// |
1146 | | // Version Delete Information |
1147 | | //===--------------------------------------------------------------------===// |
1148 | | class VersionDeleteState { |
1149 | | public: |
1150 | | VersionDeleteState(RowGroup &info, TransactionData transaction, DataTable &table, idx_t base_row) |
1151 | 0 | : info(info), transaction(transaction), table(table), current_chunk(DConstants::INVALID_INDEX), count(0), |
1152 | 0 | base_row(base_row), delete_count(0) { |
1153 | 0 | } |
1154 | | |
1155 | | RowGroup &info; |
1156 | | TransactionData transaction; |
1157 | | DataTable &table; |
1158 | | idx_t current_chunk; |
1159 | | row_t rows[STANDARD_VECTOR_SIZE]; |
1160 | | idx_t count; |
1161 | | idx_t base_row; |
1162 | | idx_t chunk_row; |
1163 | | idx_t delete_count; |
1164 | | |
1165 | | public: |
1166 | | void Delete(row_t row_id); |
1167 | | void Flush(); |
1168 | | }; |
1169 | | |
1170 | 0 | idx_t RowGroup::Delete(TransactionData transaction, DataTable &table, row_t *ids, idx_t count) { |
1171 | 0 | VersionDeleteState del_state(*this, transaction, table, this->start); |
1172 | | |
1173 | | // obtain a write lock |
1174 | 0 | for (idx_t i = 0; i < count; i++) { |
1175 | 0 | D_ASSERT(ids[i] >= 0); |
1176 | 0 | D_ASSERT(idx_t(ids[i]) >= this->start && idx_t(ids[i]) < this->start + this->count); |
1177 | 0 | del_state.Delete(ids[i] - UnsafeNumericCast<row_t>(this->start)); |
1178 | 0 | } |
1179 | 0 | del_state.Flush(); |
1180 | 0 | return del_state.delete_count; |
1181 | 0 | } |
1182 | | |
1183 | 201 | void RowGroup::Verify() { |
1184 | | #ifdef DEBUG |
1185 | | for (auto &column : GetColumns()) { |
1186 | | column->Verify(*this); |
1187 | | } |
1188 | | #endif |
1189 | 201 | } |
1190 | | |
1191 | 0 | idx_t RowGroup::DeleteRows(idx_t vector_idx, transaction_t transaction_id, row_t rows[], idx_t count) { |
1192 | 0 | return GetOrCreateVersionInfo().DeleteRows(vector_idx, transaction_id, rows, count); |
1193 | 0 | } |
1194 | | |
1195 | 0 | void VersionDeleteState::Delete(row_t row_id) { |
1196 | 0 | D_ASSERT(row_id >= 0); |
1197 | 0 | idx_t vector_idx = UnsafeNumericCast<idx_t>(row_id) / STANDARD_VECTOR_SIZE; |
1198 | 0 | idx_t idx_in_vector = UnsafeNumericCast<idx_t>(row_id) - vector_idx * STANDARD_VECTOR_SIZE; |
1199 | 0 | if (current_chunk != vector_idx) { |
1200 | 0 | Flush(); |
1201 | |
|
1202 | 0 | current_chunk = vector_idx; |
1203 | 0 | chunk_row = vector_idx * STANDARD_VECTOR_SIZE; |
1204 | 0 | } |
1205 | 0 | rows[count++] = UnsafeNumericCast<row_t>(idx_in_vector); |
1206 | 0 | } |
1207 | | |
1208 | 0 | void VersionDeleteState::Flush() { |
1209 | 0 | if (count == 0) { |
1210 | 0 | return; |
1211 | 0 | } |
1212 | | // it is possible for delete statements to delete the same tuple multiple times when combined with a USING clause |
1213 | | // in the current_info->Delete, we check which tuples are actually deleted (excluding duplicate deletions) |
1214 | | // this is returned in the actual_delete_count |
1215 | 0 | auto actual_delete_count = info.DeleteRows(current_chunk, transaction.transaction_id, rows, count); |
1216 | 0 | delete_count += actual_delete_count; |
1217 | 0 | if (transaction.transaction && actual_delete_count > 0) { |
1218 | | // now push the delete into the undo buffer, but only if any deletes were actually performed |
1219 | 0 | transaction.transaction->PushDelete(table, info.GetOrCreateVersionInfo(), current_chunk, rows, |
1220 | 0 | actual_delete_count, base_row + chunk_row); |
1221 | 0 | } |
1222 | 0 | count = 0; |
1223 | 0 | } |
1224 | | |
1225 | | } // namespace duckdb |