Coverage Report

Created: 2025-06-12 07:25

/src/duckdb/src/execution/perfect_aggregate_hashtable.cpp
Line
Count
Source (jump to first uncovered line)
1
#include "duckdb/execution/perfect_aggregate_hashtable.hpp"
2
3
#include "duckdb/common/numeric_utils.hpp"
4
#include "duckdb/common/row_operations/row_operations.hpp"
5
#include "duckdb/execution/expression_executor.hpp"
6
7
namespace duckdb {
8
9
// Constructs a perfect-hash aggregate table with exactly 2^total_required_bits
// slots. Each group's key is encoded directly into its slot index, so the
// group values themselves never need to be stored in the table.
PerfectAggregateHashTable::PerfectAggregateHashTable(ClientContext &context, Allocator &allocator,
                                                     const vector<LogicalType> &group_types_p,
                                                     vector<LogicalType> payload_types_p,
                                                     vector<AggregateObject> aggregate_objects_p,
                                                     vector<Value> group_minima_p, vector<idx_t> required_bits_p)
    : BaseAggregateHashTable(context, allocator, aggregate_objects_p, std::move(payload_types_p)),
      addresses(LogicalType::POINTER), required_bits(std::move(required_bits_p)), total_required_bits(0),
      group_minima(std::move(group_minima_p)), sel(STANDARD_VECTOR_SIZE),
      aggregate_allocator(make_uniq<ArenaAllocator>(allocator)) {
  // sum up the bits each grouping column contributes to the slot index
  for (auto &group_bits : required_bits) {
    total_required_bits += group_bits;
  }
  // the total amount of groups we allocate space for is 2^required_bits
  total_groups = (uint64_t)1 << total_required_bits;
  // we don't need to store the groups in a perfect hash table, since the group keys can be deduced by their location
  grouping_columns = group_types_p.size();
  layout_ptr->Initialize(std::move(aggregate_objects_p));
  tuple_size = layout_ptr->GetRowWidth();

  // allocate and null initialize the data
  owned_data = make_unsafe_uniq_array_uninitialized<data_t>(tuple_size * total_groups);
  data = owned_data.get();

  // set up the empty payloads for every tuple, and initialize the "occupied" flag to false
  group_is_set = make_unsafe_uniq_array_uninitialized<bool>(total_groups);
  memset(group_is_set.get(), 0, total_groups * sizeof(bool));

  // initialize the aggregate state of every slot, batching the slot addresses
  // into groups of STANDARD_VECTOR_SIZE before each InitializeStates call
  auto address_data = FlatVector::GetData<uintptr_t>(addresses);
  idx_t init_count = 0;
  for (idx_t i = 0; i < total_groups; i++) {
    address_data[init_count] = uintptr_t(data) + (tuple_size * i);
    init_count++;
    if (init_count == STANDARD_VECTOR_SIZE) {
      RowOperations::InitializeStates(*layout_ptr, addresses, *FlatVector::IncrementalSelectionVector(),
                                      init_count);
      init_count = 0;
    }
  }
  // initialize the final (possibly partial) batch
  RowOperations::InitializeStates(*layout_ptr, addresses, *FlatVector::IncrementalSelectionVector(), init_count);
}
50
51
0
// Destructor: explicitly destroy any aggregate states that require cleanup.
PerfectAggregateHashTable::~PerfectAggregateHashTable() {
  Destroy();
}
54
55
template <class T>
56
static void ComputeGroupLocationTemplated(UnifiedVectorFormat &group_data, Value &min, uintptr_t *address_data,
57
0
                                          idx_t current_shift, idx_t count) {
58
0
  auto data = UnifiedVectorFormat::GetData<T>(group_data);
59
0
  auto min_val = min.GetValueUnsafe<T>();
60
0
  if (!group_data.validity.AllValid()) {
61
0
    for (idx_t i = 0; i < count; i++) {
62
0
      auto index = group_data.sel->get_index(i);
63
      // check if the value is NULL
64
      // NULL groups are considered as "0" in the hash table
65
      // that is to say, they have no effect on the position of the element (because 0 << shift is 0)
66
      // we only need to handle non-null values here
67
0
      if (group_data.validity.RowIsValid(index)) {
68
0
        D_ASSERT(data[index] >= min_val);
69
0
        auto adjusted_value = UnsafeNumericCast<uintptr_t>((data[index] - min_val) + 1);
70
0
        address_data[i] += adjusted_value << current_shift;
71
0
      }
72
0
    }
73
0
  } else {
74
    // no null values: we can directly compute the addresses
75
0
    for (idx_t i = 0; i < count; i++) {
76
0
      auto index = group_data.sel->get_index(i);
77
0
      auto adjusted_value = UnsafeNumericCast<uintptr_t>((data[index] - min_val) + 1);
78
0
      address_data[i] += adjusted_value << current_shift;
79
0
    }
80
0
  }
81
0
}
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ComputeGroupLocationTemplated<signed char>(duckdb::UnifiedVectorFormat&, duckdb::Value&, unsigned long*, unsigned long, unsigned long)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ComputeGroupLocationTemplated<short>(duckdb::UnifiedVectorFormat&, duckdb::Value&, unsigned long*, unsigned long, unsigned long)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ComputeGroupLocationTemplated<int>(duckdb::UnifiedVectorFormat&, duckdb::Value&, unsigned long*, unsigned long, unsigned long)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ComputeGroupLocationTemplated<long>(duckdb::UnifiedVectorFormat&, duckdb::Value&, unsigned long*, unsigned long, unsigned long)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ComputeGroupLocationTemplated<unsigned char>(duckdb::UnifiedVectorFormat&, duckdb::Value&, unsigned long*, unsigned long, unsigned long)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ComputeGroupLocationTemplated<unsigned short>(duckdb::UnifiedVectorFormat&, duckdb::Value&, unsigned long*, unsigned long, unsigned long)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ComputeGroupLocationTemplated<unsigned int>(duckdb::UnifiedVectorFormat&, duckdb::Value&, unsigned long*, unsigned long, unsigned long)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ComputeGroupLocationTemplated<unsigned long>(duckdb::UnifiedVectorFormat&, duckdb::Value&, unsigned long*, unsigned long, unsigned long)
82
83
0
// Dispatches to the correctly-typed ComputeGroupLocationTemplated instance
// based on the physical storage type of the grouping column.
static void ComputeGroupLocation(Vector &group, Value &min, uintptr_t *address_data, idx_t current_shift, idx_t count) {
  UnifiedVectorFormat unified;
  group.ToUnifiedFormat(count, unified);

  switch (group.GetType().InternalType()) {
  case PhysicalType::INT8:
    ComputeGroupLocationTemplated<int8_t>(unified, min, address_data, current_shift, count);
    break;
  case PhysicalType::INT16:
    ComputeGroupLocationTemplated<int16_t>(unified, min, address_data, current_shift, count);
    break;
  case PhysicalType::INT32:
    ComputeGroupLocationTemplated<int32_t>(unified, min, address_data, current_shift, count);
    break;
  case PhysicalType::INT64:
    ComputeGroupLocationTemplated<int64_t>(unified, min, address_data, current_shift, count);
    break;
  case PhysicalType::UINT8:
    ComputeGroupLocationTemplated<uint8_t>(unified, min, address_data, current_shift, count);
    break;
  case PhysicalType::UINT16:
    ComputeGroupLocationTemplated<uint16_t>(unified, min, address_data, current_shift, count);
    break;
  case PhysicalType::UINT32:
    ComputeGroupLocationTemplated<uint32_t>(unified, min, address_data, current_shift, count);
    break;
  case PhysicalType::UINT64:
    ComputeGroupLocationTemplated<uint64_t>(unified, min, address_data, current_shift, count);
    break;
  default:
    throw InternalException("Unsupported group type for perfect aggregate hash table");
  }
}
116
117
0
// Updates the aggregate states for one input chunk: computes each row's slot
// index from its group values, validates the index against the table size,
// resolves it to a state pointer, and then updates every aggregate.
void PerfectAggregateHashTable::AddChunk(DataChunk &groups, DataChunk &payload) {
  const idx_t count = groups.size();
  auto slot_data = FlatVector::GetData<uintptr_t>(addresses);
  // every row starts out at slot index 0
  memset(slot_data, 0, count * sizeof(uintptr_t));
  D_ASSERT(groups.ColumnCount() == group_minima.size());

  // fold each grouping column into the slot index, from the most-significant
  // down to the least-significant bit range
  idx_t shift = total_required_bits;
  for (idx_t col = 0; col < groups.ColumnCount(); col++) {
    shift -= required_bits[col];
    ComputeGroupLocation(groups.data[col], group_minima[col], slot_data, shift, count);
  }

  // turn every slot index into a pointer into the table data
  // (base pointer plus slot index times tuple width)
  for (idx_t row = 0; row < count; row++) {
    const auto slot = slot_data[row];
    if (slot >= total_groups) {
      throw InvalidInputException("Perfect hash aggregate: aggregate group %llu exceeded total groups %llu. This "
                                  "likely means that the statistics in your data source are corrupt.\n* PRAGMA "
                                  "disable_optimizer to disable optimizations that rely on correct statistics",
                                  slot, total_groups);
    }
    group_is_set[slot] = true;
    slot_data[row] = uintptr_t(data) + slot * tuple_size;
  }

  // with the addresses resolved, update the states of all aggregates
  idx_t payload_idx = 0;
  auto &aggregates = layout_ptr->GetAggregates();
  RowOperationsState row_state(*aggregate_allocator);
  for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) {
    auto &aggregate = aggregates[aggr_idx];
    if (aggregate.filter) {
      RowOperations::UpdateFilteredStates(row_state, filter_set.GetFilterData(aggr_idx), aggregate, addresses,
                                          payload, payload_idx);
    } else {
      RowOperations::UpdateStates(row_state, aggregate, addresses, payload, payload_idx, payload.size());
    }
    // advance past this aggregate's input columns and its state bytes
    payload_idx += (idx_t)aggregate.child_count;
    VectorOperations::AddInPlace(addresses, UnsafeNumericCast<int64_t>(aggregate.payload_size), payload.size());
  }
}
162
163
0
// Merges the aggregate states of `other` into this table. Both tables must
// have an identical layout: the same slot count and the same tuple width.
void PerfectAggregateHashTable::Combine(PerfectAggregateHashTable &other) {
  D_ASSERT(total_groups == other.total_groups);
  D_ASSERT(tuple_size == other.tuple_size);

  Vector source_addresses(LogicalType::POINTER);
  Vector target_addresses(LogicalType::POINTER);
  auto source_addresses_ptr = FlatVector::GetData<data_ptr_t>(source_addresses);
  auto target_addresses_ptr = FlatVector::GetData<data_ptr_t>(target_addresses);

  // iterate over all entries of both hash tables and call combine for all entries that can be combined
  data_ptr_t source_ptr = other.data;
  data_ptr_t target_ptr = data;
  idx_t combine_count = 0;
  RowOperationsState row_state(*aggregate_allocator);
  for (idx_t i = 0; i < total_groups; i++) {
    auto has_entry_source = other.group_is_set[i];
    // we only have any work to do if the source has an entry for this group
    if (has_entry_source) {
      group_is_set[i] = true;
      source_addresses_ptr[combine_count] = source_ptr;
      target_addresses_ptr[combine_count] = target_ptr;
      combine_count++;
      // flush a full batch of (source, target) state pairs
      if (combine_count == STANDARD_VECTOR_SIZE) {
        RowOperations::CombineStates(row_state, *layout_ptr, source_addresses, target_addresses, combine_count);
        combine_count = 0;
      }
    }
    source_ptr += tuple_size;
    target_ptr += tuple_size;
  }
  // flush the final (possibly partial) batch
  RowOperations::CombineStates(row_state, *layout_ptr, source_addresses, target_addresses, combine_count);

  // take ownership of the other table's arena: the combined states may still
  // reference memory allocated from it
  // FIXME: after moving the arena allocator, we currently have to ensure that the pointer is not nullptr, because the
  // FIXME: Destroy()-function of the hash table expects an allocator in some cases (e.g., for sorted aggregates)
  stored_allocators.push_back(std::move(other.aggregate_allocator));
  other.aggregate_allocator = make_uniq<ArenaAllocator>(allocator);
}
200
201
template <class T>
202
static void ReconstructGroupVectorTemplated(uint32_t group_values[], Value &min, idx_t mask, idx_t shift,
203
0
                                            idx_t entry_count, Vector &result) {
204
0
  auto data = FlatVector::GetData<T>(result);
205
0
  auto &validity_mask = FlatVector::Validity(result);
206
0
  auto min_data = min.GetValueUnsafe<T>();
207
0
  for (idx_t i = 0; i < entry_count; i++) {
208
    // extract the value of this group from the total group index
209
0
    auto group_index = UnsafeNumericCast<int32_t>((group_values[i] >> shift) & mask);
210
0
    if (group_index == 0) {
211
      // if it is 0, the value is NULL
212
0
      validity_mask.SetInvalid(i);
213
0
    } else {
214
      // otherwise we add the value (minus 1) to the min value
215
0
      data[i] = UnsafeNumericCast<T>(UnsafeNumericCast<int64_t>(min_data) +
216
0
                                     UnsafeNumericCast<int64_t>(group_index) - 1);
217
0
    }
218
0
  }
219
0
}
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ReconstructGroupVectorTemplated<signed char>(unsigned int*, duckdb::Value&, unsigned long, unsigned long, unsigned long, duckdb::Vector&)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ReconstructGroupVectorTemplated<short>(unsigned int*, duckdb::Value&, unsigned long, unsigned long, unsigned long, duckdb::Vector&)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ReconstructGroupVectorTemplated<int>(unsigned int*, duckdb::Value&, unsigned long, unsigned long, unsigned long, duckdb::Vector&)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ReconstructGroupVectorTemplated<long>(unsigned int*, duckdb::Value&, unsigned long, unsigned long, unsigned long, duckdb::Vector&)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ReconstructGroupVectorTemplated<unsigned char>(unsigned int*, duckdb::Value&, unsigned long, unsigned long, unsigned long, duckdb::Vector&)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ReconstructGroupVectorTemplated<unsigned short>(unsigned int*, duckdb::Value&, unsigned long, unsigned long, unsigned long, duckdb::Vector&)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ReconstructGroupVectorTemplated<unsigned int>(unsigned int*, duckdb::Value&, unsigned long, unsigned long, unsigned long, duckdb::Vector&)
Unexecuted instantiation: ub_duckdb_execution.cpp:void duckdb::ReconstructGroupVectorTemplated<unsigned long>(unsigned int*, duckdb::Value&, unsigned long, unsigned long, unsigned long, duckdb::Vector&)
220
221
static void ReconstructGroupVector(uint32_t group_values[], Value &min, idx_t required_bits, idx_t shift,
222
0
                                   idx_t entry_count, Vector &result) {
223
  // construct the mask for this entry
224
0
  idx_t mask = ((uint64_t)1 << required_bits) - 1;
225
0
  switch (result.GetType().InternalType()) {
226
0
  case PhysicalType::INT8:
227
0
    ReconstructGroupVectorTemplated<int8_t>(group_values, min, mask, shift, entry_count, result);
228
0
    break;
229
0
  case PhysicalType::INT16:
230
0
    ReconstructGroupVectorTemplated<int16_t>(group_values, min, mask, shift, entry_count, result);
231
0
    break;
232
0
  case PhysicalType::INT32:
233
0
    ReconstructGroupVectorTemplated<int32_t>(group_values, min, mask, shift, entry_count, result);
234
0
    break;
235
0
  case PhysicalType::INT64:
236
0
    ReconstructGroupVectorTemplated<int64_t>(group_values, min, mask, shift, entry_count, result);
237
0
    break;
238
0
  case PhysicalType::UINT8:
239
0
    ReconstructGroupVectorTemplated<uint8_t>(group_values, min, mask, shift, entry_count, result);
240
0
    break;
241
0
  case PhysicalType::UINT16:
242
0
    ReconstructGroupVectorTemplated<uint16_t>(group_values, min, mask, shift, entry_count, result);
243
0
    break;
244
0
  case PhysicalType::UINT32:
245
0
    ReconstructGroupVectorTemplated<uint32_t>(group_values, min, mask, shift, entry_count, result);
246
0
    break;
247
0
  case PhysicalType::UINT64:
248
0
    ReconstructGroupVectorTemplated<uint64_t>(group_values, min, mask, shift, entry_count, result);
249
0
    break;
250
0
  default:
251
0
    throw InternalException("Invalid type for perfect aggregate HT group");
252
0
  }
253
0
}
254
255
0
// Scans up to STANDARD_VECTOR_SIZE occupied slots starting at scan_position,
// reconstructs their group values and finalizes their aggregate states into
// `result`. scan_position is advanced so that repeated calls walk the table.
void PerfectAggregateHashTable::Scan(idx_t &scan_position, DataChunk &result) {
  auto data_pointers = FlatVector::GetData<data_ptr_t>(addresses);
  uint32_t group_values[STANDARD_VECTOR_SIZE];

  // collect occupied slots until the table is exhausted or a full vector is gathered
  idx_t entry_count = 0;
  while (scan_position < total_groups) {
    if (group_is_set[scan_position]) {
      // this slot is occupied: record its state address and its packed group index
      data_pointers[entry_count] = data + tuple_size * scan_position;
      group_values[entry_count] = NumericCast<uint32_t>(scan_position);
      if (++entry_count == STANDARD_VECTOR_SIZE) {
        scan_position++;
        break;
      }
    }
    scan_position++;
  }
  if (entry_count == 0) {
    // nothing left to scan
    return;
  }
  // first reconstruct the group columns from the packed slot indices
  idx_t shift = total_required_bits;
  for (idx_t col = 0; col < grouping_columns; col++) {
    shift -= required_bits[col];
    ReconstructGroupVector(group_values, group_minima[col], required_bits[col], shift, entry_count, result.data[col]);
  }
  // then finalize the aggregate states into the payload columns
  result.SetCardinality(entry_count);
  RowOperationsState row_state(*aggregate_allocator);
  RowOperations::FinalizeStates(row_state, *layout_ptr, addresses, result, grouping_columns);
}
288
289
0
void PerfectAggregateHashTable::Destroy() {
290
  // check if there is any destructor to call
291
0
  bool has_destructor = false;
292
0
  for (auto &aggr : layout_ptr->GetAggregates()) {
293
0
    if (aggr.function.destructor) {
294
0
      has_destructor = true;
295
0
    }
296
0
  }
297
0
  if (!has_destructor) {
298
0
    return;
299
0
  }
300
  // there are aggregates with destructors: loop over the hash table
301
  // and call the destructor method for each of the aggregates
302
0
  auto data_pointers = FlatVector::GetData<data_ptr_t>(addresses);
303
0
  idx_t count = 0;
304
305
  // iterate over all initialised slots of the hash table
306
0
  RowOperationsState row_state(*aggregate_allocator);
307
0
  data_ptr_t payload_ptr = data;
308
0
  for (idx_t i = 0; i < total_groups; i++) {
309
0
    data_pointers[count++] = payload_ptr;
310
0
    if (count == STANDARD_VECTOR_SIZE) {
311
0
      RowOperations::DestroyStates(row_state, *layout_ptr, addresses, count);
312
0
      count = 0;
313
0
    }
314
0
    payload_ptr += tuple_size;
315
0
  }
316
0
  RowOperations::DestroyStates(row_state, *layout_ptr, addresses, count);
317
0
}
318
319
} // namespace duckdb