Coverage Report

Created: 2025-06-24 07:53

/src/duckdb/src/storage/table/row_group.cpp
Line
Count
Source
1
#include "duckdb/storage/table/row_group.hpp"
2
#include "duckdb/common/types/vector.hpp"
3
#include "duckdb/common/exception.hpp"
4
#include "duckdb/storage/table/column_data.hpp"
5
#include "duckdb/storage/table/column_checkpoint_state.hpp"
6
#include "duckdb/storage/table/update_segment.hpp"
7
#include "duckdb/storage/table_storage_info.hpp"
8
#include "duckdb/planner/table_filter.hpp"
9
#include "duckdb/execution/expression_executor.hpp"
10
#include "duckdb/storage/checkpoint/table_data_writer.hpp"
11
#include "duckdb/storage/metadata/metadata_reader.hpp"
12
#include "duckdb/transaction/duck_transaction_manager.hpp"
13
#include "duckdb/main/database.hpp"
14
#include "duckdb/main/attached_database.hpp"
15
#include "duckdb/transaction/duck_transaction.hpp"
16
#include "duckdb/storage/table/append_state.hpp"
17
#include "duckdb/storage/table/scan_state.hpp"
18
#include "duckdb/storage/table/row_version_manager.hpp"
19
#include "duckdb/common/serializer/serializer.hpp"
20
#include "duckdb/common/serializer/deserializer.hpp"
21
#include "duckdb/common/serializer/binary_serializer.hpp"
22
#include "duckdb/planner/filter/conjunction_filter.hpp"
23
#include "duckdb/planner/filter/struct_filter.hpp"
24
#include "duckdb/planner/filter/optional_filter.hpp"
25
#include "duckdb/execution/adaptive_filter.hpp"
26
27
namespace duckdb {
28
29
RowGroup::RowGroup(RowGroupCollection &collection_p, idx_t start, idx_t count)
30
201
    : SegmentBase<RowGroup>(start, count), collection(collection_p), version_info(nullptr), allocation_size(0) {
31
201
  Verify();
32
201
}
33
34
RowGroup::RowGroup(RowGroupCollection &collection_p, RowGroupPointer pointer)
35
0
    : SegmentBase<RowGroup>(pointer.row_start, pointer.tuple_count), collection(collection_p), version_info(nullptr),
36
0
      allocation_size(0) {
37
  // deserialize the columns
38
0
  if (pointer.data_pointers.size() != collection_p.GetTypes().size()) {
39
0
    throw IOException("Row group column count is unaligned with table column count. Corrupt file?");
40
0
  }
41
0
  this->column_pointers = std::move(pointer.data_pointers);
42
0
  this->columns.resize(column_pointers.size());
43
0
  this->is_loaded = unique_ptr<atomic<bool>[]>(new atomic<bool>[columns.size()]);
44
0
  for (idx_t c = 0; c < columns.size(); c++) {
45
0
    this->is_loaded[c] = false;
46
0
  }
47
0
  this->deletes_pointers = std::move(pointer.deletes_pointers);
48
0
  this->deletes_is_loaded = false;
49
50
0
  Verify();
51
0
}
52
53
RowGroup::RowGroup(RowGroupCollection &collection_p, PersistentRowGroupData &data)
54
0
    : SegmentBase<RowGroup>(data.start, data.count), collection(collection_p), version_info(nullptr),
55
0
      allocation_size(0) {
56
0
  auto &block_manager = GetBlockManager();
57
0
  auto &info = GetTableInfo();
58
0
  auto &types = collection.get().GetTypes();
59
0
  columns.reserve(types.size());
60
0
  for (idx_t c = 0; c < types.size(); c++) {
61
0
    auto entry = ColumnData::CreateColumn(block_manager, info, c, data.start, types[c], nullptr);
62
0
    entry->InitializeColumn(data.column_data[c]);
63
0
    columns.push_back(std::move(entry));
64
0
  }
65
66
0
  Verify();
67
0
}
68
69
192
void RowGroup::MoveToCollection(RowGroupCollection &collection_p, idx_t new_start) {
70
192
  this->collection = collection_p;
71
192
  this->start = new_start;
72
2.61k
  for (auto &column : GetColumns()) {
73
2.61k
    column->SetStart(new_start);
74
2.61k
  }
75
192
  if (!HasUnloadedDeletes()) {
76
192
    auto vinfo = GetVersionInfo();
77
192
    if (vinfo) {
78
192
      vinfo->SetStart(new_start);
79
192
    }
80
192
  }
81
192
}
82
83
201
RowGroup::~RowGroup() {
84
201
}
85
86
192
vector<shared_ptr<ColumnData>> &RowGroup::GetColumns() {
87
  // ensure all columns are loaded
88
2.80k
  for (idx_t c = 0; c < GetColumnCount(); c++) {
89
2.61k
    GetColumn(c);
90
2.61k
  }
91
192
  return columns;
92
192
}
93
94
8.92k
idx_t RowGroup::GetColumnCount() const {
95
8.92k
  return columns.size();
96
8.92k
}
97
98
201
idx_t RowGroup::GetRowGroupSize() const {
99
201
  return collection.get().GetRowGroupSize();
100
201
}
101
102
54
ColumnData &RowGroup::GetColumn(const StorageIndex &c) {
103
54
  return GetColumn(c.GetPrimaryIndex());
104
54
}
105
106
10.6k
ColumnData &RowGroup::GetColumn(storage_t c) {
107
10.6k
  D_ASSERT(c < columns.size());
108
10.6k
  if (!is_loaded) {
109
    // not being lazy loaded
110
10.6k
    D_ASSERT(columns[c]);
111
10.6k
    return *columns[c];
112
10.6k
  }
113
0
  if (is_loaded[c]) {
114
0
    D_ASSERT(columns[c]);
115
0
    return *columns[c];
116
0
  }
117
0
  lock_guard<mutex> l(row_group_lock);
118
0
  if (columns[c]) {
119
0
    D_ASSERT(is_loaded[c]);
120
0
    return *columns[c];
121
0
  }
122
0
  if (column_pointers.size() != columns.size()) {
123
0
    throw InternalException("Lazy loading a column but the pointer was not set");
124
0
  }
125
0
  auto &metadata_manager = GetCollection().GetMetadataManager();
126
0
  auto &types = GetCollection().GetTypes();
127
0
  auto &block_pointer = column_pointers[c];
128
0
  MetadataReader column_data_reader(metadata_manager, block_pointer);
129
0
  this->columns[c] =
130
0
      ColumnData::Deserialize(GetBlockManager(), GetTableInfo(), c, start, column_data_reader, types[c]);
131
0
  is_loaded[c] = true;
132
0
  if (this->columns[c]->count != this->count) {
133
0
    throw InternalException("Corrupted database - loaded column with index %llu at row start %llu, count %llu did "
134
0
                            "not match count of row group %llu",
135
0
                            c, start, this->columns[c]->count.load(), this->count.load());
136
0
  }
137
0
  return *columns[c];
138
0
}
139
140
2.65k
BlockManager &RowGroup::GetBlockManager() {
141
2.65k
  return GetCollection().GetBlockManager();
142
2.65k
}
143
2.65k
DataTableInfo &RowGroup::GetTableInfo() {
144
2.65k
  return GetCollection().GetTableInfo();
145
2.65k
}
146
147
201
void RowGroup::InitializeEmpty(const vector<LogicalType> &types) {
148
  // set up the segment trees for the column segments
149
201
  D_ASSERT(columns.empty());
150
2.85k
  for (idx_t i = 0; i < types.size(); i++) {
151
2.65k
    auto column_data = ColumnData::CreateColumn(GetBlockManager(), GetTableInfo(), i, start, types[i]);
152
2.65k
    columns.push_back(std::move(column_data));
153
2.65k
  }
154
201
}
155
156
void ColumnScanState::Initialize(const LogicalType &type, const vector<StorageIndex> &children,
157
27
                                 optional_ptr<TableScanOptions> options) {
158
  // Register the options in the state
159
27
  scan_options = options;
160
161
27
  if (type.id() == LogicalTypeId::VALIDITY) {
162
    // validity - nothing to initialize
163
0
    return;
164
0
  }
165
27
  if (type.InternalType() == PhysicalType::STRUCT) {
166
    // validity + struct children
167
0
    auto &struct_children = StructType::GetChildTypes(type);
168
0
    child_states.resize(struct_children.size() + 1);
169
170
0
    if (children.empty()) {
171
      // scan all struct children
172
0
      scan_child_column.resize(struct_children.size(), true);
173
0
      for (idx_t i = 0; i < struct_children.size(); i++) {
174
0
        child_states[i + 1].Initialize(struct_children[i].second, options);
175
0
      }
176
0
    } else {
177
      // only scan the specified subset of columns
178
0
      scan_child_column.resize(struct_children.size(), false);
179
0
      for (idx_t i = 0; i < children.size(); i++) {
180
0
        auto &child = children[i];
181
0
        auto index = child.GetPrimaryIndex();
182
0
        auto &child_indexes = child.GetChildIndexes();
183
0
        scan_child_column[index] = true;
184
0
        child_states[index + 1].Initialize(struct_children[index].second, child_indexes, options);
185
0
      }
186
0
    }
187
0
    child_states[0].scan_options = options;
188
27
  } else if (type.InternalType() == PhysicalType::LIST) {
189
    // validity + list child
190
0
    child_states.resize(2);
191
0
    child_states[1].Initialize(ListType::GetChildType(type), options);
192
0
    child_states[0].scan_options = options;
193
27
  } else if (type.InternalType() == PhysicalType::ARRAY) {
194
    // validity + array child
195
0
    child_states.resize(2);
196
0
    child_states[0].scan_options = options;
197
0
    child_states[1].Initialize(ArrayType::GetChildType(type), options);
198
27
  } else {
199
    // validity
200
27
    child_states.resize(1);
201
27
    child_states[0].scan_options = options;
202
27
  }
203
27
}
204
205
0
void ColumnScanState::Initialize(const LogicalType &type, optional_ptr<TableScanOptions> options) {
206
0
  vector<StorageIndex> children;
207
0
  Initialize(type, children, options);
208
0
}
209
210
7
void CollectionScanState::Initialize(const vector<LogicalType> &types) {
211
7
  auto &column_ids = GetColumnIds();
212
7
  column_scans = make_unsafe_uniq_array<ColumnScanState>(column_ids.size());
213
34
  for (idx_t i = 0; i < column_ids.size(); i++) {
214
27
    if (column_ids[i].IsRowIdColumn()) {
215
0
      continue;
216
0
    }
217
27
    auto col_id = column_ids[i].GetPrimaryIndex();
218
27
    column_scans[i].Initialize(types[col_id], column_ids[i].GetChildIndexes(), &GetOptions());
219
27
  }
220
7
}
221
222
0
bool RowGroup::InitializeScanWithOffset(CollectionScanState &state, idx_t vector_offset) {
223
0
  auto &column_ids = state.GetColumnIds();
224
0
  auto &filters = state.GetFilterInfo();
225
0
  if (!CheckZonemap(filters)) {
226
0
    return false;
227
0
  }
228
229
0
  state.row_group = this;
230
0
  state.vector_index = vector_offset;
231
0
  state.max_row_group_row =
232
0
      this->start > state.max_row ? 0 : MinValue<idx_t>(this->count, state.max_row - this->start);
233
0
  auto row_number = start + vector_offset * STANDARD_VECTOR_SIZE;
234
0
  if (state.max_row_group_row == 0) {
235
    // exceeded row groups to scan
236
0
    return false;
237
0
  }
238
0
  D_ASSERT(state.column_scans);
239
0
  for (idx_t i = 0; i < column_ids.size(); i++) {
240
0
    const auto &column = column_ids[i];
241
0
    if (!column.IsRowIdColumn()) {
242
0
      auto &column_data = GetColumn(column);
243
0
      column_data.InitializeScanWithOffset(state.column_scans[i], row_number);
244
0
      state.column_scans[i].scan_options = &state.GetOptions();
245
0
    } else {
246
0
      state.column_scans[i].current = nullptr;
247
0
    }
248
0
  }
249
0
  return true;
250
0
}
251
252
7
bool RowGroup::InitializeScan(CollectionScanState &state) {
253
7
  auto &column_ids = state.GetColumnIds();
254
7
  auto &filters = state.GetFilterInfo();
255
7
  if (!CheckZonemap(filters)) {
256
0
    return false;
257
0
  }
258
7
  state.row_group = this;
259
7
  state.vector_index = 0;
260
7
  state.max_row_group_row =
261
7
      this->start > state.max_row ? 0 : MinValue<idx_t>(this->count, state.max_row - this->start);
262
7
  if (state.max_row_group_row == 0) {
263
0
    return false;
264
0
  }
265
7
  D_ASSERT(state.column_scans);
266
34
  for (idx_t i = 0; i < column_ids.size(); i++) {
267
27
    auto column = column_ids[i];
268
27
    if (!column.IsRowIdColumn()) {
269
27
      auto &column_data = GetColumn(column);
270
27
      column_data.InitializeScan(state.column_scans[i]);
271
27
      state.column_scans[i].scan_options = &state.GetOptions();
272
27
    } else {
273
0
      state.column_scans[i].current = nullptr;
274
0
    }
275
27
  }
276
7
  return true;
277
7
}
278
279
unique_ptr<RowGroup> RowGroup::AlterType(RowGroupCollection &new_collection, const LogicalType &target_type,
280
                                         idx_t changed_idx, ExpressionExecutor &executor,
281
0
                                         CollectionScanState &scan_state, DataChunk &scan_chunk) {
282
0
  Verify();
283
284
  // construct a new column data for this type
285
0
  auto column_data = ColumnData::CreateColumn(GetBlockManager(), GetTableInfo(), changed_idx, start, target_type);
286
287
0
  ColumnAppendState append_state;
288
0
  column_data->InitializeAppend(append_state);
289
290
  // scan the original table, and fill the new column with the transformed value
291
0
  scan_state.Initialize(GetCollection().GetTypes());
292
0
  InitializeScan(scan_state);
293
294
0
  DataChunk append_chunk;
295
0
  vector<LogicalType> append_types;
296
0
  append_types.push_back(target_type);
297
0
  append_chunk.Initialize(Allocator::DefaultAllocator(), append_types);
298
0
  auto &append_vector = append_chunk.data[0];
299
0
  while (true) {
300
    // scan the table
301
0
    scan_chunk.Reset();
302
0
    ScanCommitted(scan_state, scan_chunk, TableScanType::TABLE_SCAN_COMMITTED_ROWS);
303
0
    if (scan_chunk.size() == 0) {
304
0
      break;
305
0
    }
306
    // execute the expression
307
0
    append_chunk.Reset();
308
0
    executor.ExecuteExpression(scan_chunk, append_vector);
309
0
    column_data->Append(append_state, append_vector, scan_chunk.size());
310
0
  }
311
312
  // set up the row_group based on this row_group
313
0
  auto row_group = make_uniq<RowGroup>(new_collection, this->start, this->count);
314
0
  row_group->SetVersionInfo(GetOrCreateVersionInfoPtr());
315
0
  auto &cols = GetColumns();
316
0
  for (idx_t i = 0; i < cols.size(); i++) {
317
0
    if (i == changed_idx) {
318
      // this is the altered column: use the new column
319
0
      row_group->columns.push_back(std::move(column_data));
320
0
      column_data.reset();
321
0
    } else {
322
      // this column was not altered: use the data directly
323
0
      row_group->columns.push_back(cols[i]);
324
0
    }
325
0
  }
326
0
  row_group->Verify();
327
0
  return row_group;
328
0
}
329
330
unique_ptr<RowGroup> RowGroup::AddColumn(RowGroupCollection &new_collection, ColumnDefinition &new_column,
331
0
                                         ExpressionExecutor &executor, Vector &result) {
332
0
  Verify();
333
334
  // construct a new column data for the new column
335
0
  auto added_column =
336
0
      ColumnData::CreateColumn(GetBlockManager(), GetTableInfo(), GetColumnCount(), start, new_column.Type());
337
338
0
  idx_t rows_to_write = this->count;
339
0
  if (rows_to_write > 0) {
340
0
    DataChunk dummy_chunk;
341
342
0
    ColumnAppendState state;
343
0
    added_column->InitializeAppend(state);
344
0
    for (idx_t i = 0; i < rows_to_write; i += STANDARD_VECTOR_SIZE) {
345
0
      idx_t rows_in_this_vector = MinValue<idx_t>(rows_to_write - i, STANDARD_VECTOR_SIZE);
346
0
      dummy_chunk.SetCardinality(rows_in_this_vector);
347
0
      executor.ExecuteExpression(dummy_chunk, result);
348
0
      added_column->Append(state, result, rows_in_this_vector);
349
0
    }
350
0
  }
351
352
  // set up the row_group based on this row_group
353
0
  auto row_group = make_uniq<RowGroup>(new_collection, this->start, this->count);
354
0
  row_group->SetVersionInfo(GetOrCreateVersionInfoPtr());
355
0
  row_group->columns = GetColumns();
356
  // now add the new column
357
0
  row_group->columns.push_back(std::move(added_column));
358
359
0
  row_group->Verify();
360
0
  return row_group;
361
0
}
362
363
0
unique_ptr<RowGroup> RowGroup::RemoveColumn(RowGroupCollection &new_collection, idx_t removed_column) {
364
0
  Verify();
365
366
0
  D_ASSERT(removed_column < columns.size());
367
368
0
  auto row_group = make_uniq<RowGroup>(new_collection, this->start, this->count);
369
0
  row_group->SetVersionInfo(GetOrCreateVersionInfoPtr());
370
  // copy over all columns except for the removed one
371
0
  auto &cols = GetColumns();
372
0
  for (idx_t i = 0; i < cols.size(); i++) {
373
0
    if (i != removed_column) {
374
0
      row_group->columns.push_back(cols[i]);
375
0
    }
376
0
  }
377
378
0
  row_group->Verify();
379
0
  return row_group;
380
0
}
381
382
2
void RowGroup::CommitDrop() {
383
11
  for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) {
384
9
    CommitDropColumn(column_idx);
385
9
  }
386
2
}
387
388
9
void RowGroup::CommitDropColumn(const idx_t column_index) {
389
9
  auto &column = GetColumn(column_index);
390
9
  column.CommitDropColumn();
391
9
}
392
393
0
void RowGroup::NextVector(CollectionScanState &state) {
394
0
  state.vector_index++;
395
0
  const auto &column_ids = state.GetColumnIds();
396
0
  for (idx_t i = 0; i < column_ids.size(); i++) {
397
0
    const auto &column = column_ids[i];
398
0
    if (column.IsRowIdColumn()) {
399
0
      continue;
400
0
    }
401
0
    GetColumn(column).Skip(state.column_scans[i]);
402
0
  }
403
0
}
404
405
0
FilterPropagateResult RowGroup::CheckRowIdFilter(const TableFilter &filter, idx_t beg_row, idx_t end_row) {
406
  // RowId columns don't have a zonemap, but we can trivially create stats to check the filter against.
407
0
  BaseStatistics dummy_stats = NumericStats::CreateEmpty(LogicalType::ROW_TYPE);
408
0
  NumericStats::SetMin(dummy_stats, UnsafeNumericCast<row_t>(beg_row));
409
0
  NumericStats::SetMax(dummy_stats, UnsafeNumericCast<row_t>(end_row));
410
411
0
  return filter.CheckStatistics(dummy_stats);
412
0
}
413
414
7
bool RowGroup::CheckZonemap(ScanFilterInfo &filters) {
415
7
  auto &filter_list = filters.GetFilterList();
416
  // new row group - label all filters as up for grabs again
417
7
  filters.CheckAllFilters();
418
7
  for (idx_t i = 0; i < filter_list.size(); i++) {
419
0
    auto &entry = filter_list[i];
420
0
    auto &filter = entry.filter;
421
0
    auto base_column_index = entry.table_column_index;
422
423
0
    FilterPropagateResult prune_result;
424
425
0
    if (base_column_index == COLUMN_IDENTIFIER_ROW_ID) {
426
0
      prune_result = CheckRowIdFilter(filter, this->start, this->start + this->count);
427
0
    } else {
428
0
      prune_result = GetColumn(base_column_index).CheckZonemap(filter);
429
0
    }
430
431
0
    if (prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) {
432
0
      return false;
433
0
    }
434
0
    if (filter.filter_type == TableFilterType::OPTIONAL_FILTER) {
435
      // these are only for row group checking, set as always true so we don't check it
436
0
      filters.SetFilterAlwaysTrue(i);
437
0
    } else if (prune_result == FilterPropagateResult::FILTER_ALWAYS_TRUE) {
438
      // filter is always true - no need to check it
439
      // label the filter as always true so we don't need to check it anymore
440
0
      filters.SetFilterAlwaysTrue(i);
441
0
    }
442
0
  }
443
7
  return true;
444
7
}
445
446
7
bool RowGroup::CheckZonemapSegments(CollectionScanState &state) {
447
7
  auto &filters = state.GetFilterInfo();
448
7
  for (auto &entry : filters.GetFilterList()) {
449
0
    if (entry.IsAlwaysTrue()) {
450
      // filter is always true - avoid checking
451
0
      continue;
452
0
    }
453
0
    auto column_idx = entry.scan_column_index;
454
0
    auto base_column_idx = entry.table_column_index;
455
0
    auto &filter = entry.filter;
456
457
0
    FilterPropagateResult prune_result;
458
0
    if (base_column_idx == COLUMN_IDENTIFIER_ROW_ID) {
459
0
      prune_result = CheckRowIdFilter(filter, this->start, this->start + this->count);
460
0
    } else {
461
0
      prune_result = GetColumn(base_column_idx).CheckZonemap(state.column_scans[column_idx], filter);
462
0
    }
463
464
0
    if (prune_result != FilterPropagateResult::FILTER_ALWAYS_FALSE) {
465
0
      continue;
466
0
    }
467
468
    // check zone map segment.
469
0
    auto &column_scan_state = state.column_scans[column_idx];
470
0
    auto current_segment = column_scan_state.current;
471
0
    if (!current_segment) {
472
      // no segment to skip
473
0
      continue;
474
0
    }
475
0
    idx_t target_row = current_segment->start + current_segment->count;
476
0
    if (target_row >= state.max_row) {
477
0
      target_row = state.max_row;
478
0
    }
479
480
0
    D_ASSERT(target_row >= this->start);
481
0
    D_ASSERT(target_row <= this->start + this->count);
482
0
    idx_t target_vector_index = (target_row - this->start) / STANDARD_VECTOR_SIZE;
483
0
    if (state.vector_index == target_vector_index) {
484
      // we can't skip any full vectors because this segment contains less than a full vector
485
      // for now we just bail-out
486
      // FIXME: we could check if we can ALSO skip the next segments, in which case skipping a full vector
487
      // might be possible
488
      // we don't care that much though, since a single segment that fits less than a full vector is
489
      // exceedingly rare
490
0
      return true;
491
0
    }
492
0
    while (state.vector_index < target_vector_index) {
493
0
      NextVector(state);
494
0
    }
495
0
    return false;
496
0
  }
497
498
7
  return true;
499
7
}
500
501
template <TableScanType TYPE>
502
14
void RowGroup::TemplatedScan(TransactionData transaction, CollectionScanState &state, DataChunk &result) {
503
14
  const bool ALLOW_UPDATES = TYPE != TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES &&
504
14
                             TYPE != TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED;
505
14
  const auto &column_ids = state.GetColumnIds();
506
14
  auto &filter_info = state.GetFilterInfo();
507
14
  while (true) {
508
14
    if (state.vector_index * STANDARD_VECTOR_SIZE >= state.max_row_group_row) {
509
      // exceeded the amount of rows to scan
510
7
      return;
511
7
    }
512
7
    idx_t current_row = state.vector_index * STANDARD_VECTOR_SIZE;
513
7
    auto max_count = MinValue<idx_t>(STANDARD_VECTOR_SIZE, state.max_row_group_row - current_row);
514
515
    // check the sampling info if we have to sample this chunk
516
7
    if (state.GetSamplingInfo().do_system_sample &&
517
7
        state.random.NextRandom() > state.GetSamplingInfo().sample_rate) {
518
0
      NextVector(state);
519
0
      continue;
520
0
    }
521
522
    //! first check the zonemap if we have to scan this partition
523
7
    if (!CheckZonemapSegments(state)) {
524
0
      continue;
525
0
    }
526
527
    // second, scan the version chunk manager to figure out which tuples to load for this transaction
528
7
    idx_t count;
529
7
    if (TYPE == TableScanType::TABLE_SCAN_REGULAR) {
530
7
      count = state.row_group->GetSelVector(transaction, state.vector_index, state.valid_sel, max_count);
531
7
      if (count == 0) {
532
        // nothing to scan for this vector, skip the entire vector
533
0
        NextVector(state);
534
0
        continue;
535
0
      }
536
7
    } else if (TYPE == TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED) {
537
0
      count = state.row_group->GetCommittedSelVector(transaction.start_time, transaction.transaction_id,
538
0
                                                     state.vector_index, state.valid_sel, max_count);
539
0
      if (count == 0) {
540
        // nothing to scan for this vector, skip the entire vector
541
0
        NextVector(state);
542
0
        continue;
543
0
      }
544
0
    } else {
545
0
      count = max_count;
546
0
    }
547
7
    auto &block_manager = GetBlockManager();
548
7
#ifndef DUCKDB_ALTERNATIVE_VERIFY
549
    // // in regular operation we only prefetch from remote file systems
550
    // // when alternative verify is set, we always prefetch for testing purposes
551
7
    if (block_manager.IsRemote())
552
#else
553
    if (!block_manager.InMemory())
554
#endif
555
0
    {
556
0
      PrefetchState prefetch_state;
557
0
      for (idx_t i = 0; i < column_ids.size(); i++) {
558
0
        const auto &column = column_ids[i];
559
0
        if (!column.IsRowIdColumn()) {
560
0
          GetColumn(column).InitializePrefetch(prefetch_state, state.column_scans[i], max_count);
561
0
        }
562
0
      }
563
0
      auto &buffer_manager = block_manager.buffer_manager;
564
0
      buffer_manager.Prefetch(prefetch_state.blocks);
565
0
    }
566
567
7
    bool has_filters = filter_info.HasFilters();
568
7
    if (count == max_count && !has_filters) {
569
      // scan all vectors completely: full scan without deletions or table filters
570
34
      for (idx_t i = 0; i < column_ids.size(); i++) {
571
27
        const auto &column = column_ids[i];
572
27
        if (column.IsRowIdColumn()) {
573
          // scan row id
574
0
          D_ASSERT(result.data[i].GetType().InternalType() == ROW_TYPE);
575
0
          result.data[i].Sequence(UnsafeNumericCast<int64_t>(this->start + current_row), 1, count);
576
27
        } else {
577
27
          auto &col_data = GetColumn(column);
578
27
          if (TYPE != TableScanType::TABLE_SCAN_REGULAR) {
579
0
            col_data.ScanCommitted(state.vector_index, state.column_scans[i], result.data[i],
580
0
                                   ALLOW_UPDATES);
581
27
          } else {
582
27
            col_data.Scan(transaction, state.vector_index, state.column_scans[i], result.data[i]);
583
27
          }
584
27
        }
585
27
      }
586
7
    } else {
587
      // partial scan: we have deletions or table filters
588
0
      idx_t approved_tuple_count = count;
589
0
      SelectionVector sel;
590
0
      if (count != max_count) {
591
0
        sel.Initialize(state.valid_sel);
592
0
      } else {
593
0
        sel.Initialize(nullptr);
594
0
      }
595
      //! first, we scan the columns with filters, fetch their data and generate a selection vector.
596
      //! get runtime statistics
597
0
      auto adaptive_filter = filter_info.GetAdaptiveFilter();
598
0
      auto filter_state = filter_info.BeginFilter();
599
0
      if (has_filters) {
600
0
        D_ASSERT(ALLOW_UPDATES);
601
0
        auto &filter_list = filter_info.GetFilterList();
602
0
        for (idx_t i = 0; i < filter_list.size(); i++) {
603
0
          auto filter_idx = adaptive_filter->permutation[i];
604
0
          auto &filter = filter_list[filter_idx];
605
0
          if (filter.IsAlwaysTrue()) {
606
            // this filter is always true - skip it
607
0
            continue;
608
0
          }
609
0
          auto &table_filter_state = *filter.filter_state;
610
611
0
          const auto scan_idx = filter.scan_column_index;
612
0
          const auto column_idx = filter.table_column_index;
613
614
0
          auto &result_vector = result.data[scan_idx];
615
0
          if (column_idx == COLUMN_IDENTIFIER_ROW_ID) {
616
            // We do another quick statistics scan for row ids here
617
0
            const auto rowid_start = this->start + current_row;
618
0
            const auto rowid_end = this->start + current_row + max_count;
619
0
            const auto prune_result = CheckRowIdFilter(filter.filter, rowid_start, rowid_end);
620
0
            if (prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) {
621
              // We can just break out of the loop here.
622
0
              approved_tuple_count = 0;
623
0
              continue;
624
0
            }
625
626
            // Generate row ids
627
            // Create sequence for row ids
628
0
            D_ASSERT(result_vector.GetType().InternalType() == ROW_TYPE);
629
0
            result_vector.SetVectorType(VectorType::FLAT_VECTOR);
630
0
            auto result_data = FlatVector::GetData<int64_t>(result_vector);
631
0
            for (size_t sel_idx = 0; sel_idx < approved_tuple_count; sel_idx++) {
632
0
              result_data[sel.get_index(sel_idx)] =
633
0
                  UnsafeNumericCast<int64_t>(this->start + current_row + sel.get_index(sel_idx));
634
0
            }
635
636
            // Was this filter always true? If so, we don't need to apply it
637
0
            if (prune_result == FilterPropagateResult::FILTER_ALWAYS_TRUE) {
638
0
              continue;
639
0
            }
640
641
            // Now apply the filter
642
0
            UnifiedVectorFormat vdata;
643
0
            result_vector.ToUnifiedFormat(approved_tuple_count, vdata);
644
0
            ColumnSegment::FilterSelection(sel, result_vector, vdata, filter.filter, table_filter_state,
645
0
                                           approved_tuple_count, approved_tuple_count);
646
647
0
          } else {
648
0
            auto &col_data = GetColumn(filter.table_column_index);
649
0
            col_data.Filter(transaction, state.vector_index, state.column_scans[scan_idx], result_vector,
650
0
                            sel, approved_tuple_count, filter.filter, table_filter_state);
651
0
          }
652
0
        }
653
0
        for (auto &table_filter : filter_list) {
654
0
          if (table_filter.IsAlwaysTrue()) {
655
0
            continue;
656
0
          }
657
0
          result.data[table_filter.scan_column_index].Slice(sel, approved_tuple_count);
658
0
        }
659
0
      }
660
0
      if (approved_tuple_count == 0) {
661
        // all rows were filtered out by the table filters
662
0
        D_ASSERT(has_filters);
663
0
        result.Reset();
664
        // skip this vector in all the scans that were not scanned yet
665
0
        for (idx_t i = 0; i < column_ids.size(); i++) {
666
0
          auto &col_idx = column_ids[i];
667
0
          if (col_idx.IsRowIdColumn()) {
668
0
            continue;
669
0
          }
670
0
          if (has_filters && filter_info.ColumnHasFilters(i)) {
671
0
            continue;
672
0
          }
673
0
          auto &col_data = GetColumn(col_idx);
674
0
          col_data.Skip(state.column_scans[i]);
675
0
        }
676
0
        state.vector_index++;
677
0
        continue;
678
0
      }
679
      //! Now we use the selection vector to fetch data for the other columns.
680
0
      for (idx_t i = 0; i < column_ids.size(); i++) {
681
0
        if (has_filters && filter_info.ColumnHasFilters(i)) {
682
          // column has already been scanned as part of the filtering process
683
0
          continue;
684
0
        }
685
0
        auto &column = column_ids[i];
686
0
        if (column.IsRowIdColumn()) {
687
0
          D_ASSERT(result.data[i].GetType().InternalType() == ROW_TYPE);
688
0
          result.data[i].SetVectorType(VectorType::FLAT_VECTOR);
689
0
          auto result_data = FlatVector::GetData<int64_t>(result.data[i]);
690
0
          for (size_t sel_idx = 0; sel_idx < approved_tuple_count; sel_idx++) {
691
0
            result_data[sel_idx] =
692
0
                UnsafeNumericCast<int64_t>(this->start + current_row + sel.get_index(sel_idx));
693
0
          }
694
0
        } else {
695
0
          auto &col_data = GetColumn(column);
696
0
          if (TYPE == TableScanType::TABLE_SCAN_REGULAR) {
697
0
            col_data.Select(transaction, state.vector_index, state.column_scans[i], result.data[i], sel,
698
0
                            approved_tuple_count);
699
0
          } else {
700
0
            col_data.SelectCommitted(state.vector_index, state.column_scans[i], result.data[i], sel,
701
0
                                     approved_tuple_count, ALLOW_UPDATES);
702
0
          }
703
0
        }
704
0
      }
705
0
      filter_info.EndFilter(filter_state);
706
707
0
      D_ASSERT(approved_tuple_count > 0);
708
0
      count = approved_tuple_count;
709
0
    }
710
7
    result.SetCardinality(count);
711
7
    state.vector_index++;
712
7
    break;
713
7
  }
714
14
}
void duckdb::RowGroup::TemplatedScan<(duckdb::TableScanType)0>(duckdb::TransactionData, duckdb::CollectionScanState&, duckdb::DataChunk&)
Unexecuted instantiation: void duckdb::RowGroup::TemplatedScan<(duckdb::TableScanType)1>(duckdb::TransactionData, duckdb::CollectionScanState&, duckdb::DataChunk&)
Unexecuted instantiation: void duckdb::RowGroup::TemplatedScan<(duckdb::TableScanType)2>(duckdb::TransactionData, duckdb::CollectionScanState&, duckdb::DataChunk&)
Unexecuted instantiation: void duckdb::RowGroup::TemplatedScan<(duckdb::TableScanType)3>(duckdb::TransactionData, duckdb::CollectionScanState&, duckdb::DataChunk&)
715
716
14
void RowGroup::Scan(TransactionData transaction, CollectionScanState &state, DataChunk &result) {
717
14
  TemplatedScan<TableScanType::TABLE_SCAN_REGULAR>(transaction, state, result);
718
14
}
719
720
0
void RowGroup::ScanCommitted(CollectionScanState &state, DataChunk &result, TableScanType type) {
721
0
  auto &transaction_manager = DuckTransactionManager::Get(GetCollection().GetAttached());
722
723
0
  transaction_t start_ts;
724
0
  transaction_t transaction_id;
725
0
  if (type == TableScanType::TABLE_SCAN_LATEST_COMMITTED_ROWS) {
726
0
    start_ts = transaction_manager.GetLastCommit() + 1;
727
0
    transaction_id = MAX_TRANSACTION_ID;
728
0
  } else {
729
0
    start_ts = transaction_manager.LowestActiveStart();
730
0
    transaction_id = transaction_manager.LowestActiveId();
731
0
  }
732
0
  TransactionData data(transaction_id, start_ts);
733
0
  switch (type) {
734
0
  case TableScanType::TABLE_SCAN_COMMITTED_ROWS:
735
0
    TemplatedScan<TableScanType::TABLE_SCAN_COMMITTED_ROWS>(data, state, result);
736
0
    break;
737
0
  case TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES:
738
0
    TemplatedScan<TableScanType::TABLE_SCAN_COMMITTED_ROWS_DISALLOW_UPDATES>(data, state, result);
739
0
    break;
740
0
  case TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED:
741
0
  case TableScanType::TABLE_SCAN_LATEST_COMMITTED_ROWS:
742
0
    TemplatedScan<TableScanType::TABLE_SCAN_COMMITTED_ROWS_OMIT_PERMANENTLY_DELETED>(data, state, result);
743
0
    break;
744
0
  default:
745
0
    throw InternalException("Unrecognized table scan type");
746
0
  }
747
0
}
748
749
784
optional_ptr<RowVersionManager> RowGroup::GetVersionInfo() {
750
784
  if (!HasUnloadedDeletes()) {
751
    // deletes are loaded - return the version info
752
784
    return version_info;
753
784
  }
754
0
  lock_guard<mutex> lock(row_group_lock);
755
  // double-check after obtaining the lock whether or not deletes are still not loaded to avoid double load
756
0
  if (!HasUnloadedDeletes()) {
757
0
    return version_info;
758
0
  }
759
  // deletes are not loaded - reload
760
0
  auto root_delete = deletes_pointers[0];
761
0
  auto loaded_info = RowVersionManager::Deserialize(root_delete, GetBlockManager().GetMetadataManager(), start);
762
0
  SetVersionInfo(std::move(loaded_info));
763
0
  deletes_is_loaded = true;
764
0
  return version_info;
765
0
}
766
767
201
void RowGroup::SetVersionInfo(shared_ptr<RowVersionManager> version) {
768
201
  owned_version_info = std::move(version);
769
201
  version_info = owned_version_info.get();
770
201
}
771
772
201
shared_ptr<RowVersionManager> RowGroup::GetOrCreateVersionInfoInternal() {
773
  // version info does not exist - need to create it
774
201
  lock_guard<mutex> lock(row_group_lock);
775
201
  if (!owned_version_info) {
776
201
    auto new_info = make_shared_ptr<RowVersionManager>(start);
777
201
    SetVersionInfo(std::move(new_info));
778
201
  }
779
201
  return owned_version_info;
780
201
}
781
782
0
shared_ptr<RowVersionManager> RowGroup::GetOrCreateVersionInfoPtr() {
783
0
  auto vinfo = GetVersionInfo();
784
0
  if (vinfo) {
785
    // version info exists - return it directly
786
0
    return owned_version_info;
787
0
  }
788
0
  return GetOrCreateVersionInfoInternal();
789
0
}
790
791
585
RowVersionManager &RowGroup::GetOrCreateVersionInfo() {
792
585
  auto vinfo = GetVersionInfo();
793
585
  if (vinfo) {
794
    // version info exists - return it directly
795
384
    return *vinfo;
796
384
  }
797
201
  return *GetOrCreateVersionInfoInternal();
798
585
}
799
800
idx_t RowGroup::GetSelVector(TransactionData transaction, idx_t vector_idx, SelectionVector &sel_vector,
801
7
                             idx_t max_count) {
802
7
  auto vinfo = GetVersionInfo();
803
7
  if (!vinfo) {
804
0
    return max_count;
805
0
  }
806
7
  return vinfo->GetSelVector(transaction, vector_idx, sel_vector, max_count);
807
7
}
808
809
idx_t RowGroup::GetCommittedSelVector(transaction_t start_time, transaction_t transaction_id, idx_t vector_idx,
810
0
                                      SelectionVector &sel_vector, idx_t max_count) {
811
0
  auto vinfo = GetVersionInfo();
812
0
  if (!vinfo) {
813
0
    return max_count;
814
0
  }
815
0
  return vinfo->GetCommittedSelVector(start_time, transaction_id, vector_idx, sel_vector, max_count);
816
0
}
817
818
0
bool RowGroup::Fetch(TransactionData transaction, idx_t row) {
819
0
  D_ASSERT(row < this->count);
820
0
  auto vinfo = GetVersionInfo();
821
0
  if (!vinfo) {
822
0
    return true;
823
0
  }
824
0
  return vinfo->Fetch(transaction, row);
825
0
}
826
827
void RowGroup::FetchRow(TransactionData transaction, ColumnFetchState &state, const vector<StorageIndex> &column_ids,
828
0
                        row_t row_id, DataChunk &result, idx_t result_idx) {
829
0
  for (idx_t col_idx = 0; col_idx < column_ids.size(); col_idx++) {
830
0
    auto &column = column_ids[col_idx];
831
0
    auto &result_vector = result.data[col_idx];
832
0
    D_ASSERT(result_vector.GetVectorType() == VectorType::FLAT_VECTOR);
833
0
    D_ASSERT(!FlatVector::IsNull(result_vector, result_idx));
834
0
    if (column.IsRowIdColumn()) {
835
      // row id column: fill in the row ids
836
0
      D_ASSERT(result_vector.GetType().InternalType() == PhysicalType::INT64);
837
0
      result_vector.SetVectorType(VectorType::FLAT_VECTOR);
838
0
      auto data = FlatVector::GetData<row_t>(result_vector);
839
0
      data[result_idx] = row_id;
840
0
    } else {
841
      // regular column: fetch data from the base column
842
0
      auto &col_data = GetColumn(column);
843
0
      col_data.FetchRow(transaction, state, row_id, result_vector, result_idx);
844
0
    }
845
0
  }
846
0
}
847
848
201
void RowGroup::AppendVersionInfo(TransactionData transaction, idx_t count) {
849
201
  const idx_t row_group_size = GetRowGroupSize();
850
201
  idx_t row_group_start = this->count.load();
851
201
  idx_t row_group_end = row_group_start + count;
852
201
  if (row_group_end > row_group_size) {
853
0
    row_group_end = row_group_size;
854
0
  }
855
  // create the version_info if it doesn't exist yet
856
201
  auto &vinfo = GetOrCreateVersionInfo();
857
201
  vinfo.AppendVersionInfo(transaction, count, row_group_start, row_group_end);
858
201
  this->count = row_group_end;
859
201
}
860
861
192
void RowGroup::CommitAppend(transaction_t commit_id, idx_t row_group_start, idx_t count) {
862
192
  auto &vinfo = GetOrCreateVersionInfo();
863
192
  vinfo.CommitAppend(commit_id, row_group_start, count);
864
192
}
865
866
0
void RowGroup::RevertAppend(idx_t row_group_start) {
867
0
  auto &vinfo = GetOrCreateVersionInfo();
868
0
  vinfo.RevertAppend(row_group_start - this->start);
869
0
  for (auto &column : columns) {
870
0
    column->RevertAppend(UnsafeNumericCast<row_t>(row_group_start));
871
0
  }
872
0
  this->count = MinValue<idx_t>(row_group_start - this->start, this->count);
873
0
  Verify();
874
0
}
875
876
201
void RowGroup::InitializeAppend(RowGroupAppendState &append_state) {
877
201
  append_state.row_group = this;
878
201
  append_state.offset_in_row_group = this->count;
879
  // for each column, initialize the append state
880
201
  append_state.states = make_unsafe_uniq_array<ColumnAppendState>(GetColumnCount());
881
2.85k
  for (idx_t i = 0; i < GetColumnCount(); i++) {
882
2.65k
    auto &col_data = GetColumn(i);
883
2.65k
    col_data.InitializeAppend(append_state.states[i]);
884
2.65k
  }
885
201
}
886
887
201
void RowGroup::Append(RowGroupAppendState &state, DataChunk &chunk, idx_t append_count) {
888
  // append to the current row_group
889
201
  D_ASSERT(chunk.ColumnCount() == GetColumnCount());
890
2.85k
  for (idx_t i = 0; i < GetColumnCount(); i++) {
891
2.65k
    auto &col_data = GetColumn(i);
892
2.65k
    auto prev_allocation_size = col_data.GetAllocationSize();
893
2.65k
    col_data.Append(state.states[i], chunk.data[i], append_count);
894
2.65k
    allocation_size += col_data.GetAllocationSize() - prev_allocation_size;
895
2.65k
  }
896
201
  state.offset_in_row_group += append_count;
897
201
}
898
899
192
void RowGroup::CleanupAppend(transaction_t lowest_transaction, idx_t start, idx_t count) {
900
192
  auto &vinfo = GetOrCreateVersionInfo();
901
192
  vinfo.CleanupAppend(lowest_transaction, start, count);
902
192
}
903
904
void RowGroup::Update(TransactionData transaction, DataChunk &update_chunk, row_t *ids, idx_t offset, idx_t count,
905
0
                      const vector<PhysicalIndex> &column_ids) {
906
#ifdef DEBUG
907
  for (size_t i = offset; i < offset + count; i++) {
908
    D_ASSERT(ids[i] >= row_t(this->start) && ids[i] < row_t(this->start + this->count));
909
  }
910
#endif
911
0
  for (idx_t i = 0; i < column_ids.size(); i++) {
912
0
    auto column = column_ids[i];
913
0
    D_ASSERT(column.index != COLUMN_IDENTIFIER_ROW_ID);
914
0
    auto &col_data = GetColumn(column.index);
915
0
    D_ASSERT(col_data.type.id() == update_chunk.data[i].GetType().id());
916
0
    if (offset > 0) {
917
0
      Vector sliced_vector(update_chunk.data[i], offset, offset + count);
918
0
      sliced_vector.Flatten(count);
919
0
      col_data.Update(transaction, column.index, sliced_vector, ids + offset, count);
920
0
    } else {
921
0
      col_data.Update(transaction, column.index, update_chunk.data[i], ids, count);
922
0
    }
923
0
    MergeStatistics(column.index, *col_data.GetUpdateStatistics());
924
0
  }
925
0
}
926
927
void RowGroup::UpdateColumn(TransactionData transaction, DataChunk &updates, Vector &row_ids,
928
0
                            const vector<column_t> &column_path) {
929
0
  D_ASSERT(updates.ColumnCount() == 1);
930
0
  auto ids = FlatVector::GetData<row_t>(row_ids);
931
932
0
  auto primary_column_idx = column_path[0];
933
0
  D_ASSERT(primary_column_idx != COLUMN_IDENTIFIER_ROW_ID);
934
0
  D_ASSERT(primary_column_idx < columns.size());
935
0
  auto &col_data = GetColumn(primary_column_idx);
936
0
  col_data.UpdateColumn(transaction, column_path, updates.data[0], ids, updates.size(), 1);
937
0
  MergeStatistics(primary_column_idx, *col_data.GetUpdateStatistics());
938
0
}
939
940
0
unique_ptr<BaseStatistics> RowGroup::GetStatistics(idx_t column_idx) {
941
0
  auto &col_data = GetColumn(column_idx);
942
0
  return col_data.GetStatistics();
943
0
}
944
945
0
void RowGroup::MergeStatistics(idx_t column_idx, const BaseStatistics &other) {
946
0
  auto &col_data = GetColumn(column_idx);
947
0
  col_data.MergeStatistics(other);
948
0
}
949
950
2.65k
void RowGroup::MergeIntoStatistics(idx_t column_idx, BaseStatistics &other) {
951
2.65k
  auto &col_data = GetColumn(column_idx);
952
2.65k
  col_data.MergeIntoStatistics(other);
953
2.65k
}
954
955
201
void RowGroup::MergeIntoStatistics(TableStatistics &other) {
956
201
  auto stats_lock = other.GetLock();
957
2.85k
  for (idx_t i = 0; i < columns.size(); i++) {
958
2.65k
    MergeIntoStatistics(i, other.GetStats(*stats_lock, i).Statistics());
959
2.65k
  }
960
201
}
961
962
0
CompressionType ColumnCheckpointInfo::GetCompressionType() {
963
0
  return info.compression_types[column_idx];
964
0
}
965
966
0
RowGroupWriteData RowGroup::WriteToDisk(RowGroupWriteInfo &info) {
967
0
  RowGroupWriteData result;
968
0
  result.states.reserve(columns.size());
969
0
  result.statistics.reserve(columns.size());
970
971
  // Checkpoint the individual columns of the row group
972
  // Here we're iterating over columns. Each column can have multiple segments.
973
  // (Some columns will be wider than others, and require different numbers
974
  // of blocks to encode.) Segments cannot span blocks.
975
  //
976
  // Some of these columns are composite (list, struct). The data is written
977
  // first sequentially, and the pointers are written later, so that the
978
  // pointers all end up densely packed, and thus more cache-friendly.
979
0
  for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) {
980
0
    auto &column = GetColumn(column_idx);
981
0
    ColumnCheckpointInfo checkpoint_info(info, column_idx);
982
0
    auto checkpoint_state = column.Checkpoint(*this, checkpoint_info);
983
0
    D_ASSERT(checkpoint_state);
984
985
0
    auto stats = checkpoint_state->GetStatistics();
986
0
    D_ASSERT(stats);
987
988
0
    result.statistics.push_back(stats->Copy());
989
0
    result.states.push_back(std::move(checkpoint_state));
990
0
  }
991
0
  D_ASSERT(result.states.size() == result.statistics.size());
992
0
  return result;
993
0
}
994
995
0
idx_t RowGroup::GetCommittedRowCount() {
996
0
  auto vinfo = GetVersionInfo();
997
0
  if (!vinfo) {
998
0
    return count;
999
0
  }
1000
0
  return count - vinfo->GetCommittedDeletedCount(count);
1001
0
}
1002
1003
976
bool RowGroup::HasUnloadedDeletes() const {
1004
976
  if (deletes_pointers.empty()) {
1005
    // no stored deletes at all
1006
976
    return false;
1007
976
  }
1008
  // return whether or not the deletes have been loaded
1009
0
  return !deletes_is_loaded;
1010
976
}
1011
1012
0
RowGroupWriteData RowGroup::WriteToDisk(RowGroupWriter &writer) {
1013
0
  vector<CompressionType> compression_types;
1014
0
  compression_types.reserve(columns.size());
1015
1016
0
  for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) {
1017
0
    auto &column = GetColumn(column_idx);
1018
0
    if (column.count != this->count) {
1019
0
      throw InternalException("Corrupted in-memory column - column with index %llu has misaligned count (row "
1020
0
                              "group has %llu rows, column has %llu)",
1021
0
                              column_idx, this->count.load(), column.count.load());
1022
0
    }
1023
0
    auto compression_type = writer.GetColumnCompressionType(column_idx);
1024
0
    compression_types.push_back(compression_type);
1025
0
  }
1026
1027
0
  RowGroupWriteInfo info(writer.GetPartialBlockManager(), compression_types, writer.GetCheckpointType());
1028
0
  return WriteToDisk(info);
1029
0
}
1030
1031
RowGroupPointer RowGroup::Checkpoint(RowGroupWriteData write_data, RowGroupWriter &writer,
1032
0
                                     TableStatistics &global_stats) {
1033
0
  RowGroupPointer row_group_pointer;
1034
1035
0
  auto lock = global_stats.GetLock();
1036
0
  for (idx_t column_idx = 0; column_idx < GetColumnCount(); column_idx++) {
1037
0
    global_stats.GetStats(*lock, column_idx).Statistics().Merge(write_data.statistics[column_idx]);
1038
0
  }
1039
1040
  // construct the row group pointer and write the column meta data to disk
1041
0
  D_ASSERT(write_data.states.size() == columns.size());
1042
0
  row_group_pointer.row_start = start;
1043
0
  row_group_pointer.tuple_count = count;
1044
0
  for (auto &state : write_data.states) {
1045
    // get the current position of the table data writer
1046
0
    auto &data_writer = writer.GetPayloadWriter();
1047
0
    auto pointer = data_writer.GetMetaBlockPointer();
1048
1049
    // store the stats and the data pointers in the row group pointers
1050
0
    row_group_pointer.data_pointers.push_back(pointer);
1051
1052
    // Write pointers to the column segments.
1053
    //
1054
    // Just as above, the state can refer to many other states, so this
1055
    // can cascade recursively into more pointer writes.
1056
0
    auto persistent_data = state->ToPersistentData();
1057
0
    BinarySerializer serializer(data_writer);
1058
0
    serializer.Begin();
1059
0
    persistent_data.Serialize(serializer);
1060
0
    serializer.End();
1061
0
  }
1062
0
  row_group_pointer.deletes_pointers = CheckpointDeletes(writer.GetPayloadWriter().GetManager());
1063
0
  Verify();
1064
0
  return row_group_pointer;
1065
0
}
1066
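The pointer-writing loop in RowGroup::Checkpoint above boils down to: ask the writer for its current position, record that position as the column's data pointer, then serialize the column metadata at that position. A hedged, standalone illustration of that pattern; ToyWriter and ToyRowGroupPointer are hypothetical stand-ins for the payload writer and row-group pointer, not the DuckDB classes.

#include <cstdint>
#include <string>
#include <vector>

struct ToyWriter {
	std::vector<uint8_t> buffer;
	uint64_t CurrentPosition() const { return buffer.size(); }
	void Write(const std::string &payload) {
		buffer.insert(buffer.end(), payload.begin(), payload.end());
	}
};

struct ToyRowGroupPointer {
	std::vector<uint64_t> data_pointers; // one entry per column
};

ToyRowGroupPointer CheckpointColumns(ToyWriter &writer, const std::vector<std::string> &column_metadata) {
	ToyRowGroupPointer pointer;
	for (const auto &metadata : column_metadata) {
		// remember where this column's metadata starts ...
		pointer.data_pointers.push_back(writer.CurrentPosition());
		// ... then serialize it; later columns land at later positions
		writer.Write(metadata);
	}
	return pointer;
}

int main() {
	ToyWriter writer;
	auto pointer = CheckpointColumns(writer, {"col-a-meta", "col-b-meta"});
	return (pointer.data_pointers[0] == 0 && pointer.data_pointers[1] == 10) ? 0 : 1;
}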
1067
0
bool RowGroup::IsPersistent() const {
1068
0
  for (auto &column : columns) {
1069
0
    if (!column->IsPersistent()) {
1070
      // column is not persistent
1071
0
      return false;
1072
0
    }
1073
0
  }
1074
0
  return true;
1075
0
}
1076
1077
0
PersistentRowGroupData RowGroup::SerializeRowGroupInfo() const {
1078
  // all columns are persistent - serialize
1079
0
  PersistentRowGroupData result;
1080
0
  for (auto &col : columns) {
1081
0
    result.column_data.push_back(col->Serialize());
1082
0
  }
1083
0
  result.start = start;
1084
0
  result.count = count;
1085
0
  return result;
1086
0
}
1087
1088
0
vector<MetaBlockPointer> RowGroup::CheckpointDeletes(MetadataManager &manager) {
1089
0
  if (HasUnloadedDeletes()) {
1090
    // deletes were not loaded so they cannot be changed
1091
    // re-use them as-is
1092
0
    manager.ClearModifiedBlocks(deletes_pointers);
1093
0
    return deletes_pointers;
1094
0
  }
1095
0
  auto vinfo = GetVersionInfo();
1096
0
  if (!vinfo) {
1097
    // no version information: write nothing
1098
0
    return vector<MetaBlockPointer>();
1099
0
  }
1100
0
  return vinfo->Checkpoint(manager);
1101
0
}
1102
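CheckpointDeletes above distinguishes three cases: deletes that exist on disk but were never loaded are reused as-is, a row group without version information writes nothing, and otherwise the in-memory deletes are written out. A self-contained sketch of that decision, using hypothetical names rather than DuckDB types:

#include <vector>

struct ToyVersionInfo {
	std::vector<int> delete_blocks; // pretend these are freshly written blocks
};

std::vector<int> CheckpointDeletesSketch(bool has_unloaded_deletes, const std::vector<int> &existing_pointers,
                                         const ToyVersionInfo *version_info) {
	if (has_unloaded_deletes) {
		// stored deletes were never loaded, so they cannot have changed: reuse them
		return existing_pointers;
	}
	if (!version_info) {
		// no version information: write nothing
		return {};
	}
	// write the in-memory deletes and return the new pointers
	return version_info->delete_blocks;
}

int main() {
	ToyVersionInfo vinfo {{7, 8}};
	bool ok = CheckpointDeletesSketch(true, {1, 2}, &vinfo) == std::vector<int> {1, 2} &&
	          CheckpointDeletesSketch(false, {}, nullptr).empty() &&
	          CheckpointDeletesSketch(false, {}, &vinfo) == std::vector<int> {7, 8};
	return ok ? 0 : 1;
}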
1103
0
void RowGroup::Serialize(RowGroupPointer &pointer, Serializer &serializer) {
1104
0
  serializer.WriteProperty(100, "row_start", pointer.row_start);
1105
0
  serializer.WriteProperty(101, "tuple_count", pointer.tuple_count);
1106
0
  serializer.WriteProperty(102, "data_pointers", pointer.data_pointers);
1107
0
  serializer.WriteProperty(103, "delete_pointers", pointer.deletes_pointers);
1108
0
}
1109
1110
0
RowGroupPointer RowGroup::Deserialize(Deserializer &deserializer) {
1111
0
  RowGroupPointer result;
1112
0
  result.row_start = deserializer.ReadProperty<uint64_t>(100, "row_start");
1113
0
  result.tuple_count = deserializer.ReadProperty<uint64_t>(101, "tuple_count");
1114
0
  result.data_pointers = deserializer.ReadProperty<vector<MetaBlockPointer>>(102, "data_pointers");
1115
0
  result.deletes_pointers = deserializer.ReadProperty<vector<MetaBlockPointer>>(103, "delete_pointers");
1116
0
  return result;
1117
0
}
1118
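Serialize and Deserialize above store the row-group pointer as four properties keyed by fixed field ids (100-103) and read them back under the same ids, so the layout is keyed by id rather than by position. A self-contained sketch of that keyed round trip; the map-based "serializer" is only an illustration and not the DuckDB serializer.

#include <cstdint>
#include <map>
#include <vector>

struct ToyPointer {
	uint64_t row_start = 0;
	uint64_t tuple_count = 0;
	std::vector<uint64_t> data_pointers;
	std::vector<uint64_t> deletes_pointers;
};

using ToyDocument = std::map<int, std::vector<uint64_t>>;

ToyDocument Serialize(const ToyPointer &p) {
	ToyDocument doc;
	doc[100] = {p.row_start};
	doc[101] = {p.tuple_count};
	doc[102] = p.data_pointers;
	doc[103] = p.deletes_pointers;
	return doc;
}

ToyPointer Deserialize(const ToyDocument &doc) {
	ToyPointer p;
	p.row_start = doc.at(100).front();
	p.tuple_count = doc.at(101).front();
	p.data_pointers = doc.at(102);
	p.deletes_pointers = doc.at(103);
	return p;
}

int main() {
	ToyPointer original {120960, 1024, {11, 12}, {13}};
	ToyPointer copy = Deserialize(Serialize(original));
	return (copy.row_start == original.row_start && copy.data_pointers == original.data_pointers) ? 0 : 1;
}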
1119
//===--------------------------------------------------------------------===//
1120
// GetPartitionStats
1121
//===--------------------------------------------------------------------===//
1122
0
PartitionStatistics RowGroup::GetPartitionStats() const {
1123
0
  PartitionStatistics result;
1124
0
  result.row_start = start;
1125
0
  result.count = count;
1126
0
  if (HasUnloadedDeletes() || version_info.load().get()) {
1127
    // there is version info or there are unloaded deletes - the count is only approximate
1128
0
    result.count_type = CountType::COUNT_APPROXIMATE;
1129
0
  } else {
1130
0
    result.count_type = CountType::COUNT_EXACT;
1131
0
  }
1132
0
  return result;
1133
0
}
1134
1135
//===--------------------------------------------------------------------===//
1136
// GetColumnSegmentInfo
1137
//===--------------------------------------------------------------------===//
1138
0
void RowGroup::GetColumnSegmentInfo(idx_t row_group_index, vector<ColumnSegmentInfo> &result) {
1139
0
  for (idx_t col_idx = 0; col_idx < GetColumnCount(); col_idx++) {
1140
0
    auto &col_data = GetColumn(col_idx);
1141
0
    col_data.GetColumnSegmentInfo(row_group_index, {col_idx}, result);
1142
0
  }
1143
0
}
1144
1145
//===--------------------------------------------------------------------===//
1146
// Version Delete Information
1147
//===--------------------------------------------------------------------===//
1148
class VersionDeleteState {
1149
public:
1150
  VersionDeleteState(RowGroup &info, TransactionData transaction, DataTable &table, idx_t base_row)
1151
0
      : info(info), transaction(transaction), table(table), current_chunk(DConstants::INVALID_INDEX), count(0),
1152
0
        base_row(base_row), delete_count(0) {
1153
0
  }
1154
1155
  RowGroup &info;
1156
  TransactionData transaction;
1157
  DataTable &table;
1158
  idx_t current_chunk;
1159
  row_t rows[STANDARD_VECTOR_SIZE];
1160
  idx_t count;
1161
  idx_t base_row;
1162
  idx_t chunk_row;
1163
  idx_t delete_count;
1164
1165
public:
1166
  void Delete(row_t row_id);
1167
  void Flush();
1168
};
1169
1170
0
idx_t RowGroup::Delete(TransactionData transaction, DataTable &table, row_t *ids, idx_t count) {
1171
0
  VersionDeleteState del_state(*this, transaction, table, this->start);
1172
1173
  // buffer the deletes per vector; they are flushed in batches below
1174
0
  for (idx_t i = 0; i < count; i++) {
1175
0
    D_ASSERT(ids[i] >= 0);
1176
0
    D_ASSERT(idx_t(ids[i]) >= this->start && idx_t(ids[i]) < this->start + this->count);
1177
0
    del_state.Delete(ids[i] - UnsafeNumericCast<row_t>(this->start));
1178
0
  }
1179
0
  del_state.Flush();
1180
0
  return del_state.delete_count;
1181
0
}
1182
1183
201
void RowGroup::Verify() {
1184
#ifdef DEBUG
1185
  for (auto &column : GetColumns()) {
1186
    column->Verify(*this);
1187
  }
1188
#endif
1189
201
}
1190
1191
0
idx_t RowGroup::DeleteRows(idx_t vector_idx, transaction_t transaction_id, row_t rows[], idx_t count) {
1192
0
  return GetOrCreateVersionInfo().DeleteRows(vector_idx, transaction_id, rows, count);
1193
0
}
1194
1195
0
void VersionDeleteState::Delete(row_t row_id) {
1196
0
  D_ASSERT(row_id >= 0);
1197
0
  idx_t vector_idx = UnsafeNumericCast<idx_t>(row_id) / STANDARD_VECTOR_SIZE;
1198
0
  idx_t idx_in_vector = UnsafeNumericCast<idx_t>(row_id) - vector_idx * STANDARD_VECTOR_SIZE;
1199
0
  if (current_chunk != vector_idx) {
1200
0
    Flush();
1201
1202
0
    current_chunk = vector_idx;
1203
0
    chunk_row = vector_idx * STANDARD_VECTOR_SIZE;
1204
0
  }
1205
0
  rows[count++] = UnsafeNumericCast<row_t>(idx_in_vector);
1206
0
}
1207
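VersionDeleteState::Delete above splits a row-group-relative row id into a vector index plus an offset inside that vector, using division and remainder by the vector size. A standalone illustration of that index math; VECTOR_SIZE = 2048 is only an assumed stand-in for STANDARD_VECTOR_SIZE, which is a build-time constant.

#include <cstdint>
#include <utility>

constexpr uint64_t VECTOR_SIZE = 2048; // stand-in for STANDARD_VECTOR_SIZE

std::pair<uint64_t, uint64_t> SplitRowId(uint64_t row_in_row_group) {
	uint64_t vector_idx = row_in_row_group / VECTOR_SIZE;
	uint64_t idx_in_vector = row_in_row_group - vector_idx * VECTOR_SIZE;
	return {vector_idx, idx_in_vector};
}

int main() {
	// row 5000 lands in vector 2 (rows 4096..6143) at offset 904
	auto split = SplitRowId(5000);
	return (split.first == 2 && split.second == 904) ? 0 : 1;
}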
1208
0
void VersionDeleteState::Flush() {
1209
0
  if (count == 0) {
1210
0
    return;
1211
0
  }
1212
  // it is possible for delete statements to delete the same tuple multiple times when combined with a USING clause
1213
  // in info.DeleteRows, we check which tuples are actually deleted (excluding duplicate deletions)
1214
  // this is returned in the actual_delete_count
1215
0
  auto actual_delete_count = info.DeleteRows(current_chunk, transaction.transaction_id, rows, count);
1216
0
  delete_count += actual_delete_count;
1217
0
  if (transaction.transaction && actual_delete_count > 0) {
1218
    // now push the delete into the undo buffer, but only if any deletes were actually performed
1219
0
    transaction.transaction->PushDelete(table, info.GetOrCreateVersionInfo(), current_chunk, rows,
1220
0
                                        actual_delete_count, base_row + chunk_row);
1221
0
  }
1222
0
  count = 0;
1223
0
}
1224
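Tying Delete and Flush together: row ids are buffered per vector, flushed whenever the target vector changes, and each flush reports how many tuples were newly deleted so that duplicate deletions (for example from DELETE ... USING) are counted only once, mirroring actual_delete_count above. A self-contained sketch of that duplicate-aware counting, with all names hypothetical:

#include <cstdint>
#include <unordered_set>
#include <vector>

// tracks, per vector, which offsets have already been deleted and reports the
// number of newly deleted tuples
struct ToyDeleteTracker {
	std::vector<std::unordered_set<uint64_t>> deleted_per_vector;

	uint64_t DeleteRows(uint64_t vector_idx, const std::vector<uint64_t> &offsets) {
		if (deleted_per_vector.size() <= vector_idx) {
			deleted_per_vector.resize(vector_idx + 1);
		}
		uint64_t newly_deleted = 0;
		for (auto offset : offsets) {
			if (deleted_per_vector[vector_idx].insert(offset).second) {
				newly_deleted++; // duplicates are counted only once
			}
		}
		return newly_deleted;
	}
};

int main() {
	ToyDeleteTracker tracker;
	// deleting rows 10, 10 and 11 of vector 0 removes only two distinct tuples
	uint64_t count = tracker.DeleteRows(0, {10, 10, 11});
	return count == 2 ? 0 : 1;
}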
1225
} // namespace duckdb