/src/duckdb/extension/parquet/parquet_reader.cpp
Line | Count | Source |
1 | | #include "parquet_reader.hpp" |
2 | | |
3 | | #include "reader/boolean_column_reader.hpp" |
4 | | #include "reader/callback_column_reader.hpp" |
5 | | #include "column_reader.hpp" |
6 | | #include "duckdb.hpp" |
7 | | #include "reader/expression_column_reader.hpp" |
8 | | #include "parquet_geometry.hpp" |
9 | | #include "reader/list_column_reader.hpp" |
10 | | #include "parquet_crypto.hpp" |
11 | | #include "parquet_file_metadata_cache.hpp" |
12 | | #include "parquet_statistics.hpp" |
13 | | #include "parquet_timestamp.hpp" |
14 | | #include "mbedtls_wrapper.hpp" |
15 | | #include "reader/row_number_column_reader.hpp" |
16 | | #include "reader/string_column_reader.hpp" |
17 | | #include "reader/variant_column_reader.hpp" |
18 | | #include "reader/struct_column_reader.hpp" |
19 | | #include "reader/templated_column_reader.hpp" |
20 | | #include "thrift_tools.hpp" |
21 | | #include "duckdb/main/config.hpp" |
22 | | #include "duckdb/common/encryption_state.hpp" |
23 | | #include "duckdb/common/file_system.hpp" |
24 | | #include "duckdb/common/helper.hpp" |
25 | | #include "duckdb/common/hive_partitioning.hpp" |
26 | | #include "duckdb/common/string_util.hpp" |
27 | | #include "duckdb/planner/table_filter.hpp" |
28 | | #include "duckdb/storage/object_cache.hpp" |
29 | | #include "duckdb/optimizer/statistics_propagator.hpp" |
30 | | #include "duckdb/planner/table_filter_state.hpp" |
31 | | #include "duckdb/common/multi_file/multi_file_reader.hpp" |
32 | | #include "duckdb/logging/log_manager.hpp" |
33 | | |
34 | | #include <cassert> |
35 | | #include <chrono> |
36 | | #include <cstring> |
37 | | #include <sstream> |
38 | | |
39 | | namespace duckdb { |
40 | | |
41 | | using duckdb_parquet::ColumnChunk; |
42 | | using duckdb_parquet::ConvertedType; |
43 | | using duckdb_parquet::FieldRepetitionType; |
44 | | using duckdb_parquet::FileCryptoMetaData; |
45 | | using duckdb_parquet::FileMetaData; |
46 | | using ParquetRowGroup = duckdb_parquet::RowGroup; |
47 | | using duckdb_parquet::SchemaElement; |
48 | | using duckdb_parquet::Statistics; |
49 | | using duckdb_parquet::Type; |
50 | | |
51 | | static unique_ptr<duckdb_apache::thrift::protocol::TProtocol> |
52 | 0 | CreateThriftFileProtocol(QueryContext context, CachingFileHandle &file_handle, bool prefetch_mode) { |
53 | 0 | auto transport = duckdb_base_std::make_shared<ThriftFileTransport>(file_handle, prefetch_mode); |
54 | 0 | return make_uniq<duckdb_apache::thrift::protocol::TCompactProtocolT<ThriftFileTransport>>(std::move(transport)); |
55 | 0 | } |
56 | | |
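// Prefetching is used for remote files by default (and for local files only when the
// 'prefetch_all_parquet_files' setting is enabled); it additionally requires a seekable file handle
// and 'disable_parquet_prefetching' to be unset.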
57 | 0 | static bool ShouldAndCanPrefetch(ClientContext &context, CachingFileHandle &file_handle) { |
58 | 0 | Value disable_prefetch = false; |
59 | 0 | Value prefetch_all_files = false; |
60 | 0 | context.TryGetCurrentSetting("disable_parquet_prefetching", disable_prefetch); |
61 | 0 | context.TryGetCurrentSetting("prefetch_all_parquet_files", prefetch_all_files); |
62 | 0 | bool should_prefetch = !file_handle.OnDiskFile() || prefetch_all_files.GetValue<bool>(); |
63 | 0 | bool can_prefetch = file_handle.CanSeek() && !disable_prefetch.GetValue<bool>(); |
64 | 0 | return should_prefetch && can_prefetch; |
65 | 0 | } |
66 | | |
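// A Parquet file ends with: <footer (footer_len bytes)> <footer_len (4 bytes, little-endian)> <4-byte magic>.
// The magic bytes are "PAR1" for a plaintext footer and "PARE" for an encrypted footer; 'buffer' is expected
// to point at these final 8 bytes of the file.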
67 | | static void ParseParquetFooter(data_ptr_t buffer, const string &file_path, idx_t file_size, |
68 | | const shared_ptr<const ParquetEncryptionConfig> &encryption_config, uint32_t &footer_len, |
69 | 0 | bool &footer_encrypted) { |
70 | 0 | if (memcmp(buffer + 4, "PAR1", 4) == 0) { |
71 | 0 | footer_encrypted = false; |
72 | 0 | if (encryption_config) { |
73 | 0 | throw InvalidInputException("File '%s' is not encrypted, but 'encryption_config' was set", file_path); |
74 | 0 | } |
75 | 0 | } else if (memcmp(buffer + 4, "PARE", 4) == 0) { |
76 | 0 | footer_encrypted = true; |
77 | 0 | if (!encryption_config) { |
78 | 0 | throw InvalidInputException("File '%s' is encrypted, but 'encryption_config' was not set", file_path); |
79 | 0 | } |
80 | 0 | } else { |
81 | 0 | throw InvalidInputException("No magic bytes found at end of file '%s'", file_path); |
82 | 0 | } |
83 | | |
84 | | // read four-byte footer length from just before the end magic bytes |
85 | 0 | footer_len = Load<uint32_t>(buffer); |
86 | 0 | if (footer_len == 0 || file_size < 12 + footer_len) { |
87 | 0 | throw InvalidInputException("Footer length error in file '%s'", file_path); |
88 | 0 | } |
89 | 0 | } |
90 | | |
91 | | static shared_ptr<ParquetFileMetadataCache> |
92 | | LoadMetadata(ClientContext &context, Allocator &allocator, CachingFileHandle &file_handle, |
93 | | const shared_ptr<const ParquetEncryptionConfig> &encryption_config, const EncryptionUtil &encryption_util, |
94 | 0 | optional_idx footer_size) { |
95 | 0 | auto file_proto = CreateThriftFileProtocol(context, file_handle, false); |
96 | 0 | auto &transport = reinterpret_cast<ThriftFileTransport &>(*file_proto->getTransport()); |
97 | 0 | auto file_size = transport.GetSize(); |
98 | 0 | if (file_size < 12) { |
99 | 0 | throw InvalidInputException("File '%s' too small to be a Parquet file", file_handle.GetPath()); |
100 | 0 | } |
101 | | |
102 | 0 | bool footer_encrypted; |
103 | 0 | uint32_t footer_len; |
104 | | // footer size is not provided - read it from the back |
105 | 0 | if (!footer_size.IsValid()) { |
106 | | // We have to do two reads here: |
107 | | // 1. The 8 bytes from the back to check if it's a Parquet file and the footer size |
108 | | // 2. The footer (after getting the size) |
109 | | // For local reads this doesn't matter much, but for remote reads this means two round trips, |
110 | | // which is especially bad for small Parquet files where the read cost is mostly round trips. |
111 | | // So, we prefetch more, to hopefully save a round trip. |
112 | 0 | static constexpr idx_t ESTIMATED_FOOTER_RATIO = 1000; // Estimate 1/1000th of the file to be footer |
113 | 0 | static constexpr idx_t MIN_PREFETCH_SIZE = 16384; // Prefetch at least this many bytes |
114 | 0 | static constexpr idx_t MAX_PREFETCH_SIZE = 262144; // Prefetch at most this many bytes |
115 | 0 | idx_t prefetch_size = 8; |
116 | 0 | if (ShouldAndCanPrefetch(context, file_handle)) { |
117 | 0 | prefetch_size = ClampValue(file_size / ESTIMATED_FOOTER_RATIO, MIN_PREFETCH_SIZE, MAX_PREFETCH_SIZE); |
118 | 0 | prefetch_size = MinValue(NextPowerOfTwo(prefetch_size), file_size); |
119 | 0 | } |
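// Illustrative example of the prefetch heuristic above: for a ~100 MiB remote file,
// 104857600 / 1000 = 104857 bytes, which stays 104857 after clamping to [16384, 262144];
// rounding up to the next power of two gives 131072, so 128 KiB are prefetched from the end of the
// file in the hope that the entire footer is covered by a single read.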
120 | |
121 | 0 | ResizeableBuffer buf; |
122 | 0 | buf.resize(allocator, 8); |
123 | 0 | buf.zero(); |
124 | |
125 | 0 | transport.Prefetch(file_size - prefetch_size, prefetch_size); |
126 | 0 | transport.SetLocation(file_size - 8); |
127 | 0 | transport.read(buf.ptr, 8); |
128 | |
129 | 0 | ParseParquetFooter(buf.ptr, file_handle.GetPath(), file_size, encryption_config, footer_len, footer_encrypted); |
130 | |
131 | 0 | auto metadata_pos = file_size - (footer_len + 8); |
132 | 0 | transport.SetLocation(metadata_pos); |
133 | 0 | if (footer_len > prefetch_size - 8) { |
134 | 0 | transport.Prefetch(metadata_pos, footer_len); |
135 | 0 | } |
136 | 0 | } else { |
137 | 0 | footer_len = UnsafeNumericCast<uint32_t>(footer_size.GetIndex()); |
138 | 0 | if (footer_len == 0 || file_size < 12 + footer_len) { |
139 | 0 | throw InvalidInputException("Invalid footer length provided for file '%s'", file_handle.GetPath()); |
140 | 0 | } |
141 | | |
142 | 0 | idx_t total_footer_len = footer_len + 8; |
143 | 0 | auto metadata_pos = file_size - total_footer_len; |
144 | 0 | transport.SetLocation(metadata_pos); |
145 | 0 | transport.Prefetch(metadata_pos, total_footer_len); |
146 | |
147 | 0 | auto read_head = transport.GetReadHead(metadata_pos); |
148 | 0 | auto data_ptr = read_head->buffer_ptr; |
149 | |
150 | 0 | uint32_t read_footer_len; |
151 | 0 | ParseParquetFooter(data_ptr + footer_len, file_handle.GetPath(), file_size, encryption_config, read_footer_len, |
152 | 0 | footer_encrypted); |
153 | 0 | if (read_footer_len != footer_len) { |
154 | 0 | throw InvalidInputException("Parquet footer length stored in file is not equal to footer length provided"); |
155 | 0 | } |
156 | 0 | } |
157 | | |
158 | 0 | auto metadata = make_uniq<FileMetaData>(); |
159 | 0 | if (footer_encrypted) { |
160 | 0 | auto crypto_metadata = make_uniq<FileCryptoMetaData>(); |
161 | 0 | crypto_metadata->read(file_proto.get()); |
162 | 0 | if (crypto_metadata->encryption_algorithm.__isset.AES_GCM_CTR_V1) { |
163 | 0 | throw InvalidInputException("File '%s' is encrypted with AES_GCM_CTR_V1, but only AES_GCM_V1 is supported", |
164 | 0 | file_handle.GetPath()); |
165 | 0 | } |
166 | 0 | ParquetCrypto::Read(*metadata, *file_proto, encryption_config->GetFooterKey(), encryption_util); |
167 | 0 | } else { |
168 | 0 | metadata->read(file_proto.get()); |
169 | 0 | } |
170 | | |
171 | | // Try to read the GeoParquet metadata (if present) |
172 | 0 | auto geo_metadata = GeoParquetFileMetadata::TryRead(*metadata, context); |
173 | 0 | return make_shared_ptr<ParquetFileMetadataCache>(std::move(metadata), file_handle, std::move(geo_metadata), |
174 | 0 | footer_len); |
175 | 0 | } |
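// The returned cache entry bundles the deserialized Thrift FileMetaData, the optional GeoParquet
// metadata and the footer length; the ParquetReader constructor later stores it in the ObjectCache
// keyed by file path (when metadata caching is enabled).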
176 | | |
177 | 0 | LogicalType ParquetReader::DeriveLogicalType(const SchemaElement &s_ele, ParquetColumnSchema &schema) const { |
178 | | // leaf node: derive the DuckDB logical type from the physical type and any logical/converted type annotations |
179 | 0 | if (s_ele.type == Type::FIXED_LEN_BYTE_ARRAY && !s_ele.__isset.type_length) { |
180 | 0 | throw IOException("FIXED_LEN_BYTE_ARRAY requires length to be set"); |
181 | 0 | } |
182 | 0 | if (s_ele.__isset.type_length) { |
183 | 0 | schema.type_length = NumericCast<uint32_t>(s_ele.type_length); |
184 | 0 | } |
185 | 0 | schema.parquet_type = s_ele.type; |
186 | 0 | if (s_ele.__isset.logicalType) { |
187 | 0 | if (s_ele.logicalType.__isset.UNKNOWN) { |
188 | 0 | return LogicalType::SQLNULL; |
189 | 0 | } else if (s_ele.logicalType.__isset.UUID) { |
190 | 0 | if (s_ele.type == Type::FIXED_LEN_BYTE_ARRAY) { |
191 | 0 | return LogicalType::UUID; |
192 | 0 | } |
193 | 0 | } else if (s_ele.logicalType.__isset.FLOAT16) { |
194 | 0 | if (s_ele.type == Type::FIXED_LEN_BYTE_ARRAY && s_ele.type_length == 2) { |
195 | 0 | schema.type_info = ParquetExtraTypeInfo::FLOAT16; |
196 | 0 | return LogicalType::FLOAT; |
197 | 0 | } |
198 | 0 | } else if (s_ele.logicalType.__isset.TIMESTAMP) { |
199 | 0 | if (s_ele.logicalType.TIMESTAMP.unit.__isset.MILLIS) { |
200 | 0 | schema.type_info = ParquetExtraTypeInfo::UNIT_MS; |
201 | 0 | } else if (s_ele.logicalType.TIMESTAMP.unit.__isset.MICROS) { |
202 | 0 | schema.type_info = ParquetExtraTypeInfo::UNIT_MICROS; |
203 | 0 | } else if (s_ele.logicalType.TIMESTAMP.unit.__isset.NANOS) { |
204 | 0 | schema.type_info = ParquetExtraTypeInfo::UNIT_NS; |
205 | 0 | } else { |
206 | 0 | throw NotImplementedException("Unimplemented TIMESTAMP encoding - missing UNIT"); |
207 | 0 | } |
208 | 0 | if (s_ele.logicalType.TIMESTAMP.isAdjustedToUTC) { |
209 | 0 | return LogicalType::TIMESTAMP_TZ; |
210 | 0 | } else if (s_ele.logicalType.TIMESTAMP.unit.__isset.NANOS) { |
211 | 0 | return LogicalType::TIMESTAMP_NS; |
212 | 0 | } |
213 | 0 | return LogicalType::TIMESTAMP; |
214 | 0 | } else if (s_ele.logicalType.__isset.TIME) { |
215 | 0 | if (s_ele.logicalType.TIME.unit.__isset.MILLIS) { |
216 | 0 | schema.type_info = ParquetExtraTypeInfo::UNIT_MS; |
217 | 0 | } else if (s_ele.logicalType.TIME.unit.__isset.MICROS) { |
218 | 0 | schema.type_info = ParquetExtraTypeInfo::UNIT_MICROS; |
219 | 0 | } else if (s_ele.logicalType.TIME.unit.__isset.NANOS) { |
220 | 0 | schema.type_info = ParquetExtraTypeInfo::UNIT_NS; |
221 | 0 | } else { |
222 | 0 | throw NotImplementedException("Unimplemented TIME encoding - missing UNIT"); |
223 | 0 | } |
224 | 0 | if (s_ele.logicalType.TIME.isAdjustedToUTC) { |
225 | 0 | return LogicalType::TIME_TZ; |
226 | 0 | } |
227 | 0 | return LogicalType::TIME; |
228 | 0 | } |
229 | 0 | } |
230 | 0 | if (s_ele.__isset.converted_type) { |
231 | | // Legacy NULL type, does no longer exist, but files are still around of course |
232 | 0 | if (static_cast<uint8_t>(s_ele.converted_type) == 24) { |
233 | 0 | return LogicalTypeId::SQLNULL; |
234 | 0 | } |
235 | 0 | switch (s_ele.converted_type) { |
236 | 0 | case ConvertedType::INT_8: |
237 | 0 | if (s_ele.type == Type::INT32) { |
238 | 0 | return LogicalType::TINYINT; |
239 | 0 | } else { |
240 | 0 | throw IOException("INT8 converted type can only be set for value of Type::INT32"); |
241 | 0 | } |
242 | 0 | case ConvertedType::INT_16: |
243 | 0 | if (s_ele.type == Type::INT32) { |
244 | 0 | return LogicalType::SMALLINT; |
245 | 0 | } else { |
246 | 0 | throw IOException("INT16 converted type can only be set for value of Type::INT32"); |
247 | 0 | } |
248 | 0 | case ConvertedType::INT_32: |
249 | 0 | if (s_ele.type == Type::INT32) { |
250 | 0 | return LogicalType::INTEGER; |
251 | 0 | } else { |
252 | 0 | throw IOException("INT32 converted type can only be set for value of Type::INT32"); |
253 | 0 | } |
254 | 0 | case ConvertedType::INT_64: |
255 | 0 | if (s_ele.type == Type::INT64) { |
256 | 0 | return LogicalType::BIGINT; |
257 | 0 | } else { |
258 | 0 | throw IOException("INT64 converted type can only be set for value of Type::INT32"); |
259 | 0 | } |
260 | 0 | case ConvertedType::UINT_8: |
261 | 0 | if (s_ele.type == Type::INT32) { |
262 | 0 | return LogicalType::UTINYINT; |
263 | 0 | } else { |
264 | 0 | throw IOException("UINT8 converted type can only be set for value of Type::INT32"); |
265 | 0 | } |
266 | 0 | case ConvertedType::UINT_16: |
267 | 0 | if (s_ele.type == Type::INT32) { |
268 | 0 | return LogicalType::USMALLINT; |
269 | 0 | } else { |
270 | 0 | throw IOException("UINT16 converted type can only be set for value of Type::INT32"); |
271 | 0 | } |
272 | 0 | case ConvertedType::UINT_32: |
273 | 0 | if (s_ele.type == Type::INT32) { |
274 | 0 | return LogicalType::UINTEGER; |
275 | 0 | } else { |
276 | 0 | throw IOException("UINT32 converted type can only be set for value of Type::INT32"); |
277 | 0 | } |
278 | 0 | case ConvertedType::UINT_64: |
279 | 0 | if (s_ele.type == Type::INT64) { |
280 | 0 | return LogicalType::UBIGINT; |
281 | 0 | } else { |
282 | 0 | throw IOException("UINT64 converted type can only be set for value of Type::INT64"); |
283 | 0 | } |
284 | 0 | case ConvertedType::DATE: |
285 | 0 | if (s_ele.type == Type::INT32) { |
286 | 0 | return LogicalType::DATE; |
287 | 0 | } else { |
288 | 0 | throw IOException("DATE converted type can only be set for value of Type::INT32"); |
289 | 0 | } |
290 | 0 | case ConvertedType::TIMESTAMP_MICROS: |
291 | 0 | schema.type_info = ParquetExtraTypeInfo::UNIT_MICROS; |
292 | 0 | if (s_ele.type == Type::INT64) { |
293 | 0 | return LogicalType::TIMESTAMP; |
294 | 0 | } else { |
295 | 0 | throw IOException("TIMESTAMP converted type can only be set for value of Type::INT64"); |
296 | 0 | } |
297 | 0 | case ConvertedType::TIMESTAMP_MILLIS: |
298 | 0 | schema.type_info = ParquetExtraTypeInfo::UNIT_MS; |
299 | 0 | if (s_ele.type == Type::INT64) { |
300 | 0 | return LogicalType::TIMESTAMP; |
301 | 0 | } else { |
302 | 0 | throw IOException("TIMESTAMP converted type can only be set for value of Type::INT64"); |
303 | 0 | } |
304 | 0 | case ConvertedType::DECIMAL: |
305 | 0 | if (!s_ele.__isset.precision || !s_ele.__isset.scale) { |
306 | 0 | throw IOException("DECIMAL requires a precision and scale specifier!"); |
307 | 0 | } |
308 | 0 | schema.type_scale = NumericCast<uint32_t>(s_ele.scale); |
309 | 0 | if (s_ele.precision > DecimalType::MaxWidth()) { |
310 | 0 | schema.type_info = ParquetExtraTypeInfo::DECIMAL_BYTE_ARRAY; |
311 | 0 | return LogicalType::DOUBLE; |
312 | 0 | } |
313 | 0 | switch (s_ele.type) { |
314 | 0 | case Type::BYTE_ARRAY: |
315 | 0 | case Type::FIXED_LEN_BYTE_ARRAY: |
316 | 0 | schema.type_info = ParquetExtraTypeInfo::DECIMAL_BYTE_ARRAY; |
317 | 0 | break; |
318 | 0 | case Type::INT32: |
319 | 0 | schema.type_info = ParquetExtraTypeInfo::DECIMAL_INT32; |
320 | 0 | break; |
321 | 0 | case Type::INT64: |
322 | 0 | schema.type_info = ParquetExtraTypeInfo::DECIMAL_INT64; |
323 | 0 | break; |
324 | 0 | default: |
325 | 0 | throw IOException( |
326 | 0 | "DECIMAL converted type can only be set for value of Type::(FIXED_LEN_)BYTE_ARRAY/INT32/INT64"); |
327 | 0 | } |
328 | 0 | return LogicalType::DECIMAL(s_ele.precision, s_ele.scale); |
329 | 0 | case ConvertedType::UTF8: |
330 | 0 | case ConvertedType::ENUM: |
331 | 0 | switch (s_ele.type) { |
332 | 0 | case Type::BYTE_ARRAY: |
333 | 0 | case Type::FIXED_LEN_BYTE_ARRAY: |
334 | 0 | return LogicalType::VARCHAR; |
335 | 0 | default: |
336 | 0 | throw IOException("UTF8 converted type can only be set for Type::(FIXED_LEN_)BYTE_ARRAY"); |
337 | 0 | } |
338 | 0 | case ConvertedType::TIME_MILLIS: |
339 | 0 | schema.type_info = ParquetExtraTypeInfo::UNIT_MS; |
340 | 0 | if (s_ele.type == Type::INT32) { |
341 | 0 | return LogicalType::TIME; |
342 | 0 | } else { |
343 | 0 | throw IOException("TIME_MILLIS converted type can only be set for value of Type::INT32"); |
344 | 0 | } |
345 | 0 | case ConvertedType::TIME_MICROS: |
346 | 0 | schema.type_info = ParquetExtraTypeInfo::UNIT_MICROS; |
347 | 0 | if (s_ele.type == Type::INT64) { |
348 | 0 | return LogicalType::TIME; |
349 | 0 | } else { |
350 | 0 | throw IOException("TIME_MICROS converted type can only be set for value of Type::INT64"); |
351 | 0 | } |
352 | 0 | case ConvertedType::INTERVAL: |
353 | 0 | return LogicalType::INTERVAL; |
354 | 0 | case ConvertedType::JSON: |
355 | 0 | return LogicalType::JSON(); |
356 | 0 | case ConvertedType::MAP: |
357 | 0 | case ConvertedType::MAP_KEY_VALUE: |
358 | 0 | case ConvertedType::LIST: |
359 | 0 | case ConvertedType::BSON: |
360 | 0 | default: |
361 | 0 | throw IOException("Unsupported converted type (%d)", (int32_t)s_ele.converted_type); |
362 | 0 | } |
363 | 0 | } else { |
364 | | // no converted type set |
365 | | // use default type for each physical type |
366 | 0 | switch (s_ele.type) { |
367 | 0 | case Type::BOOLEAN: |
368 | 0 | return LogicalType::BOOLEAN; |
369 | 0 | case Type::INT32: |
370 | 0 | return LogicalType::INTEGER; |
371 | 0 | case Type::INT64: |
372 | 0 | return LogicalType::BIGINT; |
373 | 0 | case Type::INT96: // always a timestamp it would seem |
374 | 0 | schema.type_info = ParquetExtraTypeInfo::IMPALA_TIMESTAMP; |
375 | 0 | return LogicalType::TIMESTAMP; |
376 | 0 | case Type::FLOAT: |
377 | 0 | return LogicalType::FLOAT; |
378 | 0 | case Type::DOUBLE: |
379 | 0 | return LogicalType::DOUBLE; |
380 | 0 | case Type::BYTE_ARRAY: |
381 | 0 | case Type::FIXED_LEN_BYTE_ARRAY: |
382 | 0 | if (parquet_options.binary_as_string) { |
383 | 0 | return LogicalType::VARCHAR; |
384 | 0 | } |
385 | 0 | return LogicalType::BLOB; |
386 | 0 | default: |
387 | 0 | return LogicalType::INVALID; |
388 | 0 | } |
389 | 0 | } |
390 | 0 | } |
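// A few illustrative mappings implemented above (not exhaustive):
//   INT32 + ConvertedType::DATE                         -> DATE
//   INT64 + logicalType TIMESTAMP(isAdjustedToUTC=true) -> TIMESTAMP_TZ
//   FIXED_LEN_BYTE_ARRAY + logicalType UUID             -> UUID
//   BYTE_ARRAY with no annotation                       -> BLOB (or VARCHAR with 'binary_as_string')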
391 | | |
392 | | ParquetColumnSchema ParquetReader::ParseColumnSchema(const SchemaElement &s_ele, idx_t max_define, idx_t max_repeat, |
393 | | idx_t schema_index, idx_t column_index, |
394 | 0 | ParquetColumnSchemaType type) { |
395 | 0 | ParquetColumnSchema schema(max_define, max_repeat, schema_index, column_index, type); |
396 | 0 | schema.name = s_ele.name; |
397 | 0 | schema.type = DeriveLogicalType(s_ele, schema); |
398 | 0 | return schema; |
399 | 0 | } |
400 | | |
401 | | unique_ptr<ColumnReader> ParquetReader::CreateReaderRecursive(ClientContext &context, |
402 | | const vector<ColumnIndex> &indexes, |
403 | 0 | const ParquetColumnSchema &schema) { |
404 | 0 | switch (schema.schema_type) { |
405 | 0 | case ParquetColumnSchemaType::FILE_ROW_NUMBER: |
406 | 0 | return make_uniq<RowNumberColumnReader>(*this, schema); |
407 | 0 | case ParquetColumnSchemaType::GEOMETRY: { |
408 | 0 | return GeometryColumnReader::Create(*this, schema, context); |
409 | 0 | } |
410 | 0 | case ParquetColumnSchemaType::COLUMN: { |
411 | 0 | if (schema.children.empty()) { |
412 | | // leaf reader |
413 | 0 | return ColumnReader::CreateReader(*this, schema); |
414 | 0 | } |
415 | 0 | vector<unique_ptr<ColumnReader>> children; |
416 | 0 | children.resize(schema.children.size()); |
417 | 0 | if (indexes.empty()) { |
418 | 0 | for (idx_t child_index = 0; child_index < schema.children.size(); child_index++) { |
419 | 0 | children[child_index] = CreateReaderRecursive(context, indexes, schema.children[child_index]); |
420 | 0 | } |
421 | 0 | } else { |
422 | 0 | for (idx_t i = 0; i < indexes.size(); i++) { |
423 | 0 | auto child_index = indexes[i].GetPrimaryIndex(); |
424 | 0 | children[child_index] = |
425 | 0 | CreateReaderRecursive(context, indexes[i].GetChildIndexes(), schema.children[child_index]); |
426 | 0 | } |
427 | 0 | } |
428 | 0 | switch (schema.type.id()) { |
429 | 0 | case LogicalTypeId::LIST: |
430 | 0 | case LogicalTypeId::MAP: |
431 | 0 | D_ASSERT(children.size() == 1); |
432 | 0 | return make_uniq<ListColumnReader>(*this, schema, std::move(children[0])); |
433 | 0 | case LogicalTypeId::STRUCT: |
434 | 0 | return make_uniq<StructColumnReader>(*this, schema, std::move(children)); |
435 | 0 | default: |
436 | 0 | throw InternalException("Unsupported schema type for schema with children"); |
437 | 0 | } |
438 | 0 | } |
439 | 0 | case ParquetColumnSchemaType::VARIANT: { |
440 | 0 | if (schema.children.size() < 2) { |
441 | 0 | throw InternalException("VARIANT schema type used for a non-variant type column"); |
442 | 0 | } |
443 | 0 | vector<unique_ptr<ColumnReader>> children; |
444 | 0 | children.resize(schema.children.size()); |
445 | 0 | for (idx_t child_index = 0; child_index < schema.children.size(); child_index++) { |
446 | 0 | children[child_index] = CreateReaderRecursive(context, indexes, schema.children[child_index]); |
447 | 0 | } |
448 | 0 | return make_uniq<VariantColumnReader>(context, *this, schema, std::move(children)); |
449 | 0 | } |
450 | 0 | default: |
451 | 0 | throw InternalException("Unsupported ParquetColumnSchemaType"); |
452 | 0 | } |
453 | 0 | } |
454 | | |
455 | 0 | unique_ptr<ColumnReader> ParquetReader::CreateReader(ClientContext &context) { |
456 | 0 | auto ret = CreateReaderRecursive(context, column_indexes, *root_schema); |
457 | 0 | if (ret->Type().id() != LogicalTypeId::STRUCT) { |
458 | 0 | throw InternalException("Root element of Parquet file must be a struct"); |
459 | 0 | } |
460 | | // add expressions if required |
461 | 0 | auto &root_struct_reader = ret->Cast<StructColumnReader>(); |
462 | 0 | for (auto &entry : expression_map) { |
463 | 0 | auto column_id = entry.first; |
464 | 0 | auto &expression = entry.second; |
465 | 0 | auto child_reader = std::move(root_struct_reader.child_readers[column_id]); |
466 | 0 | auto expr_schema = make_uniq<ParquetColumnSchema>(child_reader->Schema(), expression->return_type, |
467 | 0 | ParquetColumnSchemaType::EXPRESSION); |
468 | 0 | auto expr_reader = make_uniq<ExpressionColumnReader>(context, std::move(child_reader), expression->Copy(), |
469 | 0 | std::move(expr_schema)); |
470 | 0 | root_struct_reader.child_readers[column_id] = std::move(expr_reader); |
471 | 0 | } |
472 | 0 | return ret; |
473 | 0 | } |
474 | | |
475 | | ParquetColumnSchema::ParquetColumnSchema(idx_t max_define, idx_t max_repeat, idx_t schema_index, idx_t column_index, |
476 | | ParquetColumnSchemaType schema_type) |
477 | 0 | : ParquetColumnSchema(string(), LogicalTypeId::INVALID, max_define, max_repeat, schema_index, column_index, |
478 | 0 | schema_type) { |
479 | 0 | } |
480 | | |
481 | | ParquetColumnSchema::ParquetColumnSchema(string name_p, LogicalType type_p, idx_t max_define, idx_t max_repeat, |
482 | | idx_t schema_index, idx_t column_index, ParquetColumnSchemaType schema_type) |
483 | 0 | : schema_type(schema_type), name(std::move(name_p)), type(std::move(type_p)), max_define(max_define), |
484 | 0 | max_repeat(max_repeat), schema_index(schema_index), column_index(column_index) { |
485 | 0 | } |
486 | | |
487 | | ParquetColumnSchema::ParquetColumnSchema(ParquetColumnSchema child, LogicalType result_type, |
488 | | ParquetColumnSchemaType schema_type) |
489 | 0 | : schema_type(schema_type), name(child.name), type(std::move(result_type)), max_define(child.max_define), |
490 | 0 | max_repeat(child.max_repeat), schema_index(child.schema_index), column_index(child.column_index) { |
491 | 0 | children.push_back(std::move(child)); |
492 | 0 | } |
493 | | |
494 | | unique_ptr<BaseStatistics> ParquetColumnSchema::Stats(const FileMetaData &file_meta_data, |
495 | | const ParquetOptions &parquet_options, idx_t row_group_idx_p, |
496 | 0 | const vector<ColumnChunk> &columns) const { |
497 | 0 | if (schema_type == ParquetColumnSchemaType::EXPRESSION) { |
498 | 0 | return nullptr; |
499 | 0 | } |
500 | 0 | if (schema_type == ParquetColumnSchemaType::FILE_ROW_NUMBER) { |
501 | 0 | auto stats = NumericStats::CreateUnknown(type); |
502 | 0 | auto &row_groups = file_meta_data.row_groups; |
503 | 0 | D_ASSERT(row_group_idx_p < row_groups.size()); |
504 | 0 | idx_t row_group_offset_min = 0; |
505 | 0 | for (idx_t i = 0; i < row_group_idx_p; i++) { |
506 | 0 | row_group_offset_min += row_groups[i].num_rows; |
507 | 0 | } |
508 | |
509 | 0 | NumericStats::SetMin(stats, Value::BIGINT(UnsafeNumericCast<int64_t>(row_group_offset_min))); |
510 | 0 | NumericStats::SetMax(stats, Value::BIGINT(UnsafeNumericCast<int64_t>(row_group_offset_min + |
511 | 0 | row_groups[row_group_idx_p].num_rows))); |
512 | 0 | stats.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); |
513 | 0 | return stats.ToUnique(); |
514 | 0 | } |
515 | 0 | return ParquetStatisticsUtils::TransformColumnStatistics(*this, columns, parquet_options.can_have_nan); |
516 | 0 | } |
517 | | |
518 | 0 | static bool IsGeometryType(const SchemaElement &s_ele, const ParquetFileMetadataCache &metadata, idx_t depth) { |
519 | 0 | const auto is_blob = s_ele.__isset.type && s_ele.type == Type::BYTE_ARRAY; |
520 | 0 | if (!is_blob) { |
521 | 0 | return false; |
522 | 0 | } |
523 | | |
524 | | // TODO: Handle CRS in the future |
525 | 0 | const auto is_native_geom = s_ele.__isset.logicalType && s_ele.logicalType.__isset.GEOMETRY; |
526 | 0 | const auto is_native_geog = s_ele.__isset.logicalType && s_ele.logicalType.__isset.GEOGRAPHY; |
527 | 0 | if (is_native_geom || is_native_geog) { |
528 | 0 | return true; |
529 | 0 | } |
530 | | |
531 | | // GeoParquet geometry columns have to be at the root of the schema, and have to be present in the kv metadata. |
532 | 0 | const auto is_at_root = depth == 1; |
533 | 0 | const auto is_in_gpq_metadata = metadata.geo_metadata && metadata.geo_metadata->IsGeometryColumn(s_ele.name); |
534 | 0 | const auto is_leaf = s_ele.num_children == 0; |
535 | 0 | const auto is_geoparquet_geom = is_at_root && is_in_gpq_metadata && is_leaf; |
536 | |
537 | 0 | if (is_geoparquet_geom) { |
538 | 0 | return true; |
539 | 0 | } |
540 | | |
541 | 0 | return false; |
542 | 0 | } |
543 | | |
544 | | ParquetColumnSchema ParquetReader::ParseSchemaRecursive(idx_t depth, idx_t max_define, idx_t max_repeat, |
545 | | idx_t &next_schema_idx, idx_t &next_file_idx, |
546 | 0 | ClientContext &context) { |
547 | 0 | auto file_meta_data = GetFileMetadata(); |
548 | 0 | D_ASSERT(file_meta_data); |
549 | 0 | if (next_schema_idx >= file_meta_data->schema.size()) { |
550 | 0 | throw InvalidInputException("Malformed Parquet schema in file \"%s\": invalid schema index %d", file.path, |
551 | 0 | next_schema_idx); |
552 | 0 | } |
553 | 0 | auto &s_ele = file_meta_data->schema[next_schema_idx]; |
554 | 0 | auto this_idx = next_schema_idx; |
555 | |
556 | 0 | auto repetition_type = FieldRepetitionType::REQUIRED; |
557 | 0 | if (s_ele.__isset.repetition_type && this_idx > 0) { |
558 | 0 | repetition_type = s_ele.repetition_type; |
559 | 0 | } |
560 | 0 | if (repetition_type != FieldRepetitionType::REQUIRED) { |
561 | 0 | max_define++; |
562 | 0 | } |
563 | 0 | if (repetition_type == FieldRepetitionType::REPEATED) { |
564 | 0 | max_repeat++; |
565 | 0 | } |
566 | | |
567 | | // Check for geometry type |
568 | 0 | if (IsGeometryType(s_ele, *metadata, depth)) { |
569 | | // Geometries in both GeoParquet and native Parquet are stored as a WKB-encoded BLOB. |
570 | | // Because we don't just want to validate that the WKB encoding is correct, but also transform it into |
571 | | // little-endian if necessary, we can't just make use of the StringColumnReader without heavily modifying it. |
572 | | // Therefore, we create a dedicated GEOMETRY Parquet column schema type, which wraps the underlying BLOB column. |
573 | | // This schema type gets instantiated as an ExpressionColumnReader on top of the standard Blob/String reader, |
574 | | // which performs the WKB validation/transformation using the `ST_GeomFromWKB` function of DuckDB. |
575 | | // This also enables us to support other geometry encodings (such as GeoArrow geometries) more easily in the future. |
576 | | |
577 | | // Inner BLOB schema |
578 | 0 | ParquetColumnSchema blob_schema(max_define, max_repeat, this_idx, next_file_idx++, |
579 | 0 | ParquetColumnSchemaType::COLUMN); |
580 | 0 | blob_schema.name = s_ele.name; |
581 | 0 | blob_schema.type = LogicalType::BLOB; |
582 | | |
583 | | // Wrap in geometry schema |
584 | 0 | ParquetColumnSchema geom_schema(std::move(blob_schema), LogicalType::GEOMETRY(), |
585 | 0 | ParquetColumnSchemaType::GEOMETRY); |
586 | 0 | return geom_schema; |
587 | 0 | } |
588 | | |
589 | 0 | if (s_ele.__isset.num_children && s_ele.num_children > 0) { // inner node |
590 | 0 | vector<ParquetColumnSchema> child_schemas; |
591 | |
592 | 0 | idx_t c_idx = 0; |
593 | 0 | while (c_idx < NumericCast<idx_t>(s_ele.num_children)) { |
594 | 0 | next_schema_idx++; |
595 | |
596 | 0 | auto child_schema = |
597 | 0 | ParseSchemaRecursive(depth + 1, max_define, max_repeat, next_schema_idx, next_file_idx, context); |
598 | 0 | child_schemas.push_back(std::move(child_schema)); |
599 | 0 | c_idx++; |
600 | 0 | } |
601 | | // rename child type entries if there are case-insensitive duplicates by appending _1, _2 etc. |
602 | | // behavior consistent with the CSV reader, fwiw |
603 | 0 | case_insensitive_map_t<idx_t> name_collision_count; |
604 | 0 | for (auto &child_schema : child_schemas) { |
605 | 0 | auto &col_name = child_schema.name; |
606 | | // avoid duplicate header names |
607 | 0 | while (name_collision_count.find(col_name) != name_collision_count.end()) { |
608 | 0 | name_collision_count[col_name] += 1; |
609 | 0 | col_name = col_name + "_" + to_string(name_collision_count[col_name]); |
610 | 0 | } |
611 | 0 | child_schema.name = col_name; |
612 | 0 | name_collision_count[col_name] = 0; |
613 | 0 | } |
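// Illustrative example of the renaming above: sibling columns named "a", "A", "a" come out as
// "a", "A_1", "a_2", since collisions are detected case-insensitively.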
614 | |
615 | 0 | bool is_repeated = repetition_type == FieldRepetitionType::REPEATED; |
616 | 0 | const bool is_list = s_ele.__isset.converted_type && s_ele.converted_type == ConvertedType::LIST; |
617 | 0 | const bool is_map = s_ele.__isset.converted_type && s_ele.converted_type == ConvertedType::MAP; |
618 | 0 | bool is_map_kv = s_ele.__isset.converted_type && s_ele.converted_type == ConvertedType::MAP_KEY_VALUE; |
619 | 0 | bool is_variant = s_ele.__isset.logicalType && s_ele.logicalType.__isset.VARIANT == true; |
620 | |
621 | 0 | if (!is_map_kv && this_idx > 0) { |
622 | | // check if the parent node of this is a map |
623 | 0 | auto &p_ele = file_meta_data->schema[this_idx - 1]; |
624 | 0 | bool parent_is_map = p_ele.__isset.converted_type && p_ele.converted_type == ConvertedType::MAP; |
625 | 0 | bool parent_has_children = p_ele.__isset.num_children && p_ele.num_children == 1; |
626 | 0 | is_map_kv = parent_is_map && parent_has_children; |
627 | 0 | } |
628 | |
629 | 0 | if (is_map_kv) { |
630 | 0 | if (child_schemas.size() != 2) { |
631 | 0 | throw IOException("MAP_KEY_VALUE requires two children"); |
632 | 0 | } |
633 | 0 | if (!is_repeated) { |
634 | 0 | throw IOException("MAP_KEY_VALUE needs to be repeated"); |
635 | 0 | } |
636 | 0 | auto result_type = LogicalType::MAP(child_schemas[0].type, child_schemas[1].type); |
637 | 0 | ParquetColumnSchema struct_schema(s_ele.name, ListType::GetChildType(result_type), max_define - 1, |
638 | 0 | max_repeat - 1, this_idx, next_file_idx); |
639 | 0 | struct_schema.children = std::move(child_schemas); |
640 | |
641 | 0 | ParquetColumnSchema map_schema(s_ele.name, std::move(result_type), max_define, max_repeat, this_idx, |
642 | 0 | next_file_idx); |
643 | 0 | map_schema.children.push_back(std::move(struct_schema)); |
644 | 0 | return map_schema; |
645 | 0 | } |
646 | 0 | ParquetColumnSchema result; |
647 | 0 | if (child_schemas.size() > 1 || (!is_list && !is_map && !is_repeated)) { |
648 | 0 | child_list_t<LogicalType> struct_types; |
649 | 0 | for (auto &child_schema : child_schemas) { |
650 | 0 | struct_types.emplace_back(make_pair(child_schema.name, child_schema.type)); |
651 | 0 | } |
652 | |
653 | 0 | LogicalType result_type; |
654 | 0 | if (is_variant) { |
655 | 0 | result_type = LogicalType::VARIANT(); |
656 | 0 | } else { |
657 | 0 | result_type = LogicalType::STRUCT(std::move(struct_types)); |
658 | 0 | } |
659 | 0 | ParquetColumnSchema struct_schema(s_ele.name, std::move(result_type), max_define, max_repeat, this_idx, |
660 | 0 | next_file_idx); |
661 | 0 | struct_schema.children = std::move(child_schemas); |
662 | 0 | if (is_variant) { |
663 | 0 | struct_schema.schema_type = ParquetColumnSchemaType::VARIANT; |
664 | 0 | } |
665 | 0 | result = std::move(struct_schema); |
666 | 0 | } else { |
667 | | // if we have a struct with only a single type, pull up |
668 | 0 | result = std::move(child_schemas[0]); |
669 | 0 | result.name = s_ele.name; |
670 | 0 | } |
671 | 0 | if (is_repeated) { |
672 | 0 | auto list_type = LogicalType::LIST(result.type); |
673 | 0 | ParquetColumnSchema list_schema(s_ele.name, std::move(list_type), max_define, max_repeat, this_idx, |
674 | 0 | next_file_idx); |
675 | 0 | list_schema.children.push_back(std::move(result)); |
676 | 0 | result = std::move(list_schema); |
677 | 0 | } |
678 | 0 | result.parent_schema_index = this_idx; |
679 | 0 | return result; |
680 | 0 | } else { // leaf node |
681 | 0 | if (!s_ele.__isset.type) { |
682 | 0 | throw InvalidInputException( |
683 | 0 | "Node '%s' has neither num_children nor type set - this violates the Parquet spec (corrupted file)", |
684 | 0 | s_ele.name.c_str()); |
685 | 0 | } |
686 | 0 | auto result = ParseColumnSchema(s_ele, max_define, max_repeat, this_idx, next_file_idx++); |
687 | 0 | if (s_ele.repetition_type == FieldRepetitionType::REPEATED) { |
688 | 0 | auto list_type = LogicalType::LIST(result.type); |
689 | 0 | ParquetColumnSchema list_schema(s_ele.name, std::move(list_type), max_define, max_repeat, this_idx, |
690 | 0 | next_file_idx); |
691 | 0 | list_schema.children.push_back(std::move(result)); |
692 | 0 | return list_schema; |
693 | 0 | } |
694 | | |
695 | 0 | return result; |
696 | 0 | } |
697 | 0 | } |
698 | | |
699 | 0 | static ParquetColumnSchema FileRowNumberSchema() { |
700 | 0 | return ParquetColumnSchema("file_row_number", LogicalType::BIGINT, 0, 0, 0, 0, |
701 | 0 | ParquetColumnSchemaType::FILE_ROW_NUMBER); |
702 | 0 | } |
703 | | |
704 | 0 | unique_ptr<ParquetColumnSchema> ParquetReader::ParseSchema(ClientContext &context) { |
705 | 0 | auto file_meta_data = GetFileMetadata(); |
706 | 0 | idx_t next_schema_idx = 0; |
707 | 0 | idx_t next_file_idx = 0; |
708 | |
709 | 0 | if (file_meta_data->schema.empty()) { |
710 | 0 | throw IOException("Failed to read Parquet file \"%s\": no schema elements found", file.path); |
711 | 0 | } |
712 | 0 | if (file_meta_data->schema[0].num_children == 0) { |
713 | 0 | throw IOException("Failed to read Parquet file \"%s\": root schema element has no children", file.path); |
714 | 0 | } |
715 | 0 | auto root = ParseSchemaRecursive(0, 0, 0, next_schema_idx, next_file_idx, context); |
716 | 0 | if (root.type.id() != LogicalTypeId::STRUCT) { |
717 | 0 | throw InvalidInputException("Failed to read Parquet file \"%s\": Root element of Parquet file must be a struct", |
718 | 0 | file.path); |
719 | 0 | } |
720 | 0 | D_ASSERT(next_schema_idx == file_meta_data->schema.size() - 1); |
721 | 0 | if (!file_meta_data->row_groups.empty() && next_file_idx != file_meta_data->row_groups[0].columns.size()) { |
722 | 0 | throw InvalidInputException("Failed to read Parquet file \"%s\": row group does not have enough columns", |
723 | 0 | file.path); |
724 | 0 | } |
725 | 0 | if (parquet_options.file_row_number) { |
726 | 0 | for (auto &column : root.children) { |
727 | 0 | auto &name = column.name; |
728 | 0 | if (StringUtil::CIEquals(name, "file_row_number")) { |
729 | 0 | throw BinderException("Failed to read Parquet file \"%s\": Using file_row_number option on file with " |
730 | 0 | "column named file_row_number is not supported", |
731 | 0 | file.path); |
732 | 0 | } |
733 | 0 | } |
734 | 0 | root.children.push_back(FileRowNumberSchema()); |
735 | 0 | } |
736 | 0 | return make_uniq<ParquetColumnSchema>(root); |
737 | 0 | } |
738 | | |
739 | | MultiFileColumnDefinition ParquetReader::ParseColumnDefinition(const FileMetaData &file_meta_data, |
740 | 0 | ParquetColumnSchema &element) { |
741 | 0 | MultiFileColumnDefinition result(element.name, element.type); |
742 | 0 | if (element.schema_type == ParquetColumnSchemaType::FILE_ROW_NUMBER) { |
743 | 0 | result.identifier = Value::INTEGER(MultiFileReader::ORDINAL_FIELD_ID); |
744 | 0 | return result; |
745 | 0 | } |
746 | 0 | auto &column_schema = file_meta_data.schema[element.schema_index]; |
747 | |
748 | 0 | if (column_schema.__isset.field_id) { |
749 | 0 | result.identifier = Value::INTEGER(column_schema.field_id); |
750 | 0 | } else if (element.parent_schema_index.IsValid()) { |
751 | 0 | auto &parent_column_schema = file_meta_data.schema[element.parent_schema_index.GetIndex()]; |
752 | 0 | if (parent_column_schema.__isset.field_id) { |
753 | 0 | result.identifier = Value::INTEGER(parent_column_schema.field_id); |
754 | 0 | } |
755 | 0 | } |
756 | 0 | for (auto &child : element.children) { |
757 | 0 | result.children.push_back(ParseColumnDefinition(file_meta_data, child)); |
758 | 0 | } |
759 | 0 | return result; |
760 | 0 | } |
761 | | |
762 | 0 | void ParquetReader::InitializeSchema(ClientContext &context) { |
763 | 0 | auto file_meta_data = GetFileMetadata(); |
764 | |
765 | 0 | if (file_meta_data->__isset.encryption_algorithm) { |
766 | 0 | if (file_meta_data->encryption_algorithm.__isset.AES_GCM_CTR_V1) { |
767 | 0 | throw InvalidInputException("File '%s' is encrypted with AES_GCM_CTR_V1, but only AES_GCM_V1 is supported", |
768 | 0 | GetFileName()); |
769 | 0 | } |
770 | 0 | } |
771 | | // check if we like this schema |
772 | 0 | if (file_meta_data->schema.size() < 2) { |
773 | 0 | throw InvalidInputException("Failed to read Parquet file '%s': Need at least one non-root column in the file", |
774 | 0 | GetFileName()); |
775 | 0 | } |
776 | 0 | root_schema = ParseSchema(context); |
777 | 0 | for (idx_t i = 0; i < root_schema->children.size(); i++) { |
778 | 0 | auto &element = root_schema->children[i]; |
779 | 0 | columns.push_back(ParseColumnDefinition(*file_meta_data, element)); |
780 | 0 | } |
781 | 0 | } |
782 | | |
783 | 0 | void ParquetReader::AddVirtualColumn(column_t virtual_column_id) { |
784 | 0 | if (virtual_column_id == MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER) { |
785 | 0 | root_schema->children.push_back(FileRowNumberSchema()); |
786 | 0 | } else { |
787 | 0 | throw InternalException("Unsupported virtual column id %d for parquet reader", virtual_column_id); |
788 | 0 | } |
789 | 0 | } |
790 | | |
791 | 0 | ParquetOptions::ParquetOptions(ClientContext &context) { |
792 | 0 | Value lookup_value; |
793 | 0 | if (context.TryGetCurrentSetting("binary_as_string", lookup_value)) { |
794 | 0 | binary_as_string = lookup_value.GetValue<bool>(); |
795 | 0 | } |
796 | 0 | } |
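// Usage sketch (illustrative): with the corresponding setting enabled, e.g.
//   SET binary_as_string = true;
// BYTE_ARRAY/FIXED_LEN_BYTE_ARRAY columns without a type annotation are read as VARCHAR instead of
// BLOB (see DeriveLogicalType above).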
797 | | |
798 | 0 | ParquetColumnDefinition ParquetColumnDefinition::FromSchemaValue(ClientContext &context, const Value &column_value) { |
799 | 0 | ParquetColumnDefinition result; |
800 | 0 | auto &identifier = StructValue::GetChildren(column_value)[0]; |
801 | 0 | result.identifier = identifier; |
802 | |
803 | 0 | const auto &column_def = StructValue::GetChildren(column_value)[1]; |
804 | 0 | D_ASSERT(column_def.type().id() == LogicalTypeId::STRUCT); |
805 | |
806 | 0 | const auto children = StructValue::GetChildren(column_def); |
807 | 0 | result.name = StringValue::Get(children[0]); |
808 | 0 | result.type = TransformStringToLogicalType(StringValue::Get(children[1])); |
809 | 0 | string error_message; |
810 | 0 | if (!children[2].TryCastAs(context, result.type, result.default_value, &error_message)) { |
811 | 0 | throw BinderException("Unable to cast Parquet schema default_value \"%s\" to %s", children[2].ToString(), |
812 | 0 | result.type.ToString()); |
813 | 0 | } |
814 | | |
815 | 0 | return result; |
816 | 0 | } |
817 | | |
818 | | ParquetReader::ParquetReader(ClientContext &context_p, OpenFileInfo file_p, ParquetOptions parquet_options_p, |
819 | | shared_ptr<ParquetFileMetadataCache> metadata_p) |
820 | 0 | : BaseFileReader(std::move(file_p)), fs(CachingFileSystem::Get(context_p)), |
821 | 0 | allocator(BufferAllocator::Get(context_p)), parquet_options(std::move(parquet_options_p)) { |
822 | 0 | file_handle = fs.OpenFile(context_p, file, FileFlags::FILE_FLAGS_READ); |
823 | 0 | if (!file_handle->CanSeek()) { |
824 | 0 | throw NotImplementedException( |
825 | 0 | "Reading parquet files from a FIFO stream is not supported and cannot be efficiently supported since " |
826 | 0 | "metadata is located at the end of the file. Write the stream to disk first and read from there instead."); |
827 | 0 | } |
828 | | |
829 | | // read the extended file open info (if any) |
830 | 0 | optional_idx footer_size; |
831 | 0 | if (file.extended_info) { |
832 | 0 | auto &open_options = file.extended_info->options; |
833 | 0 | auto encryption_entry = file.extended_info->options.find("encryption_key"); |
834 | 0 | if (encryption_entry != open_options.end()) { |
835 | 0 | parquet_options.encryption_config = |
836 | 0 | make_shared_ptr<ParquetEncryptionConfig>(StringValue::Get(encryption_entry->second)); |
837 | 0 | } |
838 | 0 | auto footer_entry = file.extended_info->options.find("footer_size"); |
839 | 0 | if (footer_entry != open_options.end()) { |
840 | 0 | footer_size = UBigIntValue::Get(footer_entry->second); |
841 | 0 | } |
842 | 0 | } |
843 | | // set pointer to factory method for AES state |
844 | 0 | auto &config = DBConfig::GetConfig(context_p); |
845 | 0 | if (config.encryption_util && parquet_options.debug_use_openssl) { |
846 | 0 | encryption_util = config.encryption_util; |
847 | 0 | } else { |
848 | 0 | encryption_util = make_shared_ptr<duckdb_mbedtls::MbedTlsWrapper::AESStateMBEDTLSFactory>(); |
849 | 0 | } |
850 | | // Load the metadata from the file if metadata caching is disabled, |
851 | | // if this file has no cached metadata yet, |
852 | | // or if the cached version has already expired |
853 | 0 | if (!metadata_p) { |
854 | 0 | if (!MetadataCacheEnabled(context_p)) { |
855 | 0 | metadata = LoadMetadata(context_p, allocator, *file_handle, parquet_options.encryption_config, |
856 | 0 | *encryption_util, footer_size); |
857 | 0 | } else { |
858 | 0 | metadata = ObjectCache::GetObjectCache(context_p).Get<ParquetFileMetadataCache>(file.path); |
859 | 0 | if (!metadata || !metadata->IsValid(*file_handle)) { |
860 | 0 | metadata = LoadMetadata(context_p, allocator, *file_handle, parquet_options.encryption_config, |
861 | 0 | *encryption_util, footer_size); |
862 | 0 | ObjectCache::GetObjectCache(context_p).Put(file.path, metadata); |
863 | 0 | } |
864 | 0 | } |
865 | 0 | } else { |
866 | 0 | metadata = std::move(metadata_p); |
867 | 0 | } |
868 | 0 | InitializeSchema(context_p); |
869 | 0 | } |
870 | | |
871 | 0 | bool ParquetReader::MetadataCacheEnabled(ClientContext &context) { |
872 | 0 | Value metadata_cache = false; |
873 | 0 | context.TryGetCurrentSetting("parquet_metadata_cache", metadata_cache); |
874 | 0 | return metadata_cache.GetValue<bool>(); |
875 | 0 | } |
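// Illustrative: footer metadata is only cached across reads when the 'parquet_metadata_cache' setting
// is enabled (e.g. SET parquet_metadata_cache = true;); otherwise the footer is re-read whenever a
// reader is constructed without a pre-supplied cache entry.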
876 | | |
877 | | shared_ptr<ParquetFileMetadataCache> ParquetReader::GetMetadataCacheEntry(ClientContext &context, |
878 | 0 | const OpenFileInfo &file) { |
879 | 0 | return ObjectCache::GetObjectCache(context).Get<ParquetFileMetadataCache>(file.path); |
880 | 0 | } |
881 | | |
882 | 0 | ParquetUnionData::~ParquetUnionData() { |
883 | 0 | } |
884 | | |
885 | 0 | unique_ptr<BaseStatistics> ParquetUnionData::GetStatistics(ClientContext &context, const string &name) { |
886 | 0 | if (reader) { |
887 | 0 | return reader->Cast<ParquetReader>().GetStatistics(context, name); |
888 | 0 | } |
889 | 0 | return ParquetReader::ReadStatistics(*this, name); |
890 | 0 | } |
891 | | |
892 | | ParquetReader::ParquetReader(ClientContext &context_p, ParquetOptions parquet_options_p, |
893 | | shared_ptr<ParquetFileMetadataCache> metadata_p) |
894 | 0 | : BaseFileReader(string()), fs(CachingFileSystem::Get(context_p)), allocator(BufferAllocator::Get(context_p)), |
895 | 0 | metadata(std::move(metadata_p)), parquet_options(std::move(parquet_options_p)), rows_read(0) { |
896 | 0 | InitializeSchema(context_p); |
897 | 0 | } |
898 | | |
899 | 0 | ParquetReader::~ParquetReader() { |
900 | 0 | } |
901 | | |
902 | 0 | const FileMetaData *ParquetReader::GetFileMetadata() const { |
903 | 0 | D_ASSERT(metadata); |
904 | 0 | D_ASSERT(metadata->metadata); |
905 | 0 | return metadata->metadata.get(); |
906 | 0 | } |
907 | | |
908 | | static unique_ptr<BaseStatistics> ReadStatisticsInternal(const FileMetaData &file_meta_data, |
909 | | const ParquetColumnSchema &root_schema, |
910 | | const ParquetOptions &parquet_options, |
911 | 0 | const idx_t &file_col_idx) { |
912 | 0 | unique_ptr<BaseStatistics> column_stats; |
913 | 0 | auto &column_schema = root_schema.children[file_col_idx]; |
914 | |
915 | 0 | for (idx_t row_group_idx = 0; row_group_idx < file_meta_data.row_groups.size(); row_group_idx++) { |
916 | 0 | auto &row_group = file_meta_data.row_groups[row_group_idx]; |
917 | 0 | auto chunk_stats = column_schema.Stats(file_meta_data, parquet_options, row_group_idx, row_group.columns); |
918 | 0 | if (!chunk_stats) { |
919 | 0 | return nullptr; |
920 | 0 | } |
921 | 0 | if (!column_stats) { |
922 | 0 | column_stats = std::move(chunk_stats); |
923 | 0 | } else { |
924 | 0 | column_stats->Merge(*chunk_stats); |
925 | 0 | } |
926 | 0 | } |
927 | 0 | return column_stats; |
928 | 0 | } |
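// Statistics are merged across all row groups of the file; if any row group lacks usable statistics
// for the requested column, nullptr is returned to signal that no overall statistics are available.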
929 | | |
930 | 0 | unique_ptr<BaseStatistics> ParquetReader::ReadStatistics(const string &name) { |
931 | 0 | idx_t file_col_idx; |
932 | 0 | for (file_col_idx = 0; file_col_idx < columns.size(); file_col_idx++) { |
933 | 0 | if (columns[file_col_idx].name == name) { |
934 | 0 | break; |
935 | 0 | } |
936 | 0 | } |
937 | 0 | if (file_col_idx == columns.size()) { |
938 | 0 | return nullptr; |
939 | 0 | } |
940 | | |
941 | 0 | return ReadStatisticsInternal(*GetFileMetadata(), *root_schema, parquet_options, file_col_idx); |
942 | 0 | } |
943 | | |
944 | | unique_ptr<BaseStatistics> ParquetReader::ReadStatistics(ClientContext &context, ParquetOptions parquet_options, |
945 | | shared_ptr<ParquetFileMetadataCache> metadata, |
946 | 0 | const string &name) { |
947 | 0 | ParquetReader reader(context, std::move(parquet_options), std::move(metadata)); |
948 | 0 | return reader.ReadStatistics(name); |
949 | 0 | } |
950 | | |
951 | 0 | unique_ptr<BaseStatistics> ParquetReader::ReadStatistics(const ParquetUnionData &union_data, const string &name) { |
952 | 0 | const auto &col_names = union_data.names; |
953 | |
954 | 0 | idx_t file_col_idx; |
955 | 0 | for (file_col_idx = 0; file_col_idx < col_names.size(); file_col_idx++) { |
956 | 0 | if (col_names[file_col_idx] == name) { |
957 | 0 | break; |
958 | 0 | } |
959 | 0 | } |
960 | 0 | if (file_col_idx == col_names.size()) { |
961 | 0 | return nullptr; |
962 | 0 | } |
963 | | |
964 | 0 | return ReadStatisticsInternal(*union_data.metadata->metadata, *union_data.root_schema, union_data.options, |
965 | 0 | file_col_idx); |
966 | 0 | } |
967 | | |
968 | 0 | uint32_t ParquetReader::Read(duckdb_apache::thrift::TBase &object, TProtocol &iprot) { |
969 | 0 | if (parquet_options.encryption_config) { |
970 | 0 | return ParquetCrypto::Read(object, iprot, parquet_options.encryption_config->GetFooterKey(), *encryption_util); |
971 | 0 | } else { |
972 | 0 | return object.read(&iprot); |
973 | 0 | } |
974 | 0 | } |
975 | | |
976 | | uint32_t ParquetReader::ReadData(duckdb_apache::thrift::protocol::TProtocol &iprot, const data_ptr_t buffer, |
977 | 0 | const uint32_t buffer_size) { |
978 | 0 | if (parquet_options.encryption_config) { |
979 | 0 | return ParquetCrypto::ReadData(iprot, buffer, buffer_size, parquet_options.encryption_config->GetFooterKey(), |
980 | 0 | *encryption_util); |
981 | 0 | } else { |
982 | 0 | return iprot.getTransport()->read(buffer, buffer_size); |
983 | 0 | } |
984 | 0 | } |
985 | | |
986 | 0 | static idx_t GetRowGroupOffset(ParquetReader &reader, idx_t group_idx) { |
987 | 0 | idx_t row_group_offset = 0; |
988 | 0 | auto &row_groups = reader.GetFileMetadata()->row_groups; |
989 | 0 | for (idx_t i = 0; i < group_idx; i++) { |
990 | 0 | row_group_offset += row_groups[i].num_rows; |
991 | 0 | } |
992 | 0 | return row_group_offset; |
993 | 0 | } |
994 | | |
995 | 0 | const ParquetRowGroup &ParquetReader::GetGroup(ParquetReaderScanState &state) { |
996 | 0 | auto file_meta_data = GetFileMetadata(); |
997 | 0 | D_ASSERT(state.current_group >= 0 && (idx_t)state.current_group < state.group_idx_list.size()); |
998 | 0 | D_ASSERT(state.group_idx_list[state.current_group] < file_meta_data->row_groups.size()); |
999 | 0 | return file_meta_data->row_groups[state.group_idx_list[state.current_group]]; |
1000 | 0 | } |
1001 | | |
1002 | 0 | uint64_t ParquetReader::GetGroupCompressedSize(ParquetReaderScanState &state) { |
1003 | 0 | const auto &group = GetGroup(state); |
1004 | 0 | int64_t total_compressed_size = group.__isset.total_compressed_size ? group.total_compressed_size : 0; |
1005 | |
1006 | 0 | idx_t calc_compressed_size = 0; |
1007 | | |
1008 | | // If the global total_compressed_size is not set, we can still calculate it |
1009 | 0 | if (group.total_compressed_size == 0) { |
1010 | 0 | for (auto &column_chunk : group.columns) { |
1011 | 0 | calc_compressed_size += column_chunk.meta_data.total_compressed_size; |
1012 | 0 | } |
1013 | 0 | } |
1014 | |
1015 | 0 | if (total_compressed_size != 0 && calc_compressed_size != 0 && |
1016 | 0 | (idx_t)total_compressed_size != calc_compressed_size) { |
1017 | 0 | throw InvalidInputException( |
1018 | 0 | "Failed to read file \"%s\": mismatch between calculated compressed size and reported compressed size", |
1019 | 0 | GetFileName()); |
1020 | 0 | } |
1021 | | |
1022 | 0 | return total_compressed_size ? total_compressed_size : calc_compressed_size; |
1023 | 0 | } |
1024 | | |
1025 | 0 | uint64_t ParquetReader::GetGroupSpan(ParquetReaderScanState &state) { |
1026 | 0 | auto &group = GetGroup(state); |
1027 | 0 | idx_t min_offset = NumericLimits<idx_t>::Maximum(); |
1028 | 0 | idx_t max_offset = NumericLimits<idx_t>::Minimum(); |
1029 | |
1030 | 0 | for (auto &column_chunk : group.columns) { |
1031 | | // Set the min offset |
1032 | 0 | idx_t current_min_offset = NumericLimits<idx_t>::Maximum(); |
1033 | 0 | if (column_chunk.meta_data.__isset.dictionary_page_offset) { |
1034 | 0 | current_min_offset = MinValue<idx_t>(current_min_offset, column_chunk.meta_data.dictionary_page_offset); |
1035 | 0 | } |
1036 | 0 | if (column_chunk.meta_data.__isset.index_page_offset) { |
1037 | 0 | current_min_offset = MinValue<idx_t>(current_min_offset, column_chunk.meta_data.index_page_offset); |
1038 | 0 | } |
1039 | 0 | current_min_offset = MinValue<idx_t>(current_min_offset, column_chunk.meta_data.data_page_offset); |
1040 | 0 | min_offset = MinValue<idx_t>(current_min_offset, min_offset); |
1041 | 0 | max_offset = MaxValue<idx_t>(max_offset, column_chunk.meta_data.total_compressed_size + current_min_offset); |
1042 | 0 | } |
1043 | |
1044 | 0 | return max_offset - min_offset; |
1045 | 0 | } |
1046 | | |
1047 | 0 | idx_t ParquetReader::GetGroupOffset(ParquetReaderScanState &state) { |
1048 | 0 | auto &group = GetGroup(state); |
1049 | 0 | idx_t min_offset = NumericLimits<idx_t>::Maximum(); |
1050 | |
1051 | 0 | for (auto &column_chunk : group.columns) { |
1052 | 0 | if (column_chunk.meta_data.__isset.dictionary_page_offset) { |
1053 | 0 | min_offset = MinValue<idx_t>(min_offset, column_chunk.meta_data.dictionary_page_offset); |
1054 | 0 | } |
1055 | 0 | if (column_chunk.meta_data.__isset.index_page_offset) { |
1056 | 0 | min_offset = MinValue<idx_t>(min_offset, column_chunk.meta_data.index_page_offset); |
1057 | 0 | } |
1058 | 0 | min_offset = MinValue<idx_t>(min_offset, column_chunk.meta_data.data_page_offset); |
1059 | 0 | } |
1060 | |
1061 | 0 | return min_offset; |
1062 | 0 | } |
1063 | | |
1064 | | static FilterPropagateResult CheckParquetStringFilter(BaseStatistics &stats, const Statistics &pq_col_stats, |
1065 | 0 | TableFilter &filter) { |
1066 | 0 | switch (filter.filter_type) { |
1067 | 0 | case TableFilterType::CONJUNCTION_AND: { |
1068 | 0 | auto &conjunction_filter = filter.Cast<ConjunctionAndFilter>(); |
1069 | 0 | auto and_result = FilterPropagateResult::FILTER_ALWAYS_TRUE; |
1070 | 0 | for (auto &child_filter : conjunction_filter.child_filters) { |
1071 | 0 | auto child_prune_result = CheckParquetStringFilter(stats, pq_col_stats, *child_filter); |
1072 | 0 | if (child_prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) { |
1073 | 0 | return FilterPropagateResult::FILTER_ALWAYS_FALSE; |
1074 | 0 | } |
1075 | 0 | if (child_prune_result != and_result) { |
1076 | 0 | and_result = FilterPropagateResult::NO_PRUNING_POSSIBLE; |
1077 | 0 | } |
1078 | 0 | } |
1079 | 0 | return and_result; |
1080 | 0 | } |
1081 | 0 | case TableFilterType::CONSTANT_COMPARISON: { |
1082 | 0 | auto &constant_filter = filter.Cast<ConstantFilter>(); |
1083 | 0 | auto &min_value = pq_col_stats.min_value; |
1084 | 0 | auto &max_value = pq_col_stats.max_value; |
1085 | 0 | return StringStats::CheckZonemap(const_data_ptr_cast(min_value.c_str()), min_value.size(), |
1086 | 0 | const_data_ptr_cast(max_value.c_str()), max_value.size(), |
1087 | 0 | constant_filter.comparison_type, StringValue::Get(constant_filter.constant)); |
1088 | 0 | } |
1089 | 0 | default: |
1090 | 0 | return filter.CheckStatistics(stats); |
1091 | 0 | } |
1092 | 0 | } |
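// Illustrative example: for a row group whose Parquet string statistics are min_value = "apple" and
// max_value = "banana", a filter such as col = 'zebra' is proven always-false from the full-length
// min/max values, so the row group can be skipped even though DuckDB's in-memory string statistics
// truncate to the first 8 bytes (see the comment in PrepareRowGroupBuffer below).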
1093 | | |
1094 | | static FilterPropagateResult CheckParquetFloatFilter(ColumnReader &reader, const Statistics &pq_col_stats, |
1095 | 0 | TableFilter &filter) { |
1096 | | // floating point values can have values in the [min, max] domain AND nan values |
1097 | | // check both stats against the filter |
1098 | 0 | auto &type = reader.Type(); |
1099 | 0 | auto nan_stats = NumericStats::CreateUnknown(type); |
1100 | 0 | auto nan_value = Value("nan").DefaultCastAs(type); |
1101 | 0 | NumericStats::SetMin(nan_stats, nan_value); |
1102 | 0 | NumericStats::SetMax(nan_stats, nan_value); |
1103 | 0 | auto nan_prune = filter.CheckStatistics(nan_stats); |
1104 | |
1105 | 0 | auto min_max_stats = ParquetStatisticsUtils::CreateNumericStats(reader.Type(), reader.Schema(), pq_col_stats); |
1106 | 0 | auto prune = filter.CheckStatistics(*min_max_stats); |
1107 | | |
1108 | | // if EITHER of them cannot be pruned - we cannot prune |
1109 | 0 | if (prune == FilterPropagateResult::NO_PRUNING_POSSIBLE || |
1110 | 0 | nan_prune == FilterPropagateResult::NO_PRUNING_POSSIBLE) { |
1111 | 0 | return FilterPropagateResult::NO_PRUNING_POSSIBLE; |
1112 | 0 | } |
1113 | | // if both are the same we can return that value |
1114 | 0 | if (prune == nan_prune) { |
1115 | 0 | return prune; |
1116 | 0 | } |
1117 | | // if they are different we need to return that we cannot prune |
1118 | | // e.g. prune = always false, nan_prune = always true -> we don't know |
1119 | 0 | return FilterPropagateResult::NO_PRUNING_POSSIBLE; |
1120 | 0 | } |
1121 | | |
1122 | 0 | void ParquetReader::PrepareRowGroupBuffer(ParquetReaderScanState &state, idx_t i) { |
1123 | 0 | auto &group = GetGroup(state); |
1124 | 0 | auto col_idx = MultiFileLocalIndex(i); |
1125 | 0 | auto column_id = column_ids[col_idx]; |
1126 | 0 | auto &column_reader = state.root_reader->Cast<StructColumnReader>().GetChildReader(column_id); |
1127 | |
1128 | 0 | if (filters) { |
1129 | 0 | auto stats = column_reader.Stats(state.group_idx_list[state.current_group], group.columns); |
1130 | | // filters contain output chunk index, not file col idx! |
1131 | 0 | auto filter_entry = filters->filters.find(col_idx); |
1132 | 0 | if (stats && filter_entry != filters->filters.end()) { |
1133 | 0 | auto &filter = *filter_entry->second; |
1134 | |
1135 | 0 | FilterPropagateResult prune_result; |
1136 | 0 | bool is_generated_column = column_reader.ColumnIndex() >= group.columns.size(); |
1137 | 0 | bool is_column = column_reader.Schema().schema_type == ParquetColumnSchemaType::COLUMN; |
1138 | 0 | bool is_expression = column_reader.Schema().schema_type == ParquetColumnSchemaType::EXPRESSION; |
1139 | 0 | bool has_min_max = false; |
1140 | 0 | if (!is_generated_column) { |
1141 | 0 | has_min_max = group.columns[column_reader.ColumnIndex()].meta_data.statistics.__isset.min_value && |
1142 | 0 | group.columns[column_reader.ColumnIndex()].meta_data.statistics.__isset.max_value; |
1143 | 0 | } |
1144 | 0 | if (is_expression) { |
1145 | | // no pruning possible for expressions |
1146 | 0 | prune_result = FilterPropagateResult::NO_PRUNING_POSSIBLE; |
1147 | 0 | } else if (!is_generated_column && has_min_max && column_reader.Type().id() == LogicalTypeId::VARCHAR) { |
1148 | | // our StringStats only store the first 8 bytes of strings (even if Parquet has longer string stats) |
1149 | | // however, when reading remote Parquet files, skipping row groups is really important |
1150 | | // here, we implement a special case to check the full length for string filters |
1151 | 0 | prune_result = CheckParquetStringFilter( |
1152 | 0 | *stats, group.columns[column_reader.ColumnIndex()].meta_data.statistics, filter); |
1153 | 0 | } else if (!is_generated_column && has_min_max && |
1154 | 0 | (column_reader.Type().id() == LogicalTypeId::FLOAT || |
1155 | 0 | column_reader.Type().id() == LogicalTypeId::DOUBLE) && |
1156 | 0 | parquet_options.can_have_nan) { |
1157 | | // floating point columns can have NaN values in addition to the min/max bounds defined in the file |
1158 | | // in order to do optimal pruning we first prune based on the [min, max] of the file, followed by |
1159 | | // pruning based on NaN |
1160 | 0 | prune_result = CheckParquetFloatFilter( |
1161 | 0 | column_reader, group.columns[column_reader.ColumnIndex()].meta_data.statistics, filter); |
1162 | 0 | } else { |
1163 | 0 | prune_result = filter.CheckStatistics(*stats); |
1164 | 0 | } |
1165 | | // check the bloom filter if present |
1166 | 0 | if (prune_result == FilterPropagateResult::NO_PRUNING_POSSIBLE && !column_reader.Type().IsNested() && |
1167 | 0 | is_column && ParquetStatisticsUtils::BloomFilterSupported(column_reader.Type().id()) && |
1168 | 0 | ParquetStatisticsUtils::BloomFilterExcludes(filter, |
1169 | 0 | group.columns[column_reader.ColumnIndex()].meta_data, |
1170 | 0 | *state.thrift_file_proto, allocator)) { |
1171 | 0 | prune_result = FilterPropagateResult::FILTER_ALWAYS_FALSE; |
1172 | 0 | } |
1173 | |
1174 | 0 | if (prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) { |
1175 | | // this effectively skips the entire row group |
1176 | 0 | state.offset_in_group = group.num_rows; |
1177 | 0 | return; |
1178 | 0 | } |
1179 | 0 | } |
1180 | 0 | } |
1181 | | |
1182 | 0 | state.root_reader->InitializeRead(state.group_idx_list[state.current_group], group.columns, |
1183 | 0 | *state.thrift_file_proto); |
1184 | 0 | } |
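
PrepareRowGroupBuffer is where row groups actually get skipped: the prune cascade picks the most precise check available for the column (expression, full-length string, float-with-NaN, or generic statistics), lets the bloom filter strengthen an undecided result, and then fast-forwards offset_in_group when the filter can never match. A schematic version of just that decision order, with hypothetical helper inputs standing in for the reader and row-group metadata plumbing:

// Sketch of the prune cascade only; all inputs are assumed to have been computed
// from the column reader and the row group's column-chunk metadata beforehand.
enum class Propagate { ALWAYS_TRUE, ALWAYS_FALSE, NO_PRUNING };

struct PruneInput {
	bool is_expression;         // expression columns: nothing to prune on
	bool is_generated;          // generated columns without a backing column chunk
	bool has_min_max;           // Parquet min_value and max_value are both present
	bool is_varchar;
	bool is_float_with_nan;     // FLOAT/DOUBLE column and can_have_nan is set
	bool bloom_filter_excludes; // bloom filter proves the filter constant is absent
};

static bool ShouldSkipRowGroup(const PruneInput &in, Propagate generic_result, Propagate string_result,
                               Propagate float_result) {
	Propagate prune;
	if (in.is_expression) {
		prune = Propagate::NO_PRUNING;
	} else if (!in.is_generated && in.has_min_max && in.is_varchar) {
		prune = string_result; // full-length string zone-map check
	} else if (!in.is_generated && in.has_min_max && in.is_float_with_nan) {
		prune = float_result;  // [min, max] check combined with the NaN check
	} else {
		prune = generic_result; // plain statistics check
	}
	if (prune == Propagate::NO_PRUNING && in.bloom_filter_excludes) {
		prune = Propagate::ALWAYS_FALSE; // the bloom filter can only strengthen the result
	}
	// the caller skips the group by setting offset_in_group to the group's row count
	return prune == Propagate::ALWAYS_FALSE;
}
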
1185 | | |
1186 | 0 | idx_t ParquetReader::NumRows() const { |
1187 | 0 | return GetFileMetadata()->num_rows; |
1188 | 0 | } |
1189 | | |
1190 | 0 | idx_t ParquetReader::NumRowGroups() const { |
1191 | 0 | return GetFileMetadata()->row_groups.size(); |
1192 | 0 | } |
1193 | | |
1194 | | ParquetScanFilter::ParquetScanFilter(ClientContext &context, idx_t filter_idx, TableFilter &filter) |
1195 | 0 | : filter_idx(filter_idx), filter(filter) { |
1196 | 0 | filter_state = TableFilterState::Initialize(context, filter); |
1197 | 0 | } |
1198 | | |
1199 | 0 | ParquetScanFilter::~ParquetScanFilter() { |
1200 | 0 | } |
1201 | | |
1202 | | void ParquetReader::InitializeScan(ClientContext &context, ParquetReaderScanState &state, |
1203 | 0 | vector<idx_t> groups_to_read) { |
1204 | 0 | state.current_group = -1; |
1205 | 0 | state.finished = false; |
1206 | 0 | state.offset_in_group = 0; |
1207 | 0 | state.group_idx_list = std::move(groups_to_read); |
1208 | 0 | state.sel.Initialize(STANDARD_VECTOR_SIZE); |
1209 | 0 | if (!state.file_handle || state.file_handle->GetPath() != file_handle->GetPath()) { |
1210 | 0 | auto flags = FileFlags::FILE_FLAGS_READ; |
1211 | 0 | if (ShouldAndCanPrefetch(context, *file_handle)) { |
1212 | 0 | state.prefetch_mode = true; |
1213 | 0 | if (file_handle->IsRemoteFile()) { |
1214 | 0 | flags |= FileFlags::FILE_FLAGS_DIRECT_IO; |
1215 | 0 | } |
1216 | 0 | } else { |
1217 | 0 | state.prefetch_mode = false; |
1218 | 0 | } |
1219 | |
1220 | 0 | state.file_handle = fs.OpenFile(context, file, flags); |
1221 | 0 | } |
1222 | 0 | state.adaptive_filter.reset(); |
1223 | 0 | state.scan_filters.clear(); |
1224 | 0 | if (filters) { |
1225 | 0 | state.adaptive_filter = make_uniq<AdaptiveFilter>(*filters); |
1226 | 0 | for (auto &entry : filters->filters) { |
1227 | 0 | state.scan_filters.emplace_back(context, entry.first, *entry.second); |
1228 | 0 | } |
1229 | 0 | } |
1230 | |
1231 | 0 | state.thrift_file_proto = CreateThriftFileProtocol(context, *state.file_handle, state.prefetch_mode); |
1232 | 0 | state.root_reader = CreateReader(context); |
1233 | 0 | state.define_buf.resize(allocator, STANDARD_VECTOR_SIZE); |
1234 | 0 | state.repeat_buf.resize(allocator, STANDARD_VECTOR_SIZE); |
1235 | 0 | } |
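
InitializeScan only reopens the file when the scan state's handle points at a different path, and it requests direct I/O for remote files when prefetching is enabled, presumably so the reader's own range prefetcher is not double-buffered underneath. A rough, self-contained sketch of that reuse-or-reopen decision; all types here are illustrative stand-ins, not DuckDB classes:

// Illustrative stand-ins only; the real code uses CachingFileHandle, FileFlags
// and the DuckDB filesystem API.
#include <memory>
#include <string>

struct HandleStub {
	std::string path;
	bool remote = false;
};

struct ScanStateStub {
	std::unique_ptr<HandleStub> handle;
	bool prefetch_mode = false;
};

static void EnsureHandle(ScanStateStub &state, const HandleStub &reader_handle, bool want_prefetch) {
	if (state.handle && state.handle->path == reader_handle.path) {
		return; // same file as before: keep the existing handle
	}
	bool direct_io = false;
	state.prefetch_mode = want_prefetch;
	if (want_prefetch && reader_handle.remote) {
		direct_io = true; // let the prefetcher manage its own buffering for remote reads
	}
	// a real implementation would pass these flags to the filesystem's open call
	(void)direct_io;
	state.handle = std::make_unique<HandleStub>(reader_handle);
}
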
1236 | | |
1237 | 0 | void ParquetReader::GetPartitionStats(vector<PartitionStatistics> &result) { |
1238 | 0 | GetPartitionStats(*GetFileMetadata(), result); |
1239 | 0 | } |
1240 | | |
1241 | | void ParquetReader::GetPartitionStats(const duckdb_parquet::FileMetaData &metadata, |
1242 | 0 | vector<PartitionStatistics> &result) { |
1243 | 0 | idx_t offset = 0; |
1244 | 0 | for (auto &row_group : metadata.row_groups) { |
1245 | 0 | PartitionStatistics partition_stats; |
1246 | 0 | partition_stats.row_start = offset; |
1247 | 0 | partition_stats.count = row_group.num_rows; |
1248 | 0 | partition_stats.count_type = CountType::COUNT_EXACT; |
1249 | 0 | offset += row_group.num_rows; |
1250 | 0 | result.push_back(partition_stats); |
1251 | 0 | } |
1252 | 0 | } |
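
The loop above is plain prefix-sum bookkeeping: each partition's row_start is the total number of rows in the preceding row groups. A tiny self-contained version with a worked example (the types are simplified stand-ins):

#include <cstdint>
#include <vector>

struct PartitionStat {
	uint64_t row_start; // first row id covered by this row group
	uint64_t count;     // exact number of rows in the row group
};

static std::vector<PartitionStat> BuildPartitionStats(const std::vector<uint64_t> &rows_per_group) {
	std::vector<PartitionStat> result;
	uint64_t offset = 0;
	for (auto rows : rows_per_group) {
		result.push_back({offset, rows});
		offset += rows;
	}
	return result;
}

// BuildPartitionStats({100, 50, 200}) yields {0, 100}, {100, 50}, {150, 200}.
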
1253 | | |
1254 | 0 | AsyncResult ParquetReader::Scan(ClientContext &context, ParquetReaderScanState &state, DataChunk &result) { |
1255 | 0 | result.Reset(); |
1256 | 0 | if (state.finished) { |
1257 | 0 | return SourceResultType::FINISHED; |
1258 | 0 | } |
1259 | | |
1260 | | // see if we have to switch to the next row group in the parquet file |
1261 | 0 | if (state.current_group < 0 || (int64_t)state.offset_in_group >= GetGroup(state).num_rows) { |
1262 | 0 | state.current_group++; |
1263 | 0 | state.offset_in_group = 0; |
1264 | |
1265 | 0 | auto &trans = reinterpret_cast<ThriftFileTransport &>(*state.thrift_file_proto->getTransport()); |
1266 | 0 | trans.ClearPrefetch(); |
1267 | 0 | state.current_group_prefetched = false; |
1268 | |
1269 | 0 | if ((idx_t)state.current_group == state.group_idx_list.size()) { |
1270 | 0 | state.finished = true; |
1271 | 0 | return SourceResultType::FINISHED; |
1272 | 0 | } |
1273 | | |
1274 | | // TODO: only need this if we have a deletion vector? |
1275 | 0 | state.group_offset = GetRowGroupOffset(state.root_reader->Reader(), state.group_idx_list[state.current_group]); |
1276 | |
1277 | 0 | uint64_t to_scan_compressed_bytes = 0; |
1278 | 0 | for (idx_t i = 0; i < column_ids.size(); i++) { |
1279 | 0 | auto col_idx = MultiFileLocalIndex(i); |
1280 | 0 | PrepareRowGroupBuffer(state, col_idx); |
1281 | |
1282 | 0 | auto file_col_idx = column_ids[col_idx]; |
1283 | |
1284 | 0 | auto &root_reader = state.root_reader->Cast<StructColumnReader>(); |
1285 | 0 | to_scan_compressed_bytes += root_reader.GetChildReader(file_col_idx).TotalCompressedSize(); |
1286 | 0 | } |
1287 | |
1288 | 0 | auto &group = GetGroup(state); |
1289 | 0 | if (state.op) { |
1290 | 0 | DUCKDB_LOG(context, PhysicalOperatorLogType, *state.op, "ParquetReader", |
1291 | 0 | state.offset_in_group == (idx_t)group.num_rows ? "SkipRowGroup" : "ReadRowGroup", |
1292 | 0 | {{"file", file.path}, {"row_group_id", to_string(state.group_idx_list[state.current_group])}}); |
1293 | 0 | } |
1294 | |
1295 | 0 | if (state.prefetch_mode && state.offset_in_group != (idx_t)group.num_rows) { |
1296 | 0 | uint64_t total_row_group_span = GetGroupSpan(state); |
1297 | |
1298 | 0 | double scan_percentage = (double)(to_scan_compressed_bytes) / static_cast<double>(total_row_group_span); |
1299 | |
1300 | 0 | if (to_scan_compressed_bytes > total_row_group_span) { |
1301 | 0 | throw IOException( |
1302 | 0 | "The parquet file '%s' seems to have incorrectly set page offsets. This interferes with DuckDB's " |
1303 | 0 | "prefetching optimization. DuckDB may still be able to scan this file by manually disabling the " |
1304 | 0 | "prefetching mechanism using: 'SET disable_parquet_prefetching=true'.", |
1305 | 0 | GetFileName()); |
1306 | 0 | } |
1307 | | |
1308 | 0 | if (!filters && scan_percentage > ParquetReaderPrefetchConfig::WHOLE_GROUP_PREFETCH_MINIMUM_SCAN) { |
1309 | | // Prefetch the whole row group |
1310 | 0 | if (!state.current_group_prefetched) { |
1311 | 0 | auto total_compressed_size = GetGroupCompressedSize(state); |
1312 | 0 | if (total_compressed_size > 0) { |
1313 | 0 | trans.Prefetch(GetGroupOffset(state), total_row_group_span); |
1314 | 0 | } |
1315 | 0 | state.current_group_prefetched = true; |
1316 | 0 | } |
1317 | 0 | } else { |
1318 | | // lazy fetching is used when all tuples in a column might be skipped by the filters; with lazy |
1319 | | // fetching, a column's buffer is only fetched on the first actual read from it. |
1320 | 0 | bool lazy_fetch = filters != nullptr; |
1321 | | |
1322 | | // Prefetch column-wise |
1323 | 0 | for (idx_t i = 0; i < column_ids.size(); i++) { |
1324 | 0 | auto col_idx = MultiFileLocalIndex(i); |
1325 | 0 | auto file_col_idx = column_ids[col_idx]; |
1326 | 0 | auto &root_reader = state.root_reader->Cast<StructColumnReader>(); |
1327 | |
1328 | 0 | bool has_filter = false; |
1329 | 0 | if (filters) { |
1330 | 0 | auto entry = filters->filters.find(col_idx); |
1331 | 0 | has_filter = entry != filters->filters.end(); |
1332 | 0 | } |
1333 | 0 | root_reader.GetChildReader(file_col_idx).RegisterPrefetch(trans, !(lazy_fetch && !has_filter)); |
1334 | 0 | } |
1335 | |
1336 | 0 | trans.FinalizeRegistration(); |
1337 | |
1338 | 0 | if (!lazy_fetch) { |
1339 | 0 | trans.PrefetchRegistered(); |
1340 | 0 | } |
1341 | 0 | } |
1342 | 0 | } |
1343 | 0 | result.Reset(); |
1344 | 0 | return SourceResultType::HAVE_MORE_OUTPUT; |
1345 | 0 | } |
1346 | | |
1347 | 0 | auto scan_count = MinValue<idx_t>(STANDARD_VECTOR_SIZE, GetGroup(state).num_rows - state.offset_in_group); |
1348 | 0 | result.SetCardinality(scan_count); |
1349 | |
1350 | 0 | if (scan_count == 0) { |
1351 | 0 | state.finished = true; |
1352 | | // end of last group, we are done |
1353 | 0 | return SourceResultType::FINISHED; |
1354 | 0 | } |
1355 | | |
1356 | 0 | auto &deletion_filter = state.root_reader->Reader().deletion_filter; |
1357 | |
1358 | 0 | state.define_buf.zero(); |
1359 | 0 | state.repeat_buf.zero(); |
1360 | |
1361 | 0 | auto define_ptr = (uint8_t *)state.define_buf.ptr; |
1362 | 0 | auto repeat_ptr = (uint8_t *)state.repeat_buf.ptr; |
1363 | |
1364 | 0 | auto &root_reader = state.root_reader->Cast<StructColumnReader>(); |
1365 | |
1366 | 0 | if (filters || deletion_filter) { |
1367 | 0 | idx_t filter_count = result.size(); |
1368 | 0 | D_ASSERT(filter_count == scan_count); |
1369 | 0 | vector<bool> need_to_read(column_ids.size(), true); |
1370 | |
1371 | 0 | state.sel.Initialize(nullptr); |
1372 | 0 | D_ASSERT(!filters || state.scan_filters.size() == filters->filters.size()); |
1373 | |
1374 | 0 | bool is_first_filter = true; |
1375 | 0 | if (deletion_filter) { |
1376 | 0 | auto row_start = UnsafeNumericCast<row_t>(state.offset_in_group + state.group_offset); |
1377 | 0 | filter_count = deletion_filter->Filter(row_start, scan_count, state.sel); |
1378 | | //! FIXME: does this need to be set? |
1379 | | //! As part of 'DirectFilter' we also initialize reads of the child readers |
1380 | 0 | is_first_filter = false; |
1381 | 0 | } |
1382 | |
1383 | 0 | if (filters) { |
1384 | | // first load the columns that are used in filters |
1385 | 0 | auto filter_state = state.adaptive_filter->BeginFilter(); |
1386 | 0 | for (idx_t i = 0; i < state.scan_filters.size(); i++) { |
1387 | 0 | if (filter_count == 0) { |
1388 | | // if no rows are left we can stop checking filters |
1389 | 0 | break; |
1390 | 0 | } |
1391 | 0 | auto &scan_filter = state.scan_filters[state.adaptive_filter->permutation[i]]; |
1392 | 0 | auto local_idx = MultiFileLocalIndex(scan_filter.filter_idx); |
1393 | 0 | auto column_id = column_ids[local_idx]; |
1394 | |
1395 | 0 | auto &result_vector = result.data[local_idx.GetIndex()]; |
1396 | 0 | auto &child_reader = root_reader.GetChildReader(column_id); |
1397 | 0 | child_reader.Filter(scan_count, define_ptr, repeat_ptr, result_vector, scan_filter.filter, |
1398 | 0 | *scan_filter.filter_state, state.sel, filter_count, is_first_filter); |
1399 | 0 | need_to_read[local_idx.GetIndex()] = false; |
1400 | 0 | is_first_filter = false; |
1401 | 0 | } |
1402 | 0 | state.adaptive_filter->EndFilter(filter_state); |
1403 | 0 | } |
1404 | | |
1405 | | // we still may have to read some cols |
1406 | 0 | for (idx_t i = 0; i < column_ids.size(); i++) { |
1407 | 0 | auto col_idx = MultiFileLocalIndex(i); |
1408 | 0 | if (!need_to_read[col_idx]) { |
1409 | 0 | continue; |
1410 | 0 | } |
1411 | 0 | auto file_col_idx = column_ids[col_idx]; |
1412 | 0 | if (filter_count == 0) { |
1413 | 0 | root_reader.GetChildReader(file_col_idx).Skip(result.size()); |
1414 | 0 | continue; |
1415 | 0 | } |
1416 | 0 | auto &result_vector = result.data[i]; |
1417 | 0 | auto &child_reader = root_reader.GetChildReader(file_col_idx); |
1418 | 0 | child_reader.Select(result.size(), define_ptr, repeat_ptr, result_vector, state.sel, filter_count); |
1419 | 0 | } |
1420 | 0 | if (scan_count != filter_count) { |
1421 | 0 | result.Slice(state.sel, filter_count); |
1422 | 0 | } |
1423 | 0 | } else { |
1424 | 0 | for (idx_t i = 0; i < column_ids.size(); i++) { |
1425 | 0 | auto col_idx = MultiFileLocalIndex(i); |
1426 | 0 | auto file_col_idx = column_ids[col_idx]; |
1427 | 0 | auto &result_vector = result.data[i]; |
1428 | 0 | auto &child_reader = root_reader.GetChildReader(file_col_idx); |
1429 | 0 | auto rows_read = child_reader.Read(scan_count, define_ptr, repeat_ptr, result_vector); |
1430 | 0 | if (rows_read != scan_count) { |
1431 | 0 | throw InvalidInputException("Mismatch in parquet read for column %llu, expected %llu rows, got %llu", |
1432 | 0 | file_col_idx, scan_count, rows_read); |
1433 | 0 | } |
1434 | 0 | } |
1435 | 0 | } |
1436 | | |
1437 | 0 | rows_read += scan_count; |
1438 | 0 | state.offset_in_group += scan_count; |
1439 | 0 | return SourceResultType::HAVE_MORE_OUTPUT; |
1440 | 0 | } |
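
Two design points in Scan are worth calling out. First, when filters are present the filtered columns are decoded first, in the order chosen by the adaptive filter, and the remaining columns are then either Select-ed through the surviving selection vector or skipped outright once no rows survive. Second, the prefetch policy in the row-group switch: with no filters and a scan that covers most of the group's byte range, the whole group is fetched in one request; otherwise column ranges are registered individually and, when filters are present, only fetched lazily on first use. A sketch of just that policy choice; the threshold parameter is illustrative, the real constant is ParquetReaderPrefetchConfig::WHOLE_GROUP_PREFETCH_MINIMUM_SCAN:

#include <cstdint>

enum class PrefetchPlan { WHOLE_GROUP, COLUMNS_EAGER, COLUMNS_LAZY };

static PrefetchPlan ChoosePrefetchPlan(uint64_t to_scan_compressed_bytes, uint64_t row_group_span_bytes,
                                       bool has_filters, double whole_group_threshold) {
	double scan_fraction =
	    static_cast<double>(to_scan_compressed_bytes) / static_cast<double>(row_group_span_bytes);
	if (!has_filters && scan_fraction > whole_group_threshold) {
		// nearly everything is needed anyway: one large read is cheapest, especially for remote files
		return PrefetchPlan::WHOLE_GROUP;
	}
	if (has_filters) {
		// register column buffers but fetch them only on first use, so columns of a
		// row group that gets filtered out entirely are never downloaded
		return PrefetchPlan::COLUMNS_LAZY;
	}
	// no filters but a sparse column selection: prefetch the registered column ranges eagerly
	return PrefetchPlan::COLUMNS_EAGER;
}
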
1441 | | |
1442 | | } // namespace duckdb |