/src/duckdb/extension/parquet/parquet_statistics.cpp
Line | Count | Source |
1 | | #include "parquet_statistics.hpp" |
2 | | |
3 | | #include "duckdb.hpp" |
4 | | #include "parquet_decimal_utils.hpp" |
5 | | #include "parquet_timestamp.hpp" |
6 | | #include "parquet_float16.hpp" |
7 | | #include "parquet_reader.hpp" |
8 | | #include "reader/string_column_reader.hpp" |
9 | | #include "reader/struct_column_reader.hpp" |
10 | | #include "zstd/common/xxhash.hpp" |
11 | | #include "duckdb/common/types/blob.hpp" |
12 | | #include "duckdb/common/types/time.hpp" |
13 | | #include "duckdb/common/types/value.hpp" |
14 | | #include "duckdb/storage/statistics/struct_stats.hpp" |
15 | | #include "duckdb/planner/filter/constant_filter.hpp" |
16 | | #include "reader/uuid_column_reader.hpp" |
17 | | |
18 | | namespace duckdb { |
19 | | |
20 | | using duckdb_parquet::ConvertedType; |
21 | | using duckdb_parquet::Type; |
22 | | |
23 | | unique_ptr<BaseStatistics> ParquetStatisticsUtils::CreateNumericStats(const LogicalType &type, |
24 | | const ParquetColumnSchema &schema_ele, |
25 | 0 | const duckdb_parquet::Statistics &parquet_stats) { |
26 | 0 | auto stats = NumericStats::CreateUnknown(type); |
27 | | |
28 | | // for reasons unknown to science, Parquet defines *both* `min` and `min_value` as well as `max` and |
29 | | // `max_value`. All are optional. such elegance. |
30 | 0 | Value min; |
31 | 0 | Value max; |
32 | 0 | if (parquet_stats.__isset.min_value) { |
33 | 0 | min = ParquetStatisticsUtils::ConvertValue(type, schema_ele, parquet_stats.min_value); |
34 | 0 | } else if (parquet_stats.__isset.min) { |
35 | 0 | min = ParquetStatisticsUtils::ConvertValue(type, schema_ele, parquet_stats.min); |
36 | 0 | } else { |
37 | 0 | min = Value(type); |
38 | 0 | } |
39 | 0 | if (parquet_stats.__isset.max_value) { |
40 | 0 | max = ParquetStatisticsUtils::ConvertValue(type, schema_ele, parquet_stats.max_value); |
41 | 0 | } else if (parquet_stats.__isset.max) { |
42 | 0 | max = ParquetStatisticsUtils::ConvertValue(type, schema_ele, parquet_stats.max); |
43 | 0 | } else { |
44 | 0 | max = Value(type); |
45 | 0 | } |
46 | 0 | NumericStats::SetMin(stats, min); |
47 | 0 | NumericStats::SetMax(stats, max); |
48 | 0 | return stats.ToUnique(); |
49 | 0 | } |
50 | | |
51 | | static unique_ptr<BaseStatistics> CreateFloatingPointStats(const LogicalType &type, |
52 | | const ParquetColumnSchema &schema_ele, |
53 | 0 | const duckdb_parquet::Statistics &parquet_stats) { |
54 | 0 | auto stats = NumericStats::CreateUnknown(type); |
55 | | |
56 | | // floating point values can always have NaN values - hence we cannot use the max value from the file |
57 | 0 | Value min; |
58 | 0 | Value max; |
59 | 0 | if (parquet_stats.__isset.min_value) { |
60 | 0 | min = ParquetStatisticsUtils::ConvertValue(type, schema_ele, parquet_stats.min_value); |
61 | 0 | } else if (parquet_stats.__isset.min) { |
62 | 0 | min = ParquetStatisticsUtils::ConvertValue(type, schema_ele, parquet_stats.min); |
63 | 0 | } else { |
64 | 0 | min = Value(type); |
65 | 0 | } |
66 | 0 | max = Value("nan").DefaultCastAs(type); |
67 | 0 | NumericStats::SetMin(stats, min); |
68 | 0 | NumericStats::SetMax(stats, max); |
69 | 0 | return stats.ToUnique(); |
70 | 0 | } |
71 | | |
72 | | Value ParquetStatisticsUtils::ConvertValue(const LogicalType &type, const ParquetColumnSchema &schema_ele, |
73 | 0 | const std::string &stats) { |
74 | 0 | Value result; |
75 | 0 | string error; |
76 | 0 | auto stats_val = ConvertValueInternal(type, schema_ele, stats); |
77 | 0 | if (!stats_val.DefaultTryCastAs(type, result, &error)) { |
78 | 0 | return Value(type); |
79 | 0 | } |
80 | 0 | return result; |
81 | 0 | } |
82 | | Value ParquetStatisticsUtils::ConvertValueInternal(const LogicalType &type, const ParquetColumnSchema &schema_ele, |
83 | 0 | const std::string &stats) { |
84 | 0 | auto stats_data = const_data_ptr_cast(stats.c_str()); |
85 | 0 | switch (type.id()) { |
86 | 0 | case LogicalTypeId::BOOLEAN: { |
87 | 0 | if (stats.size() != sizeof(bool)) { |
88 | 0 | throw InvalidInputException("Incorrect stats size for type BOOLEAN"); |
89 | 0 | } |
90 | 0 | return Value::BOOLEAN(Load<bool>(stats_data)); |
91 | 0 | } |
92 | 0 | case LogicalTypeId::UTINYINT: |
93 | 0 | case LogicalTypeId::USMALLINT: |
94 | 0 | case LogicalTypeId::UINTEGER: |
95 | 0 | if (stats.size() != sizeof(uint32_t)) { |
96 | 0 | throw InvalidInputException("Incorrect stats size for type UINTEGER"); |
97 | 0 | } |
98 | 0 | return Value::UINTEGER(Load<uint32_t>(stats_data)); |
99 | 0 | case LogicalTypeId::UBIGINT: |
100 | 0 | if (stats.size() != sizeof(uint64_t)) { |
101 | 0 | throw InvalidInputException("Incorrect stats size for type UBIGINT"); |
102 | 0 | } |
103 | 0 | return Value::UBIGINT(Load<uint64_t>(stats_data)); |
104 | 0 | case LogicalTypeId::TINYINT: |
105 | 0 | case LogicalTypeId::SMALLINT: |
106 | 0 | case LogicalTypeId::INTEGER: |
107 | 0 | if (stats.size() != sizeof(int32_t)) { |
108 | 0 | throw InvalidInputException("Incorrect stats size for type INTEGER"); |
109 | 0 | } |
110 | 0 | return Value::INTEGER(Load<int32_t>(stats_data)); |
111 | 0 | case LogicalTypeId::BIGINT: |
112 | 0 | if (stats.size() != sizeof(int64_t)) { |
113 | 0 | throw InvalidInputException("Incorrect stats size for type BIGINT"); |
114 | 0 | } |
115 | 0 | return Value::BIGINT(Load<int64_t>(stats_data)); |
116 | 0 | case LogicalTypeId::FLOAT: { |
117 | 0 | float val; |
118 | 0 | if (schema_ele.type_info == ParquetExtraTypeInfo::FLOAT16) { |
119 | 0 | if (stats.size() != sizeof(uint16_t)) { |
120 | 0 | throw InvalidInputException("Incorrect stats size for type FLOAT16"); |
121 | 0 | } |
122 | 0 | val = Float16ToFloat32(Load<uint16_t>(stats_data)); |
123 | 0 | } else { |
124 | 0 | if (stats.size() != sizeof(float)) { |
125 | 0 | throw InvalidInputException("Incorrect stats size for type FLOAT"); |
126 | 0 | } |
127 | 0 | val = Load<float>(stats_data); |
128 | 0 | } |
129 | 0 | if (!Value::FloatIsFinite(val)) { |
130 | 0 | return Value(); |
131 | 0 | } |
132 | 0 | return Value::FLOAT(val); |
133 | 0 | } |
134 | 0 | case LogicalTypeId::DOUBLE: { |
135 | 0 | if (schema_ele.type_info == ParquetExtraTypeInfo::DECIMAL_BYTE_ARRAY) { |
136 | | // decimals cast to double |
137 | 0 | return Value::DOUBLE(ParquetDecimalUtils::ReadDecimalValue<double>(stats_data, stats.size(), schema_ele)); |
138 | 0 | } |
139 | 0 | if (stats.size() != sizeof(double)) { |
140 | 0 | throw InvalidInputException("Incorrect stats size for type DOUBLE"); |
141 | 0 | } |
142 | 0 | auto val = Load<double>(stats_data); |
143 | 0 | if (!Value::DoubleIsFinite(val)) { |
144 | 0 | return Value(); |
145 | 0 | } |
146 | 0 | return Value::DOUBLE(val); |
147 | 0 | } |
148 | 0 | case LogicalTypeId::DECIMAL: { |
149 | 0 | auto width = DecimalType::GetWidth(type); |
150 | 0 | auto scale = DecimalType::GetScale(type); |
151 | 0 | switch (schema_ele.type_info) { |
152 | 0 | case ParquetExtraTypeInfo::DECIMAL_INT32: |
153 | 0 | if (stats.size() != sizeof(int32_t)) { |
154 | 0 | throw InvalidInputException("Incorrect stats size for type %s", type.ToString()); |
155 | 0 | } |
156 | 0 | return Value::DECIMAL(Load<int32_t>(stats_data), width, scale); |
157 | 0 | case ParquetExtraTypeInfo::DECIMAL_INT64: |
158 | 0 | if (stats.size() != sizeof(int64_t)) { |
159 | 0 | throw InvalidInputException("Incorrect stats size for type %s", type.ToString()); |
160 | 0 | } |
161 | 0 | return Value::DECIMAL(Load<int64_t>(stats_data), width, scale); |
162 | 0 | case ParquetExtraTypeInfo::DECIMAL_BYTE_ARRAY: |
163 | 0 | switch (type.InternalType()) { |
164 | 0 | case PhysicalType::INT16: |
165 | 0 | return Value::DECIMAL( |
166 | 0 | ParquetDecimalUtils::ReadDecimalValue<int16_t>(stats_data, stats.size(), schema_ele), width, scale); |
167 | 0 | case PhysicalType::INT32: |
168 | 0 | return Value::DECIMAL( |
169 | 0 | ParquetDecimalUtils::ReadDecimalValue<int32_t>(stats_data, stats.size(), schema_ele), width, scale); |
170 | 0 | case PhysicalType::INT64: |
171 | 0 | return Value::DECIMAL( |
172 | 0 | ParquetDecimalUtils::ReadDecimalValue<int64_t>(stats_data, stats.size(), schema_ele), width, scale); |
173 | 0 | case PhysicalType::INT128: |
174 | 0 | return Value::DECIMAL( |
175 | 0 | ParquetDecimalUtils::ReadDecimalValue<hugeint_t>(stats_data, stats.size(), schema_ele), width, |
176 | 0 | scale); |
177 | 0 | default: |
178 | 0 | throw InvalidInputException("Unsupported internal type for decimal"); |
179 | 0 | } |
180 | 0 | default: |
181 | 0 | throw NotImplementedException("Unrecognized Parquet type for Decimal"); |
182 | 0 | } |
183 | 0 | } |
184 | 0 | case LogicalTypeId::VARCHAR: |
185 | 0 | case LogicalTypeId::BLOB: |
186 | 0 | if (type.id() == LogicalTypeId::BLOB || !Value::StringIsValid(stats)) { |
187 | 0 | return Value(Blob::ToString(string_t(stats))); |
188 | 0 | } |
189 | 0 | return Value(stats); |
190 | 0 | case LogicalTypeId::DATE: |
191 | 0 | if (stats.size() != sizeof(int32_t)) { |
192 | 0 | throw InvalidInputException("Incorrect stats size for type DATE"); |
193 | 0 | } |
194 | 0 | return Value::DATE(date_t(Load<int32_t>(stats_data))); |
195 | 0 | case LogicalTypeId::TIME: { |
196 | 0 | int64_t val; |
197 | 0 | if (stats.size() == sizeof(int32_t)) { |
198 | 0 | val = Load<int32_t>(stats_data); |
199 | 0 | } else if (stats.size() == sizeof(int64_t)) { |
200 | 0 | val = Load<int64_t>(stats_data); |
201 | 0 | } else { |
202 | 0 | throw InvalidInputException("Incorrect stats size for type TIME"); |
203 | 0 | } |
204 | 0 | switch (schema_ele.type_info) { |
205 | 0 | case ParquetExtraTypeInfo::UNIT_MS: |
206 | 0 | return Value::TIME(Time::FromTimeMs(val)); |
207 | 0 | case ParquetExtraTypeInfo::UNIT_NS: |
208 | 0 | return Value::TIME(Time::FromTimeNs(val)); |
209 | 0 | case ParquetExtraTypeInfo::UNIT_MICROS: |
210 | 0 | default: |
211 | 0 | return Value::TIME(dtime_t(val)); |
212 | 0 | } |
213 | 0 | } |
214 | 0 | case LogicalTypeId::TIME_NS: { |
215 | 0 | int64_t val; |
216 | 0 | if (stats.size() == sizeof(int32_t)) { |
217 | 0 | val = Load<int32_t>(stats_data); |
218 | 0 | } else if (stats.size() == sizeof(int64_t)) { |
219 | 0 | val = Load<int64_t>(stats_data); |
220 | 0 | } else { |
221 | 0 | throw InvalidInputException("Incorrect stats size for type TIME"); |
222 | 0 | } |
223 | 0 | switch (schema_ele.type_info) { |
224 | 0 | case ParquetExtraTypeInfo::UNIT_MS: |
225 | 0 | return Value::TIME_NS(ParquetMsIntToTimeNs(val)); |
226 | 0 | case ParquetExtraTypeInfo::UNIT_NS: |
227 | 0 | return Value::TIME_NS(ParquetIntToTimeNs(val)); |
228 | 0 | case ParquetExtraTypeInfo::UNIT_MICROS: |
229 | 0 | default: |
230 | 0 | return Value::TIME_NS(dtime_ns_t(val)); |
231 | 0 | } |
232 | 0 | } |
233 | 0 | case LogicalTypeId::TIME_TZ: { |
234 | 0 | int64_t val; |
235 | 0 | if (stats.size() == sizeof(int32_t)) { |
236 | 0 | val = Load<int32_t>(stats_data); |
237 | 0 | } else if (stats.size() == sizeof(int64_t)) { |
238 | 0 | val = Load<int64_t>(stats_data); |
239 | 0 | } else { |
240 | 0 | throw InvalidInputException("Incorrect stats size for type TIMETZ"); |
241 | 0 | } |
242 | 0 | switch (schema_ele.type_info) { |
243 | 0 | case ParquetExtraTypeInfo::UNIT_MS: |
244 | 0 | return Value::TIMETZ(ParquetIntToTimeMsTZ(NumericCast<int32_t>(val))); |
245 | 0 | case ParquetExtraTypeInfo::UNIT_NS: |
246 | 0 | return Value::TIMETZ(ParquetIntToTimeNsTZ(val)); |
247 | 0 | case ParquetExtraTypeInfo::UNIT_MICROS: |
248 | 0 | default: |
249 | 0 | return Value::TIMETZ(ParquetIntToTimeTZ(val)); |
250 | 0 | } |
251 | 0 | } |
252 | 0 | case LogicalTypeId::TIMESTAMP: |
253 | 0 | case LogicalTypeId::TIMESTAMP_TZ: { |
254 | 0 | timestamp_t timestamp_value; |
255 | 0 | if (schema_ele.type_info == ParquetExtraTypeInfo::IMPALA_TIMESTAMP) { |
256 | 0 | if (stats.size() != sizeof(Int96)) { |
257 | 0 | throw InvalidInputException("Incorrect stats size for type TIMESTAMP"); |
258 | 0 | } |
259 | 0 | timestamp_value = ImpalaTimestampToTimestamp(Load<Int96>(stats_data)); |
260 | 0 | } else { |
261 | 0 | if (stats.size() != sizeof(int64_t)) { |
262 | 0 | throw InvalidInputException("Incorrect stats size for type TIMESTAMP"); |
263 | 0 | } |
264 | 0 | auto val = Load<int64_t>(stats_data); |
265 | 0 | switch (schema_ele.type_info) { |
266 | 0 | case ParquetExtraTypeInfo::UNIT_MS: |
267 | 0 | timestamp_value = Timestamp::FromEpochMs(val); |
268 | 0 | break; |
269 | 0 | case ParquetExtraTypeInfo::UNIT_NS: |
270 | 0 | timestamp_value = Timestamp::FromEpochNanoSeconds(val); |
271 | 0 | break; |
272 | 0 | case ParquetExtraTypeInfo::UNIT_MICROS: |
273 | 0 | default: |
274 | 0 | timestamp_value = timestamp_t(val); |
275 | 0 | break; |
276 | 0 | } |
277 | 0 | } |
278 | 0 | if (type.id() == LogicalTypeId::TIMESTAMP_TZ) { |
279 | 0 | return Value::TIMESTAMPTZ(timestamp_tz_t(timestamp_value)); |
280 | 0 | } |
281 | 0 | return Value::TIMESTAMP(timestamp_value); |
282 | 0 | } |
283 | 0 | case LogicalTypeId::TIMESTAMP_NS: { |
284 | 0 | timestamp_ns_t timestamp_value; |
285 | 0 | if (schema_ele.type_info == ParquetExtraTypeInfo::IMPALA_TIMESTAMP) { |
286 | 0 | if (stats.size() != sizeof(Int96)) { |
287 | 0 | throw InvalidInputException("Incorrect stats size for type TIMESTAMP_NS"); |
288 | 0 | } |
289 | 0 | timestamp_value = ImpalaTimestampToTimestampNS(Load<Int96>(stats_data)); |
290 | 0 | } else { |
291 | 0 | if (stats.size() != sizeof(int64_t)) { |
292 | 0 | throw InvalidInputException("Incorrect stats size for type TIMESTAMP_NS"); |
293 | 0 | } |
294 | 0 | auto val = Load<int64_t>(stats_data); |
295 | 0 | switch (schema_ele.type_info) { |
296 | 0 | case ParquetExtraTypeInfo::UNIT_MS: |
297 | 0 | timestamp_value = ParquetTimestampMsToTimestampNs(val); |
298 | 0 | break; |
299 | 0 | case ParquetExtraTypeInfo::UNIT_NS: |
300 | 0 | timestamp_value = ParquetTimestampNsToTimestampNs(val); |
301 | 0 | break; |
302 | 0 | case ParquetExtraTypeInfo::UNIT_MICROS: |
303 | 0 | default: |
304 | 0 | timestamp_value = ParquetTimestampUsToTimestampNs(val); |
305 | 0 | break; |
306 | 0 | } |
307 | 0 | } |
308 | 0 | return Value::TIMESTAMPNS(timestamp_value); |
309 | 0 | } |
310 | 0 | case LogicalTypeId::UUID: { |
311 | 0 | if (stats.size() != 16) { |
312 | 0 | throw InvalidInputException("Incorrect stats size for type UUID"); |
313 | 0 | } |
314 | 0 | auto uuid_val = UUIDValueConversion::ReadParquetUUID(const_data_ptr_cast(stats.c_str())); |
315 | 0 | return Value::UUID(uuid_val); |
316 | 0 | } |
317 | 0 | default: |
318 | 0 | throw InternalException("Unsupported type for stats %s", type.ToString()); |
319 | 0 | } |
320 | 0 | } |
321 | | |
322 | | unique_ptr<BaseStatistics> ParquetStatisticsUtils::TransformColumnStatistics(const ParquetColumnSchema &schema, |
323 | | const vector<ColumnChunk> &columns, |
324 | 0 | bool can_have_nan) { |
325 | | // Not supported types |
326 | 0 | auto &type = schema.type; |
327 | 0 | if (type.id() == LogicalTypeId::ARRAY || type.id() == LogicalTypeId::MAP || type.id() == LogicalTypeId::LIST) { |
328 | 0 | return nullptr; |
329 | 0 | } |
330 | | |
331 | 0 | unique_ptr<BaseStatistics> row_group_stats; |
332 | | |
333 | | // Structs are handled differently (they dont have stats) |
334 | 0 | if (type.id() == LogicalTypeId::STRUCT) { |
335 | 0 | auto struct_stats = StructStats::CreateUnknown(type); |
336 | | // Recurse into child readers |
337 | 0 | for (idx_t i = 0; i < schema.children.size(); i++) { |
338 | 0 | auto &child_schema = schema.children[i]; |
339 | 0 | auto child_stats = ParquetStatisticsUtils::TransformColumnStatistics(child_schema, columns, can_have_nan); |
340 | 0 | StructStats::SetChildStats(struct_stats, i, std::move(child_stats)); |
341 | 0 | } |
342 | 0 | row_group_stats = struct_stats.ToUnique(); |
343 | | |
344 | | // null count is generic |
345 | 0 | if (row_group_stats) { |
346 | 0 | row_group_stats->Set(StatsInfo::CAN_HAVE_NULL_AND_VALID_VALUES); |
347 | 0 | } |
348 | 0 | return row_group_stats; |
349 | 0 | } else if (schema.schema_type == ParquetColumnSchemaType::VARIANT) { |
350 | | //! FIXME: there are situations where VARIANT columns can have stats |
351 | 0 | return nullptr; |
352 | 0 | } |
353 | | |
354 | | // Otherwise, its a standard column with stats |
355 | | |
356 | 0 | auto &column_chunk = columns[schema.column_index]; |
357 | 0 | if (!column_chunk.__isset.meta_data || !column_chunk.meta_data.__isset.statistics) { |
358 | | // no stats present for row group |
359 | 0 | return nullptr; |
360 | 0 | } |
361 | 0 | auto &parquet_stats = column_chunk.meta_data.statistics; |
362 | |
|
363 | 0 | switch (type.id()) { |
364 | 0 | case LogicalTypeId::UTINYINT: |
365 | 0 | case LogicalTypeId::USMALLINT: |
366 | 0 | case LogicalTypeId::UINTEGER: |
367 | 0 | case LogicalTypeId::UBIGINT: |
368 | 0 | case LogicalTypeId::TINYINT: |
369 | 0 | case LogicalTypeId::SMALLINT: |
370 | 0 | case LogicalTypeId::INTEGER: |
371 | 0 | case LogicalTypeId::BIGINT: |
372 | 0 | case LogicalTypeId::DATE: |
373 | 0 | case LogicalTypeId::TIME: |
374 | 0 | case LogicalTypeId::TIME_TZ: |
375 | 0 | case LogicalTypeId::TIMESTAMP: |
376 | 0 | case LogicalTypeId::TIMESTAMP_TZ: |
377 | 0 | case LogicalTypeId::TIMESTAMP_SEC: |
378 | 0 | case LogicalTypeId::TIMESTAMP_MS: |
379 | 0 | case LogicalTypeId::TIMESTAMP_NS: |
380 | 0 | case LogicalTypeId::DECIMAL: |
381 | 0 | row_group_stats = CreateNumericStats(type, schema, parquet_stats); |
382 | 0 | break; |
383 | 0 | case LogicalTypeId::FLOAT: |
384 | 0 | case LogicalTypeId::DOUBLE: |
385 | 0 | if (can_have_nan) { |
386 | | // Since parquet doesn't tell us if the column has NaN values, if the user has explicitly declared that it |
387 | | // does, we create stats without an upper max value, as NaN compares larger than anything else. |
388 | 0 | row_group_stats = CreateFloatingPointStats(type, schema, parquet_stats); |
389 | 0 | } else { |
390 | | // Otherwise we use the numeric stats as usual, which might lead to "wrong" pruning if the column contains |
391 | | // NaN values. The parquet spec is not clear on how to handle NaN values in statistics, and so this is |
392 | | // probably the best we can do for now. |
393 | 0 | row_group_stats = CreateNumericStats(type, schema, parquet_stats); |
394 | 0 | } |
395 | 0 | break; |
396 | 0 | case LogicalTypeId::VARCHAR: { |
397 | 0 | auto string_stats = StringStats::CreateUnknown(type); |
398 | 0 | if (parquet_stats.__isset.min_value) { |
399 | 0 | StringColumnReader::VerifyString(parquet_stats.min_value.c_str(), parquet_stats.min_value.size(), true); |
400 | 0 | StringStats::SetMin(string_stats, parquet_stats.min_value); |
401 | 0 | } else if (parquet_stats.__isset.min) { |
402 | 0 | StringColumnReader::VerifyString(parquet_stats.min.c_str(), parquet_stats.min.size(), true); |
403 | 0 | StringStats::SetMin(string_stats, parquet_stats.min); |
404 | 0 | } |
405 | 0 | if (parquet_stats.__isset.max_value) { |
406 | 0 | StringColumnReader::VerifyString(parquet_stats.max_value.c_str(), parquet_stats.max_value.size(), true); |
407 | 0 | StringStats::SetMax(string_stats, parquet_stats.max_value); |
408 | 0 | } else if (parquet_stats.__isset.max) { |
409 | 0 | StringColumnReader::VerifyString(parquet_stats.max.c_str(), parquet_stats.max.size(), true); |
410 | 0 | StringStats::SetMax(string_stats, parquet_stats.max); |
411 | 0 | } |
412 | 0 | row_group_stats = string_stats.ToUnique(); |
413 | 0 | break; |
414 | 0 | } |
415 | 0 | case LogicalTypeId::GEOMETRY: { |
416 | 0 | auto geo_stats = GeometryStats::CreateUnknown(type); |
417 | 0 | if (column_chunk.meta_data.__isset.geospatial_statistics) { |
418 | 0 | if (column_chunk.meta_data.geospatial_statistics.__isset.bbox) { |
419 | 0 | auto &bbox = column_chunk.meta_data.geospatial_statistics.bbox; |
420 | 0 | auto &stats_bbox = GeometryStats::GetExtent(geo_stats); |
421 | | |
422 | | // xmin > xmax is allowed if the geometry crosses the antimeridian, |
423 | | // but we don't handle this right now |
424 | 0 | if (bbox.xmin <= bbox.xmax) { |
425 | 0 | stats_bbox.x_min = bbox.xmin; |
426 | 0 | stats_bbox.x_max = bbox.xmax; |
427 | 0 | } |
428 | |
|
429 | 0 | if (bbox.ymin <= bbox.ymax) { |
430 | 0 | stats_bbox.y_min = bbox.ymin; |
431 | 0 | stats_bbox.y_max = bbox.ymax; |
432 | 0 | } |
433 | |
|
434 | 0 | if (bbox.__isset.zmin && bbox.__isset.zmax && bbox.zmin <= bbox.zmax) { |
435 | 0 | stats_bbox.z_min = bbox.zmin; |
436 | 0 | stats_bbox.z_max = bbox.zmax; |
437 | 0 | } |
438 | |
|
439 | 0 | if (bbox.__isset.mmin && bbox.__isset.mmax && bbox.mmin <= bbox.mmax) { |
440 | 0 | stats_bbox.m_min = bbox.mmin; |
441 | 0 | stats_bbox.m_max = bbox.mmax; |
442 | 0 | } |
443 | 0 | } |
444 | 0 | if (column_chunk.meta_data.geospatial_statistics.__isset.geospatial_types) { |
445 | 0 | auto &types = column_chunk.meta_data.geospatial_statistics.geospatial_types; |
446 | 0 | auto &stats_types = GeometryStats::GetTypes(geo_stats); |
447 | | |
448 | | // if types are set but empty, that still means "any type" - so we leave stats_types as-is (unknown) |
449 | | // otherwise, clear and set to the actual types |
450 | |
|
451 | 0 | if (!types.empty()) { |
452 | 0 | stats_types.Clear(); |
453 | 0 | for (auto &geom_type : types) { |
454 | 0 | stats_types.AddWKBType(geom_type); |
455 | 0 | } |
456 | 0 | } |
457 | 0 | } |
458 | 0 | } |
459 | 0 | row_group_stats = geo_stats.ToUnique(); |
460 | 0 | break; |
461 | 0 | } |
462 | 0 | default: |
463 | | // no stats for you |
464 | 0 | break; |
465 | 0 | } // end of type switch |
466 | | |
467 | | // null count is generic |
468 | 0 | if (row_group_stats) { |
469 | 0 | row_group_stats->Set(StatsInfo::CAN_HAVE_NULL_AND_VALID_VALUES); |
470 | 0 | if (parquet_stats.__isset.null_count && parquet_stats.null_count == 0) { |
471 | 0 | row_group_stats->Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); |
472 | 0 | } |
473 | 0 | if (parquet_stats.__isset.null_count && parquet_stats.null_count == column_chunk.meta_data.num_values) { |
474 | 0 | row_group_stats->Set(StatsInfo::CANNOT_HAVE_VALID_VALUES); |
475 | 0 | } |
476 | 0 | } |
477 | 0 | return row_group_stats; |
478 | 0 | } |
479 | | |
480 | 0 | static bool HasFilterConstants(const TableFilter &duckdb_filter) { |
481 | 0 | switch (duckdb_filter.filter_type) { |
482 | 0 | case TableFilterType::CONSTANT_COMPARISON: { |
483 | 0 | auto &constant_filter = duckdb_filter.Cast<ConstantFilter>(); |
484 | 0 | return (constant_filter.comparison_type == ExpressionType::COMPARE_EQUAL && !constant_filter.constant.IsNull()); |
485 | 0 | } |
486 | 0 | case TableFilterType::CONJUNCTION_AND: { |
487 | 0 | auto &conjunction_and_filter = duckdb_filter.Cast<ConjunctionAndFilter>(); |
488 | 0 | bool child_has_constant = false; |
489 | 0 | for (auto &child_filter : conjunction_and_filter.child_filters) { |
490 | 0 | child_has_constant |= HasFilterConstants(*child_filter); |
491 | 0 | } |
492 | 0 | return child_has_constant; |
493 | 0 | } |
494 | 0 | case TableFilterType::CONJUNCTION_OR: { |
495 | 0 | auto &conjunction_or_filter = duckdb_filter.Cast<ConjunctionOrFilter>(); |
496 | 0 | bool child_has_constant = false; |
497 | 0 | for (auto &child_filter : conjunction_or_filter.child_filters) { |
498 | 0 | child_has_constant |= HasFilterConstants(*child_filter); |
499 | 0 | } |
500 | 0 | return child_has_constant; |
501 | 0 | } |
502 | 0 | default: |
503 | 0 | return false; |
504 | 0 | } |
505 | 0 | } |
506 | | |
507 | | template <class T> |
508 | 0 | static uint64_t ValueXH64FixedWidth(const Value &constant) { |
509 | 0 | T val = constant.GetValue<T>(); |
510 | 0 | return duckdb_zstd::XXH64(&val, sizeof(val), 0); |
511 | 0 | } Unexecuted instantiation: parquet_statistics.cpp:unsigned long duckdb::ValueXH64FixedWidth<int>(duckdb::Value const&) Unexecuted instantiation: parquet_statistics.cpp:unsigned long duckdb::ValueXH64FixedWidth<unsigned int>(duckdb::Value const&) Unexecuted instantiation: parquet_statistics.cpp:unsigned long duckdb::ValueXH64FixedWidth<unsigned long>(duckdb::Value const&) Unexecuted instantiation: parquet_statistics.cpp:unsigned long duckdb::ValueXH64FixedWidth<long>(duckdb::Value const&) Unexecuted instantiation: parquet_statistics.cpp:unsigned long duckdb::ValueXH64FixedWidth<float>(duckdb::Value const&) Unexecuted instantiation: parquet_statistics.cpp:unsigned long duckdb::ValueXH64FixedWidth<double>(duckdb::Value const&) |
512 | | |
513 | | // TODO we can only this if the parquet representation of the type exactly matches the duckdb rep! |
514 | | // TODO TEST THIS! |
515 | | // TODO perhaps we can re-use some writer infra here |
516 | 0 | static uint64_t ValueXXH64(const Value &constant) { |
517 | 0 | switch (constant.type().InternalType()) { |
518 | 0 | case PhysicalType::UINT8: |
519 | 0 | return ValueXH64FixedWidth<int32_t>(constant); |
520 | 0 | case PhysicalType::INT8: |
521 | 0 | return ValueXH64FixedWidth<int32_t>(constant); |
522 | 0 | case PhysicalType::UINT16: |
523 | 0 | return ValueXH64FixedWidth<int32_t>(constant); |
524 | 0 | case PhysicalType::INT16: |
525 | 0 | return ValueXH64FixedWidth<int32_t>(constant); |
526 | 0 | case PhysicalType::UINT32: |
527 | 0 | return ValueXH64FixedWidth<uint32_t>(constant); |
528 | 0 | case PhysicalType::INT32: |
529 | 0 | return ValueXH64FixedWidth<int32_t>(constant); |
530 | 0 | case PhysicalType::UINT64: |
531 | 0 | return ValueXH64FixedWidth<uint64_t>(constant); |
532 | 0 | case PhysicalType::INT64: |
533 | 0 | return ValueXH64FixedWidth<int64_t>(constant); |
534 | 0 | case PhysicalType::FLOAT: |
535 | 0 | return ValueXH64FixedWidth<float>(constant); |
536 | 0 | case PhysicalType::DOUBLE: |
537 | 0 | return ValueXH64FixedWidth<double>(constant); |
538 | 0 | case PhysicalType::VARCHAR: { |
539 | 0 | auto val = constant.GetValue<string>(); |
540 | 0 | return duckdb_zstd::XXH64(val.c_str(), val.length(), 0); |
541 | 0 | } |
542 | 0 | default: |
543 | 0 | return 0; |
544 | 0 | } |
545 | 0 | } |
546 | | |
547 | 0 | static bool ApplyBloomFilter(const TableFilter &duckdb_filter, ParquetBloomFilter &bloom_filter) { |
548 | 0 | switch (duckdb_filter.filter_type) { |
549 | 0 | case TableFilterType::CONSTANT_COMPARISON: { |
550 | 0 | auto &constant_filter = duckdb_filter.Cast<ConstantFilter>(); |
551 | 0 | auto is_compare_equal = constant_filter.comparison_type == ExpressionType::COMPARE_EQUAL; |
552 | 0 | D_ASSERT(!constant_filter.constant.IsNull()); |
553 | 0 | auto hash = ValueXXH64(constant_filter.constant); |
554 | 0 | return hash > 0 && !bloom_filter.FilterCheck(hash) && is_compare_equal; |
555 | 0 | } |
556 | 0 | case TableFilterType::CONJUNCTION_AND: { |
557 | 0 | auto &conjunction_and_filter = duckdb_filter.Cast<ConjunctionAndFilter>(); |
558 | 0 | bool any_children_true = false; |
559 | 0 | for (auto &child_filter : conjunction_and_filter.child_filters) { |
560 | 0 | any_children_true |= ApplyBloomFilter(*child_filter, bloom_filter); |
561 | 0 | } |
562 | 0 | return any_children_true; |
563 | 0 | } |
564 | 0 | case TableFilterType::CONJUNCTION_OR: { |
565 | 0 | auto &conjunction_or_filter = duckdb_filter.Cast<ConjunctionOrFilter>(); |
566 | 0 | bool all_children_true = true; |
567 | 0 | for (auto &child_filter : conjunction_or_filter.child_filters) { |
568 | 0 | all_children_true &= ApplyBloomFilter(*child_filter, bloom_filter); |
569 | 0 | } |
570 | 0 | return all_children_true; |
571 | 0 | } |
572 | 0 | default: |
573 | 0 | return false; |
574 | 0 | } |
575 | 0 | } |
576 | | |
577 | 0 | bool ParquetStatisticsUtils::BloomFilterSupported(const LogicalTypeId &type_id) { |
578 | 0 | switch (type_id) { |
579 | 0 | case LogicalTypeId::TINYINT: |
580 | 0 | case LogicalTypeId::UTINYINT: |
581 | 0 | case LogicalTypeId::SMALLINT: |
582 | 0 | case LogicalTypeId::USMALLINT: |
583 | 0 | case LogicalTypeId::INTEGER: |
584 | 0 | case LogicalTypeId::UINTEGER: |
585 | 0 | case LogicalTypeId::BIGINT: |
586 | 0 | case LogicalTypeId::UBIGINT: |
587 | 0 | case LogicalTypeId::FLOAT: |
588 | 0 | case LogicalTypeId::DOUBLE: |
589 | 0 | case LogicalTypeId::VARCHAR: |
590 | 0 | case LogicalTypeId::BLOB: |
591 | 0 | return true; |
592 | 0 | default: |
593 | 0 | return false; |
594 | 0 | } |
595 | 0 | } |
596 | | |
597 | | bool ParquetStatisticsUtils::BloomFilterExcludes(const TableFilter &duckdb_filter, |
598 | | const duckdb_parquet::ColumnMetaData &column_meta_data, |
599 | 0 | TProtocol &file_proto, Allocator &allocator) { |
600 | 0 | if (!HasFilterConstants(duckdb_filter) || !column_meta_data.__isset.bloom_filter_offset || |
601 | 0 | column_meta_data.bloom_filter_offset <= 0) { |
602 | 0 | return false; |
603 | 0 | } |
604 | | // TODO check length against file length! |
605 | | |
606 | 0 | auto &transport = reinterpret_cast<ThriftFileTransport &>(*file_proto.getTransport()); |
607 | 0 | transport.SetLocation(column_meta_data.bloom_filter_offset); |
608 | 0 | if (column_meta_data.__isset.bloom_filter_length && column_meta_data.bloom_filter_length > 0) { |
609 | 0 | transport.Prefetch(column_meta_data.bloom_filter_offset, column_meta_data.bloom_filter_length); |
610 | 0 | } |
611 | |
|
612 | 0 | duckdb_parquet::BloomFilterHeader filter_header; |
613 | | // TODO the bloom filter could be encrypted, too, so need to double check that this is NOT the case |
614 | 0 | filter_header.read(&file_proto); |
615 | 0 | if (!filter_header.algorithm.__isset.BLOCK || !filter_header.compression.__isset.UNCOMPRESSED || |
616 | 0 | !filter_header.hash.__isset.XXHASH) { |
617 | 0 | return false; |
618 | 0 | } |
619 | | |
620 | 0 | auto new_buffer = make_uniq<ResizeableBuffer>(allocator, filter_header.numBytes); |
621 | 0 | transport.read(new_buffer->ptr, filter_header.numBytes); |
622 | 0 | ParquetBloomFilter bloom_filter(std::move(new_buffer)); |
623 | 0 | return ApplyBloomFilter(duckdb_filter, bloom_filter); |
624 | 0 | } |
625 | | |
626 | 0 | ParquetBloomFilter::ParquetBloomFilter(idx_t num_entries, double bloom_filter_false_positive_ratio) { |
627 | | // aim for hit ratio of 0.01% |
628 | | // see http://tfk.mit.edu/pdf/bloom.pdf |
629 | 0 | double f = bloom_filter_false_positive_ratio; |
630 | 0 | double k = 8.0; |
631 | 0 | double n = LossyNumericCast<double>(num_entries); |
632 | 0 | double m = -k * n / std::log(1 - std::pow(f, 1 / k)); |
633 | 0 | auto b = MaxValue<idx_t>(NextPowerOfTwo(LossyNumericCast<idx_t>(m / k)) / 32, 1); |
634 | |
|
635 | 0 | D_ASSERT(b > 0 && IsPowerOfTwo(b)); |
636 | |
|
637 | 0 | data = make_uniq<ResizeableBuffer>(Allocator::DefaultAllocator(), sizeof(ParquetBloomBlock) * b); |
638 | 0 | data->zero(); |
639 | 0 | block_count = data->len / sizeof(ParquetBloomBlock); |
640 | 0 | D_ASSERT(data->len % sizeof(ParquetBloomBlock) == 0); |
641 | 0 | } |
642 | | |
643 | 0 | ParquetBloomFilter::ParquetBloomFilter(unique_ptr<ResizeableBuffer> data_p) { |
644 | 0 | D_ASSERT(data_p->len % sizeof(ParquetBloomBlock) == 0); |
645 | 0 | data = std::move(data_p); |
646 | 0 | block_count = data->len / sizeof(ParquetBloomBlock); |
647 | 0 | D_ASSERT(data->len % sizeof(ParquetBloomBlock) == 0); |
648 | 0 | } |
649 | | |
650 | 0 | void ParquetBloomFilter::FilterInsert(uint64_t x) { |
651 | 0 | auto blocks = reinterpret_cast<ParquetBloomBlock *>(data->ptr); |
652 | 0 | uint64_t i = ((x >> 32) * block_count) >> 32; |
653 | 0 | auto &b = blocks[i]; |
654 | 0 | ParquetBloomBlock::BlockInsert(b, x); |
655 | 0 | } |
656 | | |
657 | 0 | bool ParquetBloomFilter::FilterCheck(uint64_t x) { |
658 | 0 | auto blocks = reinterpret_cast<ParquetBloomBlock *>(data->ptr); |
659 | 0 | auto i = ((x >> 32) * block_count) >> 32; |
660 | 0 | return ParquetBloomBlock::BlockCheck(blocks[i], x); |
661 | 0 | } |
662 | | |
// Number of set bits in n. Each iteration clears the lowest set bit; compilers typically turn this loop into a
// single popcnt instruction where the target has one.
static uint8_t PopCnt64(uint64_t n) {
	uint8_t set_bits = 0;
	while (n != 0) {
		n &= n - 1;
		++set_bits;
	}
	return set_bits;
}
671 | | |
672 | 0 | double ParquetBloomFilter::OneRatio() { |
673 | 0 | auto bloom_ptr = reinterpret_cast<uint64_t *>(data->ptr); |
674 | 0 | idx_t one_count = 0; |
675 | 0 | for (idx_t b_idx = 0; b_idx < data->len / sizeof(uint64_t); ++b_idx) { |
676 | 0 | one_count += PopCnt64(bloom_ptr[b_idx]); |
677 | 0 | } |
678 | 0 | return LossyNumericCast<double>(one_count) / (LossyNumericCast<double>(data->len) * 8.0); |
679 | 0 | } |
680 | | |
681 | 0 | ResizeableBuffer *ParquetBloomFilter::Get() { |
682 | 0 | return data.get(); |
683 | 0 | } |
684 | | |
685 | | } // namespace duckdb |