/src/duckdb/src/execution/perfect_aggregate_hashtable.cpp
Line | Count | Source |
1 | | #include "duckdb/execution/perfect_aggregate_hashtable.hpp" |
2 | | |
3 | | #include "duckdb/common/numeric_utils.hpp" |
4 | | #include "duckdb/common/row_operations/row_operations.hpp" |
5 | | #include "duckdb/execution/expression_executor.hpp" |
6 | | |
7 | | namespace duckdb { |
8 | | |
9 | | PerfectAggregateHashTable::PerfectAggregateHashTable(ClientContext &context, Allocator &allocator, |
10 | | const vector<LogicalType> &group_types_p, |
11 | | vector<LogicalType> payload_types_p, |
12 | | vector<AggregateObject> aggregate_objects_p, |
13 | | vector<Value> group_minima_p, vector<idx_t> required_bits_p) |
14 | 0 | : BaseAggregateHashTable(context, allocator, aggregate_objects_p, std::move(payload_types_p)), |
15 | 0 | addresses(LogicalType::POINTER), required_bits(std::move(required_bits_p)), total_required_bits(0), |
16 | 0 | group_minima(std::move(group_minima_p)), sel(STANDARD_VECTOR_SIZE), |
17 | 0 | aggregate_allocator(make_uniq<ArenaAllocator>(allocator)) { |
18 | 0 | for (auto &group_bits : required_bits) { |
19 | 0 | total_required_bits += group_bits; |
20 | 0 | } |
21 | | // the total number of groups we allocate space for is 2^total_required_bits |
22 | 0 | total_groups = (uint64_t)1 << total_required_bits; |
23 | | // we don't need to store the group keys in the perfect hash table, since they can be deduced from an entry's location |
24 | 0 | grouping_columns = group_types_p.size(); |
25 | 0 | layout_ptr->Initialize(std::move(aggregate_objects_p)); |
26 | 0 | tuple_size = layout_ptr->GetRowWidth(); |
27 | | |
28 | | // allocate the data; the entries are left uninitialized here and set up via InitializeStates below |
29 | 0 | owned_data = make_unsafe_uniq_array_uninitialized<data_t>(tuple_size * total_groups); |
30 | 0 | data = owned_data.get(); |
31 | | |
32 | | // allocate the "occupied" flag of every group and initialize it to false |
33 | 0 | group_is_set = make_unsafe_uniq_array_uninitialized<bool>(total_groups); |
34 | 0 | memset(group_is_set.get(), 0, total_groups * sizeof(bool)); |
35 | | |
36 | | // initialize the aggregate states of every entry, in batches of STANDARD_VECTOR_SIZE |
37 | 0 | auto address_data = FlatVector::GetData<uintptr_t>(addresses); |
38 | 0 | idx_t init_count = 0; |
39 | 0 | for (idx_t i = 0; i < total_groups; i++) { |
40 | 0 | address_data[init_count] = uintptr_t(data) + (tuple_size * i); |
41 | 0 | init_count++; |
42 | 0 | if (init_count == STANDARD_VECTOR_SIZE) { |
43 | 0 | RowOperations::InitializeStates(*layout_ptr, addresses, *FlatVector::IncrementalSelectionVector(), |
44 | 0 | init_count); |
45 | 0 | init_count = 0; |
46 | 0 | } |
47 | 0 | } |
48 | 0 | RowOperations::InitializeStates(*layout_ptr, addresses, *FlatVector::IncrementalSelectionVector(), init_count); |
49 | 0 | } |
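To make the sizing arithmetic concrete, here is a minimal standalone sketch (hypothetical bit widths and values, not DuckDB API) of how the per-column required bits combine into the table size and a packed slot index, mirroring total_required_bits above and the shrinking shift used in AddChunk below.

#include <cassert>
#include <cstdint>
#include <vector>

int main() {
	// hypothetical widths derived from column min/max statistics: 3 bits and 2 bits
	std::vector<uint64_t> required_bits = {3, 2};
	uint64_t total_required_bits = 0;
	for (auto bits : required_bits) {
		total_required_bits += bits;
	}
	// one slot for every possible combination of packed group values
	uint64_t total_groups = uint64_t(1) << total_required_bits; // 2^5 = 32
	// column 0 lands in the highest bits, column 1 in the lowest
	uint64_t adjusted0 = 5;                       // (value - min) + 1 for column 0
	uint64_t adjusted1 = 2;                       // (value - min) + 1 for column 1
	uint64_t slot = (adjusted0 << 2) | adjusted1; // shift 2 = total bits minus column 0's bits
	assert(slot == 22 && slot < total_groups);
	return 0;
}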
50 | | |
51 | 0 | PerfectAggregateHashTable::~PerfectAggregateHashTable() { |
52 | 0 | Destroy(); |
53 | 0 | } |
54 | | |
55 | | template <class T> |
56 | | static void ComputeGroupLocationTemplated(UnifiedVectorFormat &group_data, Value &min, uintptr_t *address_data, |
57 | 0 | idx_t current_shift, idx_t count) { |
58 | 0 | auto data = UnifiedVectorFormat::GetData<T>(group_data); |
59 | 0 | auto min_val = min.GetValueUnsafe<T>(); |
60 | 0 | if (!group_data.validity.AllValid()) { |
61 | 0 | for (idx_t i = 0; i < count; i++) { |
62 | 0 | auto index = group_data.sel->get_index(i); |
63 | | // check if the value is NULL |
64 | | // NULL groups are considered as "0" in the hash table |
65 | | // that is to say, they have no effect on the position of the element (because 0 << shift is 0) |
66 | | // we only need to handle non-null values here |
67 | 0 | if (group_data.validity.RowIsValid(index)) { |
68 | 0 | D_ASSERT(data[index] >= min_val); |
69 | 0 | auto adjusted_value = UnsafeNumericCast<uintptr_t>((data[index] - min_val) + 1); |
70 | 0 | address_data[i] += adjusted_value << current_shift; |
71 | 0 | } |
72 | 0 | } |
73 | 0 | } else { |
74 | | // no null values: we can directly compute the addresses |
75 | 0 | for (idx_t i = 0; i < count; i++) { |
76 | 0 | auto index = group_data.sel->get_index(i); |
77 | 0 | auto adjusted_value = UnsafeNumericCast<uintptr_t>((data[index] - min_val) + 1); |
78 | 0 | address_data[i] += adjusted_value << current_shift; |
79 | 0 | } |
80 | 0 | } |
81 | 0 | } |
82 | | |
83 | 0 | static void ComputeGroupLocation(Vector &group, Value &min, uintptr_t *address_data, idx_t current_shift, idx_t count) { |
84 | 0 | UnifiedVectorFormat vdata; |
85 | 0 | group.ToUnifiedFormat(count, vdata); |
86 | | |
87 | 0 | switch (group.GetType().InternalType()) { |
88 | 0 | case PhysicalType::INT8: |
89 | 0 | ComputeGroupLocationTemplated<int8_t>(vdata, min, address_data, current_shift, count); |
90 | 0 | break; |
91 | 0 | case PhysicalType::INT16: |
92 | 0 | ComputeGroupLocationTemplated<int16_t>(vdata, min, address_data, current_shift, count); |
93 | 0 | break; |
94 | 0 | case PhysicalType::INT32: |
95 | 0 | ComputeGroupLocationTemplated<int32_t>(vdata, min, address_data, current_shift, count); |
96 | 0 | break; |
97 | 0 | case PhysicalType::INT64: |
98 | 0 | ComputeGroupLocationTemplated<int64_t>(vdata, min, address_data, current_shift, count); |
99 | 0 | break; |
100 | 0 | case PhysicalType::UINT8: |
101 | 0 | ComputeGroupLocationTemplated<uint8_t>(vdata, min, address_data, current_shift, count); |
102 | 0 | break; |
103 | 0 | case PhysicalType::UINT16: |
104 | 0 | ComputeGroupLocationTemplated<uint16_t>(vdata, min, address_data, current_shift, count); |
105 | 0 | break; |
106 | 0 | case PhysicalType::UINT32: |
107 | 0 | ComputeGroupLocationTemplated<uint32_t>(vdata, min, address_data, current_shift, count); |
108 | 0 | break; |
109 | 0 | case PhysicalType::UINT64: |
110 | 0 | ComputeGroupLocationTemplated<uint64_t>(vdata, min, address_data, current_shift, count); |
111 | 0 | break; |
112 | 0 | default: |
113 | 0 | throw InternalException("Unsupported group type for perfect aggregate hash table"); |
114 | 0 | } |
115 | 0 | } |
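The validity branch above reserves adjusted value 0 for NULL, so a NULL group can never collide with a real value. A minimal sketch of that mapping for one nullable column, using a hypothetical helper in which std::optional stands in for the validity mask:

#include <cstdint>
#include <optional>

// hypothetical helper: compute one column's contribution to the slot index
static uint64_t AdjustedGroupValue(std::optional<int64_t> value, int64_t min_val) {
	if (!value.has_value()) {
		return 0; // NULL maps to 0, and 0 << shift leaves the slot untouched
	}
	return uint64_t(*value - min_val) + 1; // non-NULL values start at 1
}

// e.g. AdjustedGroupValue(std::nullopt, 100) == 0, AdjustedGroupValue(103, 100) == 4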
116 | | |
117 | 0 | void PerfectAggregateHashTable::AddChunk(DataChunk &groups, DataChunk &payload) { |
118 | | // first we need to find the location in the HT of each of the groups |
119 | 0 | auto address_data = FlatVector::GetData<uintptr_t>(addresses); |
120 | | // zero-initialize the address data |
121 | 0 | memset(address_data, 0, groups.size() * sizeof(uintptr_t)); |
122 | 0 | D_ASSERT(groups.ColumnCount() == group_minima.size()); |
123 | | |
124 | | // then compute the actual group location by iterating over each of the groups |
125 | 0 | idx_t current_shift = total_required_bits; |
126 | 0 | for (idx_t i = 0; i < groups.ColumnCount(); i++) { |
127 | 0 | current_shift -= required_bits[i]; |
128 | 0 | ComputeGroupLocation(groups.data[i], group_minima[i], address_data, current_shift, groups.size()); |
129 | 0 | } |
130 | | // now we have the HT entry number for every tuple |
131 | | // compute the actual data pointer by multiplying the entry number by the tuple size and adding it to the base HT pointer |
132 | 0 | for (idx_t i = 0; i < groups.size(); i++) { |
133 | 0 | const auto group = address_data[i]; |
134 | 0 | if (group >= total_groups) { |
135 | 0 | throw InvalidInputException("Perfect hash aggregate: aggregate group %llu exceeded total groups %llu. This " |
136 | 0 | "likely means that the statistics in your data source are corrupt.\n* PRAGMA " |
137 | 0 | "disable_optimizer to disable optimizations that rely on correct statistics", |
138 | 0 | group, total_groups); |
139 | 0 | } |
140 | 0 | group_is_set[group] = true; |
141 | 0 | address_data[i] = uintptr_t(data) + group * tuple_size; |
142 | 0 | } |
143 | | |
144 | | // after finding the group locations we update the aggregates |
145 | 0 | idx_t payload_idx = 0; |
146 | 0 | auto &aggregates = layout_ptr->GetAggregates(); |
147 | 0 | RowOperationsState row_state(*aggregate_allocator); |
148 | 0 | for (idx_t aggr_idx = 0; aggr_idx < aggregates.size(); aggr_idx++) { |
149 | 0 | auto &aggregate = aggregates[aggr_idx]; |
150 | 0 | auto input_count = (idx_t)aggregate.child_count; |
151 | 0 | if (aggregate.filter) { |
152 | 0 | RowOperations::UpdateFilteredStates(row_state, filter_set.GetFilterData(aggr_idx), aggregate, addresses, |
153 | 0 | payload, payload_idx); |
154 | 0 | } else { |
155 | 0 | RowOperations::UpdateStates(row_state, aggregate, addresses, payload, payload_idx, payload.size()); |
156 | 0 | } |
157 | | // move to the next aggregate |
158 | 0 | payload_idx += input_count; |
159 | 0 | VectorOperations::AddInPlace(addresses, UnsafeNumericCast<int64_t>(aggregate.payload_size), payload.size()); |
160 | 0 | } |
161 | 0 | } |
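Each row stores all aggregate states back to back, so after updating one aggregate the loop above advances every address by that aggregate's payload_size, leaving the same address vector pointing at the next state. The pointer walk in isolation (hypothetical state sizes; the update hook is elided):

#include <cstddef>
#include <cstdint>

// hypothetical per-aggregate state sizes within one row, e.g. a SUM and an AVG state
static const size_t kPayloadSizes[] = {8, 16};
static const size_t kAggregateCount = 2;

static void WalkAggregateStates(uint8_t *addresses[], size_t row_count) {
	for (size_t aggr = 0; aggr < kAggregateCount; aggr++) {
		// here addresses[i] points at aggregate `aggr`'s state of row i;
		// the real code calls RowOperations::UpdateStates at this point
		for (size_t i = 0; i < row_count; i++) {
			addresses[i] += kPayloadSizes[aggr]; // advance to the next aggregate's state
		}
	}
}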
162 | | |
163 | 0 | void PerfectAggregateHashTable::Combine(PerfectAggregateHashTable &other) { |
164 | 0 | D_ASSERT(total_groups == other.total_groups); |
165 | 0 | D_ASSERT(tuple_size == other.tuple_size); |
166 | | |
167 | 0 | Vector source_addresses(LogicalType::POINTER); |
168 | 0 | Vector target_addresses(LogicalType::POINTER); |
169 | 0 | auto source_addresses_ptr = FlatVector::GetData<data_ptr_t>(source_addresses); |
170 | 0 | auto target_addresses_ptr = FlatVector::GetData<data_ptr_t>(target_addresses); |
171 | | |
172 | | // iterate over all entries of both hash tables and call combine for all entries that can be combined |
173 | 0 | data_ptr_t source_ptr = other.data; |
174 | 0 | data_ptr_t target_ptr = data; |
175 | 0 | idx_t combine_count = 0; |
176 | 0 | RowOperationsState row_state(*aggregate_allocator); |
177 | 0 | for (idx_t i = 0; i < total_groups; i++) { |
178 | 0 | auto has_entry_source = other.group_is_set[i]; |
179 | | // we only have any work to do if the source has an entry for this group |
180 | 0 | if (has_entry_source) { |
181 | 0 | group_is_set[i] = true; |
182 | 0 | source_addresses_ptr[combine_count] = source_ptr; |
183 | 0 | target_addresses_ptr[combine_count] = target_ptr; |
184 | 0 | combine_count++; |
185 | 0 | if (combine_count == STANDARD_VECTOR_SIZE) { |
186 | 0 | RowOperations::CombineStates(row_state, *layout_ptr, source_addresses, target_addresses, combine_count); |
187 | 0 | combine_count = 0; |
188 | 0 | } |
189 | 0 | } |
190 | 0 | source_ptr += tuple_size; |
191 | 0 | target_ptr += tuple_size; |
192 | 0 | } |
193 | 0 | RowOperations::CombineStates(row_state, *layout_ptr, source_addresses, target_addresses, combine_count); |
194 | | |
195 | | // FIXME: after moving the arena allocator, we currently have to ensure that the pointer is not nullptr, because the |
196 | | // FIXME: Destroy()-function of the hash table expects an allocator in some cases (e.g., for sorted aggregates) |
197 | 0 | stored_allocators.push_back(std::move(other.aggregate_allocator)); |
198 | 0 | other.aggregate_allocator = make_uniq<ArenaAllocator>(allocator); |
199 | 0 | } |
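Combine gathers source/target state pairs and flushes them a full vector at a time rather than combining row by row. The accumulate-and-flush idiom reduced to its core, with a hypothetical flush function standing in for RowOperations::CombineStates:

#include <cstddef>
#include <cstdint>

static const size_t kBatchSize = 2048; // stands in for STANDARD_VECTOR_SIZE

static void CombineBatch(uint8_t *sources[], uint8_t *targets[], size_t count) {
	// placeholder: merge each source state into the matching target state
	(void)sources; (void)targets; (void)count;
}

static void CombineAll(const bool source_set[], uint8_t *source_rows[], uint8_t *target_rows[], size_t total) {
	uint8_t *sources[kBatchSize];
	uint8_t *targets[kBatchSize];
	size_t count = 0;
	for (size_t i = 0; i < total; i++) {
		if (!source_set[i]) {
			continue; // the source never touched this group
		}
		sources[count] = source_rows[i];
		targets[count] = target_rows[i];
		if (++count == kBatchSize) {
			CombineBatch(sources, targets, count); // flush a full batch
			count = 0;
		}
	}
	CombineBatch(sources, targets, count); // flush the remainder
}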
200 | | |
201 | | template <class T> |
202 | | static void ReconstructGroupVectorTemplated(uint32_t group_values[], Value &min, idx_t mask, idx_t shift, |
203 | 0 | idx_t entry_count, Vector &result) { |
204 | 0 | auto data = FlatVector::GetData<T>(result); |
205 | 0 | auto &validity_mask = FlatVector::Validity(result); |
206 | 0 | auto min_data = min.GetValueUnsafe<T>(); |
207 | 0 | for (idx_t i = 0; i < entry_count; i++) { |
208 | | // extract the value of this group from the total group index |
209 | 0 | auto group_index = UnsafeNumericCast<int32_t>((group_values[i] >> shift) & mask); |
210 | 0 | if (group_index == 0) { |
211 | | // if it is 0, the value is NULL |
212 | 0 | validity_mask.SetInvalid(i); |
213 | 0 | } else { |
214 | | // otherwise we add the value (minus 1) to the min value |
215 | 0 | data[i] = UnsafeNumericCast<T>(UnsafeNumericCast<int64_t>(min_data) + |
216 | 0 | UnsafeNumericCast<int64_t>(group_index) - 1); |
217 | 0 | } |
218 | 0 | } |
219 | 0 | } |
220 | | |
221 | | static void ReconstructGroupVector(uint32_t group_values[], Value &min, idx_t required_bits, idx_t shift, |
222 | 0 | idx_t entry_count, Vector &result) { |
223 | | // construct the mask for this entry |
224 | 0 | idx_t mask = ((uint64_t)1 << required_bits) - 1; |
225 | 0 | switch (result.GetType().InternalType()) { |
226 | 0 | case PhysicalType::INT8: |
227 | 0 | ReconstructGroupVectorTemplated<int8_t>(group_values, min, mask, shift, entry_count, result); |
228 | 0 | break; |
229 | 0 | case PhysicalType::INT16: |
230 | 0 | ReconstructGroupVectorTemplated<int16_t>(group_values, min, mask, shift, entry_count, result); |
231 | 0 | break; |
232 | 0 | case PhysicalType::INT32: |
233 | 0 | ReconstructGroupVectorTemplated<int32_t>(group_values, min, mask, shift, entry_count, result); |
234 | 0 | break; |
235 | 0 | case PhysicalType::INT64: |
236 | 0 | ReconstructGroupVectorTemplated<int64_t>(group_values, min, mask, shift, entry_count, result); |
237 | 0 | break; |
238 | 0 | case PhysicalType::UINT8: |
239 | 0 | ReconstructGroupVectorTemplated<uint8_t>(group_values, min, mask, shift, entry_count, result); |
240 | 0 | break; |
241 | 0 | case PhysicalType::UINT16: |
242 | 0 | ReconstructGroupVectorTemplated<uint16_t>(group_values, min, mask, shift, entry_count, result); |
243 | 0 | break; |
244 | 0 | case PhysicalType::UINT32: |
245 | 0 | ReconstructGroupVectorTemplated<uint32_t>(group_values, min, mask, shift, entry_count, result); |
246 | 0 | break; |
247 | 0 | case PhysicalType::UINT64: |
248 | 0 | ReconstructGroupVectorTemplated<uint64_t>(group_values, min, mask, shift, entry_count, result); |
249 | 0 | break; |
250 | 0 | default: |
251 | 0 | throw InternalException("Invalid type for perfect aggregate HT group"); |
252 | 0 | } |
253 | 0 | } |
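Reconstruction inverts the packing: shift and mask out this column's bits, treat index 0 as NULL, and map index k back to min + k - 1. A round-trip check under the same hypothetical 3-bit/2-bit layout used earlier:

#include <cassert>
#include <cstdint>

int main() {
	const uint64_t bits0 = 3, bits1 = 2, total_bits = bits0 + bits1;
	const int64_t min0 = 100, min1 = -1;
	// forward: pack the group (103, 0); each adjusted value is (value - min) + 1
	uint64_t slot = ((uint64_t(103 - min0) + 1) << (total_bits - bits0)) | (uint64_t(0 - min1) + 1);
	// inverse: extract each column's index, then undo the +1 offset
	uint64_t mask0 = (uint64_t(1) << bits0) - 1;
	uint64_t mask1 = (uint64_t(1) << bits1) - 1;
	uint64_t idx0 = (slot >> (total_bits - bits0)) & mask0;
	uint64_t idx1 = slot & mask1;
	assert(idx0 != 0 && idx1 != 0); // an index of 0 would mean the column was NULL
	assert(min0 + int64_t(idx0) - 1 == 103);
	assert(min1 + int64_t(idx1) - 1 == 0);
	return 0;
}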
254 | | |
255 | 0 | void PerfectAggregateHashTable::Scan(idx_t &scan_position, DataChunk &result) { |
256 | 0 | auto data_pointers = FlatVector::GetData<data_ptr_t>(addresses); |
257 | 0 | uint32_t group_values[STANDARD_VECTOR_SIZE]; |
258 | | |
259 | | // iterate over the HT until we have either exhausted it, or filled a complete vector of entries |
260 | 0 | idx_t entry_count = 0; |
261 | 0 | for (; scan_position < total_groups; scan_position++) { |
262 | 0 | if (group_is_set[scan_position]) { |
263 | | // this group is set: add it to the set of groups to extract |
264 | 0 | data_pointers[entry_count] = data + tuple_size * scan_position; |
265 | 0 | group_values[entry_count] = NumericCast<uint32_t>(scan_position); |
266 | 0 | entry_count++; |
267 | 0 | if (entry_count == STANDARD_VECTOR_SIZE) { |
268 | 0 | scan_position++; |
269 | 0 | break; |
270 | 0 | } |
271 | 0 | } |
272 | 0 | } |
273 | 0 | if (entry_count == 0) { |
274 | | // no entries found |
275 | 0 | return; |
276 | 0 | } |
277 | | // first reconstruct the groups from the group index |
278 | 0 | idx_t shift = total_required_bits; |
279 | 0 | for (idx_t i = 0; i < grouping_columns; i++) { |
280 | 0 | shift -= required_bits[i]; |
281 | 0 | ReconstructGroupVector(group_values, group_minima[i], required_bits[i], shift, entry_count, result.data[i]); |
282 | 0 | } |
283 | | // then construct the payloads |
284 | 0 | result.SetCardinality(entry_count); |
285 | 0 | RowOperationsState row_state(*aggregate_allocator); |
286 | 0 | RowOperations::FinalizeStates(row_state, *layout_ptr, addresses, result, grouping_columns); |
287 | 0 | } |
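Scan is resumable: scan_position lives outside the call, and once a vector fills up it is bumped past the emitting slot before breaking, so the next call picks up exactly where this one stopped. The cursor idiom in isolation (hypothetical names):

#include <cstddef>
#include <cstdint>

static const size_t kBatchSize = 2048; // stands in for STANDARD_VECTOR_SIZE

// collect up to kBatchSize set slots starting at *cursor; returns how many were
// found and leaves *cursor at the first slot the next call should examine
static size_t NextBatch(const bool is_set[], size_t total, size_t *cursor, uint32_t out[]) {
	size_t count = 0;
	for (; *cursor < total; (*cursor)++) {
		if (!is_set[*cursor]) {
			continue;
		}
		out[count++] = uint32_t(*cursor);
		if (count == kBatchSize) {
			++*cursor; // step past this slot so it is not emitted twice
			break;
		}
	}
	return count; // 0 means the scan is exhausted
}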
288 | | |
289 | 0 | void PerfectAggregateHashTable::Destroy() { |
290 | | // check if there is any destructor to call |
291 | 0 | bool has_destructor = false; |
292 | 0 | for (auto &aggr : layout_ptr->GetAggregates()) { |
293 | 0 | if (aggr.function.destructor) { |
294 | 0 | has_destructor = true; |
295 | 0 | } |
296 | 0 | } |
297 | 0 | if (!has_destructor) { |
298 | 0 | return; |
299 | 0 | } |
300 | | // there are aggregates with destructors: loop over the hash table |
301 | | // and call the destructor method for each of the aggregates |
302 | 0 | auto data_pointers = FlatVector::GetData<data_ptr_t>(addresses); |
303 | 0 | idx_t count = 0; |
304 | | |
305 | | // iterate over all slots of the hash table; every state was initialized in the constructor |
306 | 0 | RowOperationsState row_state(*aggregate_allocator); |
307 | 0 | data_ptr_t payload_ptr = data; |
308 | 0 | for (idx_t i = 0; i < total_groups; i++) { |
309 | 0 | data_pointers[count++] = payload_ptr; |
310 | 0 | if (count == STANDARD_VECTOR_SIZE) { |
311 | 0 | RowOperations::DestroyStates(row_state, *layout_ptr, addresses, count); |
312 | 0 | count = 0; |
313 | 0 | } |
314 | 0 | payload_ptr += tuple_size; |
315 | 0 | } |
316 | 0 | RowOperations::DestroyStates(row_state, *layout_ptr, addresses, count); |
317 | 0 | } |
318 | | |
319 | | } // namespace duckdb |