Coverage Report

Created: 2025-11-15 07:36

/src/duckdb/extension/parquet/parquet_reader.cpp
Line | Count | Source
1
#include "parquet_reader.hpp"
2
3
#include "reader/boolean_column_reader.hpp"
4
#include "reader/callback_column_reader.hpp"
5
#include "column_reader.hpp"
6
#include "duckdb.hpp"
7
#include "reader/expression_column_reader.hpp"
8
#include "parquet_geometry.hpp"
9
#include "reader/list_column_reader.hpp"
10
#include "parquet_crypto.hpp"
11
#include "parquet_file_metadata_cache.hpp"
12
#include "parquet_statistics.hpp"
13
#include "parquet_timestamp.hpp"
14
#include "mbedtls_wrapper.hpp"
15
#include "reader/row_number_column_reader.hpp"
16
#include "reader/string_column_reader.hpp"
17
#include "reader/variant_column_reader.hpp"
18
#include "reader/struct_column_reader.hpp"
19
#include "reader/templated_column_reader.hpp"
20
#include "thrift_tools.hpp"
21
#include "duckdb/main/config.hpp"
22
#include "duckdb/common/encryption_state.hpp"
23
#include "duckdb/common/file_system.hpp"
24
#include "duckdb/common/helper.hpp"
25
#include "duckdb/common/hive_partitioning.hpp"
26
#include "duckdb/common/string_util.hpp"
27
#include "duckdb/planner/table_filter.hpp"
28
#include "duckdb/storage/object_cache.hpp"
29
#include "duckdb/optimizer/statistics_propagator.hpp"
30
#include "duckdb/planner/table_filter_state.hpp"
31
#include "duckdb/common/multi_file/multi_file_reader.hpp"
32
#include "duckdb/logging/log_manager.hpp"
33
34
#include <cassert>
35
#include <chrono>
36
#include <cstring>
37
#include <sstream>
38
39
namespace duckdb {
40
41
using duckdb_parquet::ColumnChunk;
42
using duckdb_parquet::ConvertedType;
43
using duckdb_parquet::FieldRepetitionType;
44
using duckdb_parquet::FileCryptoMetaData;
45
using duckdb_parquet::FileMetaData;
46
using ParquetRowGroup = duckdb_parquet::RowGroup;
47
using duckdb_parquet::SchemaElement;
48
using duckdb_parquet::Statistics;
49
using duckdb_parquet::Type;
50
51
static unique_ptr<duckdb_apache::thrift::protocol::TProtocol>
52
0
CreateThriftFileProtocol(QueryContext context, CachingFileHandle &file_handle, bool prefetch_mode) {
53
0
  auto transport = duckdb_base_std::make_shared<ThriftFileTransport>(file_handle, prefetch_mode);
54
0
  return make_uniq<duckdb_apache::thrift::protocol::TCompactProtocolT<ThriftFileTransport>>(std::move(transport));
55
0
}
56
57
0
static bool ShouldAndCanPrefetch(ClientContext &context, CachingFileHandle &file_handle) {
58
0
  Value disable_prefetch = false;
59
0
  Value prefetch_all_files = false;
60
0
  context.TryGetCurrentSetting("disable_parquet_prefetching", disable_prefetch);
61
0
  context.TryGetCurrentSetting("prefetch_all_parquet_files", prefetch_all_files);
62
0
  bool should_prefetch = !file_handle.OnDiskFile() || prefetch_all_files.GetValue<bool>();
63
0
  bool can_prefetch = file_handle.CanSeek() && !disable_prefetch.GetValue<bool>();
64
0
  return should_prefetch && can_prefetch;
65
0
}
66
67
static void ParseParquetFooter(data_ptr_t buffer, const string &file_path, idx_t file_size,
68
                               const shared_ptr<const ParquetEncryptionConfig> &encryption_config, uint32_t &footer_len,
69
0
                               bool &footer_encrypted) {
70
0
  if (memcmp(buffer + 4, "PAR1", 4) == 0) {
71
0
    footer_encrypted = false;
72
0
    if (encryption_config) {
73
0
      throw InvalidInputException("File '%s' is not encrypted, but 'encryption_config' was set", file_path);
74
0
    }
75
0
  } else if (memcmp(buffer + 4, "PARE", 4) == 0) {
76
0
    footer_encrypted = true;
77
0
    if (!encryption_config) {
78
0
      throw InvalidInputException("File '%s' is encrypted, but 'encryption_config' was not set", file_path);
79
0
    }
80
0
  } else {
81
0
    throw InvalidInputException("No magic bytes found at end of file '%s'", file_path);
82
0
  }
83
84
  // read four-byte footer length from just before the end magic bytes
85
0
  footer_len = Load<uint32_t>(buffer);
86
0
  if (footer_len == 0 || file_size < 12 + footer_len) {
87
0
    throw InvalidInputException("Footer length error in file '%s'", file_path);
88
0
  }
89
0
}
90
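
The footer check above can be illustrated outside DuckDB. A minimal standalone sketch (plain C++ with only the standard library; ParquetTrailer and ReadTrailer are hypothetical names, not part of the reader) that reads the 8-byte trailer of a local file, verifies the PAR1/PARE magic, and extracts the little-endian footer length:

#include <cstdint>
#include <cstring>
#include <fstream>
#include <stdexcept>
#include <string>

struct ParquetTrailer {
  uint32_t footer_len;
  bool encrypted;
};

// Read the last 8 bytes of the file: a 4-byte little-endian footer length
// followed by the 4-byte magic ("PAR1" plain, "PARE" encrypted footer).
static ParquetTrailer ReadTrailer(const std::string &path) {
  std::ifstream in(path, std::ios::binary | std::ios::ate);
  if (!in) {
    throw std::runtime_error("cannot open " + path);
  }
  const auto file_size = static_cast<uint64_t>(in.tellg());
  if (file_size < 12) {
    throw std::runtime_error("file too small to be a Parquet file");
  }
  char buf[8];
  in.seekg(static_cast<std::streamoff>(file_size - 8));
  in.read(buf, 8);

  ParquetTrailer result {};
  if (std::memcmp(buf + 4, "PAR1", 4) == 0) {
    result.encrypted = false;
  } else if (std::memcmp(buf + 4, "PARE", 4) == 0) {
    result.encrypted = true;
  } else {
    throw std::runtime_error("no magic bytes found at end of file");
  }
  // assumes a little-endian host, like the Load<uint32_t> call above
  std::memcpy(&result.footer_len, buf, sizeof(uint32_t));
  if (result.footer_len == 0 || file_size < 12 + static_cast<uint64_t>(result.footer_len)) {
    throw std::runtime_error("footer length error");
  }
  return result;
}

The 12-byte minimum comes from the 4-byte leading "PAR1" magic plus the 8-byte trailer.
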
91
static shared_ptr<ParquetFileMetadataCache>
92
LoadMetadata(ClientContext &context, Allocator &allocator, CachingFileHandle &file_handle,
93
             const shared_ptr<const ParquetEncryptionConfig> &encryption_config, const EncryptionUtil &encryption_util,
94
0
             optional_idx footer_size) {
95
0
  auto file_proto = CreateThriftFileProtocol(context, file_handle, false);
96
0
  auto &transport = reinterpret_cast<ThriftFileTransport &>(*file_proto->getTransport());
97
0
  auto file_size = transport.GetSize();
98
0
  if (file_size < 12) {
99
0
    throw InvalidInputException("File '%s' too small to be a Parquet file", file_handle.GetPath());
100
0
  }
101
102
0
  bool footer_encrypted;
103
0
  uint32_t footer_len;
104
  // footer size is not provided - read it from the back
105
0
  if (!footer_size.IsValid()) {
106
    // We have to do two reads here:
107
    // 1. The 8 bytes at the back, to check whether it's a Parquet file and to get the footer size
108
    // 2. The footer (after getting the size)
109
    // For local reads this doesn't matter much, but for remote reads this means two round trips,
110
    // which is especially bad for small Parquet files where the read cost is mostly round trips.
111
    // So, we prefetch more, to hopefully save a round trip.
112
0
    static constexpr idx_t ESTIMATED_FOOTER_RATIO = 1000; // Estimate 1/1000th of the file to be footer
113
0
    static constexpr idx_t MIN_PREFETCH_SIZE = 16384;     // Prefetch at least this many bytes
114
0
    static constexpr idx_t MAX_PREFETCH_SIZE = 262144;    // Prefetch at most this many bytes
115
0
    idx_t prefetch_size = 8;
116
0
    if (ShouldAndCanPrefetch(context, file_handle)) {
117
0
      prefetch_size = ClampValue(file_size / ESTIMATED_FOOTER_RATIO, MIN_PREFETCH_SIZE, MAX_PREFETCH_SIZE);
118
0
      prefetch_size = MinValue(NextPowerOfTwo(prefetch_size), file_size);
119
0
    }
120
121
0
    ResizeableBuffer buf;
122
0
    buf.resize(allocator, 8);
123
0
    buf.zero();
124
125
0
    transport.Prefetch(file_size - prefetch_size, prefetch_size);
126
0
    transport.SetLocation(file_size - 8);
127
0
    transport.read(buf.ptr, 8);
128
129
0
    ParseParquetFooter(buf.ptr, file_handle.GetPath(), file_size, encryption_config, footer_len, footer_encrypted);
130
131
0
    auto metadata_pos = file_size - (footer_len + 8);
132
0
    transport.SetLocation(metadata_pos);
133
0
    if (footer_len > prefetch_size - 8) {
134
0
      transport.Prefetch(metadata_pos, footer_len);
135
0
    }
136
0
  } else {
137
0
    footer_len = UnsafeNumericCast<uint32_t>(footer_size.GetIndex());
138
0
    if (footer_len == 0 || file_size < 12 + footer_len) {
139
0
      throw InvalidInputException("Invalid footer length provided for file '%s'", file_handle.GetPath());
140
0
    }
141
142
0
    idx_t total_footer_len = footer_len + 8;
143
0
    auto metadata_pos = file_size - total_footer_len;
144
0
    transport.SetLocation(metadata_pos);
145
0
    transport.Prefetch(metadata_pos, total_footer_len);
146
147
0
    auto read_head = transport.GetReadHead(metadata_pos);
148
0
    auto data_ptr = read_head->buffer_ptr;
149
150
0
    uint32_t read_footer_len;
151
0
    ParseParquetFooter(data_ptr + footer_len, file_handle.GetPath(), file_size, encryption_config, read_footer_len,
152
0
                       footer_encrypted);
153
0
    if (read_footer_len != footer_len) {
154
0
      throw InvalidInputException("Parquet footer length stored in file is not equal to footer length provided");
155
0
    }
156
0
  }
157
158
0
  auto metadata = make_uniq<FileMetaData>();
159
0
  if (footer_encrypted) {
160
0
    auto crypto_metadata = make_uniq<FileCryptoMetaData>();
161
0
    crypto_metadata->read(file_proto.get());
162
0
    if (crypto_metadata->encryption_algorithm.__isset.AES_GCM_CTR_V1) {
163
0
      throw InvalidInputException("File '%s' is encrypted with AES_GCM_CTR_V1, but only AES_GCM_V1 is supported",
164
0
                                  file_handle.GetPath());
165
0
    }
166
0
    ParquetCrypto::Read(*metadata, *file_proto, encryption_config->GetFooterKey(), encryption_util);
167
0
  } else {
168
0
    metadata->read(file_proto.get());
169
0
  }
170
171
  // Try to read the GeoParquet metadata (if present)
172
0
  auto geo_metadata = GeoParquetFileMetadata::TryRead(*metadata, context);
173
0
  return make_shared_ptr<ParquetFileMetadataCache>(std::move(metadata), file_handle, std::move(geo_metadata),
174
0
                                                   footer_len);
175
0
}
176
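
When the footer length is unknown, the prefetch sizing above reduces to a few lines of arithmetic. A sketch of that branch under the constants shown in the source (FooterPrefetchSize is a hypothetical helper, not part of the reader):

#include <algorithm>
#include <cstdint>

static uint64_t NextPowerOfTwo(uint64_t v) {
  uint64_t p = 1;
  while (p < v) {
    p <<= 1;
  }
  return p;
}

// Estimate ~1/1000th of the file as footer, clamp to [16 KiB, 256 KiB],
// round up to a power of two, and never prefetch more than the file itself.
static uint64_t FooterPrefetchSize(uint64_t file_size) {
  constexpr uint64_t ESTIMATED_FOOTER_RATIO = 1000;
  constexpr uint64_t MIN_PREFETCH_SIZE = 16384;
  constexpr uint64_t MAX_PREFETCH_SIZE = 262144;
  const uint64_t estimate = std::clamp(file_size / ESTIMATED_FOOTER_RATIO, MIN_PREFETCH_SIZE, MAX_PREFETCH_SIZE);
  return std::min(NextPowerOfTwo(estimate), file_size);
}

For a 1 GiB file this prefetches 256 KiB from the tail; if the footer turns out to be larger, the code above issues one additional read for the remainder.
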
177
0
LogicalType ParquetReader::DeriveLogicalType(const SchemaElement &s_ele, ParquetColumnSchema &schema) const {
178
  // inner node
179
0
  if (s_ele.type == Type::FIXED_LEN_BYTE_ARRAY && !s_ele.__isset.type_length) {
180
0
    throw IOException("FIXED_LEN_BYTE_ARRAY requires length to be set");
181
0
  }
182
0
  if (s_ele.__isset.type_length) {
183
0
    schema.type_length = NumericCast<uint32_t>(s_ele.type_length);
184
0
  }
185
0
  schema.parquet_type = s_ele.type;
186
0
  if (s_ele.__isset.logicalType) {
187
0
    if (s_ele.logicalType.__isset.UNKNOWN) {
188
0
      return LogicalType::SQLNULL;
189
0
    } else if (s_ele.logicalType.__isset.UUID) {
190
0
      if (s_ele.type == Type::FIXED_LEN_BYTE_ARRAY) {
191
0
        return LogicalType::UUID;
192
0
      }
193
0
    } else if (s_ele.logicalType.__isset.FLOAT16) {
194
0
      if (s_ele.type == Type::FIXED_LEN_BYTE_ARRAY && s_ele.type_length == 2) {
195
0
        schema.type_info = ParquetExtraTypeInfo::FLOAT16;
196
0
        return LogicalType::FLOAT;
197
0
      }
198
0
    } else if (s_ele.logicalType.__isset.TIMESTAMP) {
199
0
      if (s_ele.logicalType.TIMESTAMP.unit.__isset.MILLIS) {
200
0
        schema.type_info = ParquetExtraTypeInfo::UNIT_MS;
201
0
      } else if (s_ele.logicalType.TIMESTAMP.unit.__isset.MICROS) {
202
0
        schema.type_info = ParquetExtraTypeInfo::UNIT_MICROS;
203
0
      } else if (s_ele.logicalType.TIMESTAMP.unit.__isset.NANOS) {
204
0
        schema.type_info = ParquetExtraTypeInfo::UNIT_NS;
205
0
      } else {
206
0
        throw NotImplementedException("Unimplemented TIMESTAMP encoding - missing UNIT");
207
0
      }
208
0
      if (s_ele.logicalType.TIMESTAMP.isAdjustedToUTC) {
209
0
        return LogicalType::TIMESTAMP_TZ;
210
0
      } else if (s_ele.logicalType.TIMESTAMP.unit.__isset.NANOS) {
211
0
        return LogicalType::TIMESTAMP_NS;
212
0
      }
213
0
      return LogicalType::TIMESTAMP;
214
0
    } else if (s_ele.logicalType.__isset.TIME) {
215
0
      if (s_ele.logicalType.TIME.unit.__isset.MILLIS) {
216
0
        schema.type_info = ParquetExtraTypeInfo::UNIT_MS;
217
0
      } else if (s_ele.logicalType.TIME.unit.__isset.MICROS) {
218
0
        schema.type_info = ParquetExtraTypeInfo::UNIT_MICROS;
219
0
      } else if (s_ele.logicalType.TIME.unit.__isset.NANOS) {
220
0
        schema.type_info = ParquetExtraTypeInfo::UNIT_NS;
221
0
      } else {
222
0
        throw NotImplementedException("Unimplemented TIME encoding - missing UNIT");
223
0
      }
224
0
      if (s_ele.logicalType.TIME.isAdjustedToUTC) {
225
0
        return LogicalType::TIME_TZ;
226
0
      }
227
0
      return LogicalType::TIME;
228
0
    }
229
0
  }
230
0
  if (s_ele.__isset.converted_type) {
231
    // Legacy NULL type; it no longer exists, but files containing it are still around, of course
232
0
    if (static_cast<uint8_t>(s_ele.converted_type) == 24) {
233
0
      return LogicalTypeId::SQLNULL;
234
0
    }
235
0
    switch (s_ele.converted_type) {
236
0
    case ConvertedType::INT_8:
237
0
      if (s_ele.type == Type::INT32) {
238
0
        return LogicalType::TINYINT;
239
0
      } else {
240
0
        throw IOException("INT8 converted type can only be set for value of Type::INT32");
241
0
      }
242
0
    case ConvertedType::INT_16:
243
0
      if (s_ele.type == Type::INT32) {
244
0
        return LogicalType::SMALLINT;
245
0
      } else {
246
0
        throw IOException("INT16 converted type can only be set for value of Type::INT32");
247
0
      }
248
0
    case ConvertedType::INT_32:
249
0
      if (s_ele.type == Type::INT32) {
250
0
        return LogicalType::INTEGER;
251
0
      } else {
252
0
        throw IOException("INT32 converted type can only be set for value of Type::INT32");
253
0
      }
254
0
    case ConvertedType::INT_64:
255
0
      if (s_ele.type == Type::INT64) {
256
0
        return LogicalType::BIGINT;
257
0
      } else {
258
0
        throw IOException("INT64 converted type can only be set for value of Type::INT32");
259
0
      }
260
0
    case ConvertedType::UINT_8:
261
0
      if (s_ele.type == Type::INT32) {
262
0
        return LogicalType::UTINYINT;
263
0
      } else {
264
0
        throw IOException("UINT8 converted type can only be set for value of Type::INT32");
265
0
      }
266
0
    case ConvertedType::UINT_16:
267
0
      if (s_ele.type == Type::INT32) {
268
0
        return LogicalType::USMALLINT;
269
0
      } else {
270
0
        throw IOException("UINT16 converted type can only be set for value of Type::INT32");
271
0
      }
272
0
    case ConvertedType::UINT_32:
273
0
      if (s_ele.type == Type::INT32) {
274
0
        return LogicalType::UINTEGER;
275
0
      } else {
276
0
        throw IOException("UINT32 converted type can only be set for value of Type::INT32");
277
0
      }
278
0
    case ConvertedType::UINT_64:
279
0
      if (s_ele.type == Type::INT64) {
280
0
        return LogicalType::UBIGINT;
281
0
      } else {
282
0
        throw IOException("UINT64 converted type can only be set for value of Type::INT64");
283
0
      }
284
0
    case ConvertedType::DATE:
285
0
      if (s_ele.type == Type::INT32) {
286
0
        return LogicalType::DATE;
287
0
      } else {
288
0
        throw IOException("DATE converted type can only be set for value of Type::INT32");
289
0
      }
290
0
    case ConvertedType::TIMESTAMP_MICROS:
291
0
      schema.type_info = ParquetExtraTypeInfo::UNIT_MICROS;
292
0
      if (s_ele.type == Type::INT64) {
293
0
        return LogicalType::TIMESTAMP;
294
0
      } else {
295
0
        throw IOException("TIMESTAMP converted type can only be set for value of Type::INT64");
296
0
      }
297
0
    case ConvertedType::TIMESTAMP_MILLIS:
298
0
      schema.type_info = ParquetExtraTypeInfo::UNIT_MS;
299
0
      if (s_ele.type == Type::INT64) {
300
0
        return LogicalType::TIMESTAMP;
301
0
      } else {
302
0
        throw IOException("TIMESTAMP converted type can only be set for value of Type::INT64");
303
0
      }
304
0
    case ConvertedType::DECIMAL:
305
0
      if (!s_ele.__isset.precision || !s_ele.__isset.scale) {
306
0
        throw IOException("DECIMAL requires a length and scale specifier!");
307
0
      }
308
0
      schema.type_scale = NumericCast<uint32_t>(s_ele.scale);
309
0
      if (s_ele.precision > DecimalType::MaxWidth()) {
310
0
        schema.type_info = ParquetExtraTypeInfo::DECIMAL_BYTE_ARRAY;
311
0
        return LogicalType::DOUBLE;
312
0
      }
313
0
      switch (s_ele.type) {
314
0
      case Type::BYTE_ARRAY:
315
0
      case Type::FIXED_LEN_BYTE_ARRAY:
316
0
        schema.type_info = ParquetExtraTypeInfo::DECIMAL_BYTE_ARRAY;
317
0
        break;
318
0
      case Type::INT32:
319
0
        schema.type_info = ParquetExtraTypeInfo::DECIMAL_INT32;
320
0
        break;
321
0
      case Type::INT64:
322
0
        schema.type_info = ParquetExtraTypeInfo::DECIMAL_INT64;
323
0
        break;
324
0
      default:
325
0
        throw IOException(
326
0
            "DECIMAL converted type can only be set for value of Type::(FIXED_LEN_)BYTE_ARRAY/INT32/INT64");
327
0
      }
328
0
      return LogicalType::DECIMAL(s_ele.precision, s_ele.scale);
329
0
    case ConvertedType::UTF8:
330
0
    case ConvertedType::ENUM:
331
0
      switch (s_ele.type) {
332
0
      case Type::BYTE_ARRAY:
333
0
      case Type::FIXED_LEN_BYTE_ARRAY:
334
0
        return LogicalType::VARCHAR;
335
0
      default:
336
0
        throw IOException("UTF8 converted type can only be set for Type::(FIXED_LEN_)BYTE_ARRAY");
337
0
      }
338
0
    case ConvertedType::TIME_MILLIS:
339
0
      schema.type_info = ParquetExtraTypeInfo::UNIT_MS;
340
0
      if (s_ele.type == Type::INT32) {
341
0
        return LogicalType::TIME;
342
0
      } else {
343
0
        throw IOException("TIME_MILLIS converted type can only be set for value of Type::INT32");
344
0
      }
345
0
    case ConvertedType::TIME_MICROS:
346
0
      schema.type_info = ParquetExtraTypeInfo::UNIT_MICROS;
347
0
      if (s_ele.type == Type::INT64) {
348
0
        return LogicalType::TIME;
349
0
      } else {
350
0
        throw IOException("TIME_MICROS converted type can only be set for value of Type::INT64");
351
0
      }
352
0
    case ConvertedType::INTERVAL:
353
0
      return LogicalType::INTERVAL;
354
0
    case ConvertedType::JSON:
355
0
      return LogicalType::JSON();
356
0
    case ConvertedType::MAP:
357
0
    case ConvertedType::MAP_KEY_VALUE:
358
0
    case ConvertedType::LIST:
359
0
    case ConvertedType::BSON:
360
0
    default:
361
0
      throw IOException("Unsupported converted type (%d)", (int32_t)s_ele.converted_type);
362
0
    }
363
0
  } else {
364
    // no converted type set
365
    // use default type for each physical type
366
0
    switch (s_ele.type) {
367
0
    case Type::BOOLEAN:
368
0
      return LogicalType::BOOLEAN;
369
0
    case Type::INT32:
370
0
      return LogicalType::INTEGER;
371
0
    case Type::INT64:
372
0
      return LogicalType::BIGINT;
373
0
    case Type::INT96: // always a timestamp it would seem
374
0
      schema.type_info = ParquetExtraTypeInfo::IMPALA_TIMESTAMP;
375
0
      return LogicalType::TIMESTAMP;
376
0
    case Type::FLOAT:
377
0
      return LogicalType::FLOAT;
378
0
    case Type::DOUBLE:
379
0
      return LogicalType::DOUBLE;
380
0
    case Type::BYTE_ARRAY:
381
0
    case Type::FIXED_LEN_BYTE_ARRAY:
382
0
      if (parquet_options.binary_as_string) {
383
0
        return LogicalType::VARCHAR;
384
0
      }
385
0
      return LogicalType::BLOB;
386
0
    default:
387
0
      return LogicalType::INVALID;
388
0
    }
389
0
  }
390
0
}
391
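
One non-obvious rule in DeriveLogicalType above is the DECIMAL fallback: decimals wider than the engine's maximum width are read as DOUBLE rather than rejected. A hypothetical condensed sketch of that decision (MapParquetDecimal is an illustrative name; 38 is DuckDB's maximum decimal width):

#include <cstdint>
#include <string>

struct DecimalMapping {
  std::string logical_type; // e.g. "DECIMAL(18,3)" or "DOUBLE"
  bool lossless;
};

static DecimalMapping MapParquetDecimal(uint32_t precision, uint32_t scale, uint32_t max_width = 38) {
  if (precision > max_width) {
    // too wide for a native decimal: fall back to an approximate DOUBLE
    return {"DOUBLE", false};
  }
  return {"DECIMAL(" + std::to_string(precision) + "," + std::to_string(scale) + ")", true};
}

The physical storage (INT32, INT64, or a byte array) is recorded separately via ParquetExtraTypeInfo so the column reader knows how to decode the stored digits.
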
392
ParquetColumnSchema ParquetReader::ParseColumnSchema(const SchemaElement &s_ele, idx_t max_define, idx_t max_repeat,
393
                                                     idx_t schema_index, idx_t column_index,
394
0
                                                     ParquetColumnSchemaType type) {
395
0
  ParquetColumnSchema schema(max_define, max_repeat, schema_index, column_index, type);
396
0
  schema.name = s_ele.name;
397
0
  schema.type = DeriveLogicalType(s_ele, schema);
398
0
  return schema;
399
0
}
400
401
unique_ptr<ColumnReader> ParquetReader::CreateReaderRecursive(ClientContext &context,
402
                                                              const vector<ColumnIndex> &indexes,
403
0
                                                              const ParquetColumnSchema &schema) {
404
0
  switch (schema.schema_type) {
405
0
  case ParquetColumnSchemaType::FILE_ROW_NUMBER:
406
0
    return make_uniq<RowNumberColumnReader>(*this, schema);
407
0
  case ParquetColumnSchemaType::GEOMETRY: {
408
0
    return GeometryColumnReader::Create(*this, schema, context);
409
0
  }
410
0
  case ParquetColumnSchemaType::COLUMN: {
411
0
    if (schema.children.empty()) {
412
      // leaf reader
413
0
      return ColumnReader::CreateReader(*this, schema);
414
0
    }
415
0
    vector<unique_ptr<ColumnReader>> children;
416
0
    children.resize(schema.children.size());
417
0
    if (indexes.empty()) {
418
0
      for (idx_t child_index = 0; child_index < schema.children.size(); child_index++) {
419
0
        children[child_index] = CreateReaderRecursive(context, indexes, schema.children[child_index]);
420
0
      }
421
0
    } else {
422
0
      for (idx_t i = 0; i < indexes.size(); i++) {
423
0
        auto child_index = indexes[i].GetPrimaryIndex();
424
0
        children[child_index] =
425
0
            CreateReaderRecursive(context, indexes[i].GetChildIndexes(), schema.children[child_index]);
426
0
      }
427
0
    }
428
0
    switch (schema.type.id()) {
429
0
    case LogicalTypeId::LIST:
430
0
    case LogicalTypeId::MAP:
431
0
      D_ASSERT(children.size() == 1);
432
0
      return make_uniq<ListColumnReader>(*this, schema, std::move(children[0]));
433
0
    case LogicalTypeId::STRUCT:
434
0
      return make_uniq<StructColumnReader>(*this, schema, std::move(children));
435
0
    default:
436
0
      throw InternalException("Unsupported schema type for schema with children");
437
0
    }
438
0
  }
439
0
  case ParquetColumnSchemaType::VARIANT: {
440
0
    if (schema.children.size() < 2) {
441
0
      throw InternalException("VARIANT schema type used for a non-variant type column");
442
0
    }
443
0
    vector<unique_ptr<ColumnReader>> children;
444
0
    children.resize(schema.children.size());
445
0
    for (idx_t child_index = 0; child_index < schema.children.size(); child_index++) {
446
0
      children[child_index] = CreateReaderRecursive(context, indexes, schema.children[child_index]);
447
0
    }
448
0
    return make_uniq<VariantColumnReader>(context, *this, schema, std::move(children));
449
0
  }
450
0
  default:
451
0
    throw InternalException("Unsupported ParquetColumnSchemaType");
452
0
  }
453
0
}
454
455
0
unique_ptr<ColumnReader> ParquetReader::CreateReader(ClientContext &context) {
456
0
  auto ret = CreateReaderRecursive(context, column_indexes, *root_schema);
457
0
  if (ret->Type().id() != LogicalTypeId::STRUCT) {
458
0
    throw InternalException("Root element of Parquet file must be a struct");
459
0
  }
460
  // add expressions if required
461
0
  auto &root_struct_reader = ret->Cast<StructColumnReader>();
462
0
  for (auto &entry : expression_map) {
463
0
    auto column_id = entry.first;
464
0
    auto &expression = entry.second;
465
0
    auto child_reader = std::move(root_struct_reader.child_readers[column_id]);
466
0
    auto expr_schema = make_uniq<ParquetColumnSchema>(child_reader->Schema(), expression->return_type,
467
0
                                                      ParquetColumnSchemaType::EXPRESSION);
468
0
    auto expr_reader = make_uniq<ExpressionColumnReader>(context, std::move(child_reader), expression->Copy(),
469
0
                                                         std::move(expr_schema));
470
0
    root_struct_reader.child_readers[column_id] = std::move(expr_reader);
471
0
  }
472
0
  return ret;
473
0
}
474
475
ParquetColumnSchema::ParquetColumnSchema(idx_t max_define, idx_t max_repeat, idx_t schema_index, idx_t column_index,
476
                                         ParquetColumnSchemaType schema_type)
477
0
    : ParquetColumnSchema(string(), LogicalTypeId::INVALID, max_define, max_repeat, schema_index, column_index,
478
0
                          schema_type) {
479
0
}
480
481
ParquetColumnSchema::ParquetColumnSchema(string name_p, LogicalType type_p, idx_t max_define, idx_t max_repeat,
482
                                         idx_t schema_index, idx_t column_index, ParquetColumnSchemaType schema_type)
483
0
    : schema_type(schema_type), name(std::move(name_p)), type(std::move(type_p)), max_define(max_define),
484
0
      max_repeat(max_repeat), schema_index(schema_index), column_index(column_index) {
485
0
}
486
487
ParquetColumnSchema::ParquetColumnSchema(ParquetColumnSchema child, LogicalType result_type,
488
                                         ParquetColumnSchemaType schema_type)
489
0
    : schema_type(schema_type), name(child.name), type(std::move(result_type)), max_define(child.max_define),
490
0
      max_repeat(child.max_repeat), schema_index(child.schema_index), column_index(child.column_index) {
491
0
  children.push_back(std::move(child));
492
0
}
493
494
unique_ptr<BaseStatistics> ParquetColumnSchema::Stats(const FileMetaData &file_meta_data,
495
                                                      const ParquetOptions &parquet_options, idx_t row_group_idx_p,
496
0
                                                      const vector<ColumnChunk> &columns) const {
497
0
  if (schema_type == ParquetColumnSchemaType::EXPRESSION) {
498
0
    return nullptr;
499
0
  }
500
0
  if (schema_type == ParquetColumnSchemaType::FILE_ROW_NUMBER) {
501
0
    auto stats = NumericStats::CreateUnknown(type);
502
0
    auto &row_groups = file_meta_data.row_groups;
503
0
    D_ASSERT(row_group_idx_p < row_groups.size());
504
0
    idx_t row_group_offset_min = 0;
505
0
    for (idx_t i = 0; i < row_group_idx_p; i++) {
506
0
      row_group_offset_min += row_groups[i].num_rows;
507
0
    }
508
509
0
    NumericStats::SetMin(stats, Value::BIGINT(UnsafeNumericCast<int64_t>(row_group_offset_min)));
510
0
    NumericStats::SetMax(stats, Value::BIGINT(UnsafeNumericCast<int64_t>(row_group_offset_min +
511
0
                                                                         row_groups[row_group_idx_p].num_rows)));
512
0
    stats.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
513
0
    return stats.ToUnique();
514
0
  }
515
0
  return ParquetStatisticsUtils::TransformColumnStatistics(*this, columns, parquet_options.can_have_nan);
516
0
}
517
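
The file_row_number statistics above follow directly from the row-group sizes: row group i covers the row-number range starting at the sum of all previous num_rows. A minimal plain-C++ sketch (RowNumberRange is a hypothetical helper):

#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

static std::pair<int64_t, int64_t> RowNumberRange(const std::vector<int64_t> &row_group_sizes, std::size_t group_idx) {
  int64_t offset = 0;
  for (std::size_t i = 0; i < group_idx; i++) {
    offset += row_group_sizes[i];
  }
  // the [min, max] pair set on the numeric statistics above
  return {offset, offset + row_group_sizes[group_idx]};
}

For row-group sizes {100, 200, 50}, group 1 reports min 100 and max 300, and the column is marked as never containing NULLs.
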
518
0
static bool IsGeometryType(const SchemaElement &s_ele, const ParquetFileMetadataCache &metadata, idx_t depth) {
519
0
  const auto is_blob = s_ele.__isset.type && s_ele.type == Type::BYTE_ARRAY;
520
0
  if (!is_blob) {
521
0
    return false;
522
0
  }
523
524
  // TODO: Handle CRS in the future
525
0
  const auto is_native_geom = s_ele.__isset.logicalType && s_ele.logicalType.__isset.GEOMETRY;
526
0
  const auto is_native_geog = s_ele.__isset.logicalType && s_ele.logicalType.__isset.GEOGRAPHY;
527
0
  if (is_native_geom || is_native_geog) {
528
0
    return true;
529
0
  }
530
531
  // geoparquet types have to be at the root of the schema, and have to be present in the kv metadata.
532
0
  const auto is_at_root = depth == 1;
533
0
  const auto is_in_gpq_metadata = metadata.geo_metadata && metadata.geo_metadata->IsGeometryColumn(s_ele.name);
534
0
  const auto is_leaf = s_ele.num_children == 0;
535
0
  const auto is_geoparquet_geom = is_at_root && is_in_gpq_metadata && is_leaf;
536
537
0
  if (is_geoparquet_geom) {
538
0
    return true;
539
0
  }
540
541
0
  return false;
542
0
}
543
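
Both native Parquet GEOMETRY/GEOGRAPHY columns and GeoParquet columns store well-known binary (WKB), whose first byte declares the byte order of everything that follows; this is why the reader later wraps the BLOB in a conversion step instead of treating it as a plain string. A standalone illustration (WkbGeometryType is a hypothetical helper, and the snippet assumes a little-endian host):

#include <cstddef>
#include <cstdint>
#include <cstring>

// Returns the WKB geometry type (1 = Point, 2 = LineString, 3 = Polygon, ...)
// regardless of whether the blob was written big- or little-endian.
static uint32_t WkbGeometryType(const unsigned char *blob, std::size_t size) {
  if (size < 5) {
    return 0; // not a valid WKB header
  }
  uint32_t type;
  std::memcpy(&type, blob + 1, sizeof(uint32_t));
  const bool blob_is_little_endian = blob[0] == 1;
  if (!blob_is_little_endian) {
    // byte-swap big-endian input, mirroring the little-endian normalization described below
    type = ((type & 0x000000FFu) << 24) | ((type & 0x0000FF00u) << 8) |
           ((type & 0x00FF0000u) >> 8) | ((type & 0xFF000000u) >> 24);
  }
  return type;
}
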
544
ParquetColumnSchema ParquetReader::ParseSchemaRecursive(idx_t depth, idx_t max_define, idx_t max_repeat,
545
                                                        idx_t &next_schema_idx, idx_t &next_file_idx,
546
0
                                                        ClientContext &context) {
547
0
  auto file_meta_data = GetFileMetadata();
548
0
  D_ASSERT(file_meta_data);
549
0
  if (next_schema_idx >= file_meta_data->schema.size()) {
550
0
    throw InvalidInputException("Malformed Parquet schema in file \"%s\": invalid schema index %d", file.path,
551
0
                                next_schema_idx);
552
0
  }
553
0
  auto &s_ele = file_meta_data->schema[next_schema_idx];
554
0
  auto this_idx = next_schema_idx;
555
556
0
  auto repetition_type = FieldRepetitionType::REQUIRED;
557
0
  if (s_ele.__isset.repetition_type && this_idx > 0) {
558
0
    repetition_type = s_ele.repetition_type;
559
0
  }
560
0
  if (repetition_type != FieldRepetitionType::REQUIRED) {
561
0
    max_define++;
562
0
  }
563
0
  if (repetition_type == FieldRepetitionType::REPEATED) {
564
0
    max_repeat++;
565
0
  }
566
567
  // Check for geometry type
568
0
  if (IsGeometryType(s_ele, *metadata, depth)) {
569
    // Geometries in both GeoParquet and native parquet are stored as a WKB-encoded BLOB.
570
    // Because we don't just want to validate that the WKB encoding is correct, but also transform it into
571
    // little-endian if necessary, we can't just make use of the StringColumnReader without heavily modifying it.
572
    // Therefore, we create a dedicated GEOMETRY parquet column schema type, which wraps the underlying BLOB column.
573
    // This schema type gets instantiated as an ExpressionColumnReader on top of the standard Blob/String reader,
574
    // which performs the WKB validation/transformation using the `ST_GeomFromWKB` function of DuckDB.
575
    // This enables us to also support other geometry encodings (such as GeoArrow geometries) more easily in the future.
576
577
    // Inner BLOB schema
578
0
    ParquetColumnSchema blob_schema(max_define, max_repeat, this_idx, next_file_idx++,
579
0
                                    ParquetColumnSchemaType::COLUMN);
580
0
    blob_schema.name = s_ele.name;
581
0
    blob_schema.type = LogicalType::BLOB;
582
583
    // Wrap in geometry schema
584
0
    ParquetColumnSchema geom_schema(std::move(blob_schema), LogicalType::GEOMETRY(),
585
0
                                    ParquetColumnSchemaType::GEOMETRY);
586
0
    return geom_schema;
587
0
  }
588
589
0
  if (s_ele.__isset.num_children && s_ele.num_children > 0) { // inner node
590
0
    vector<ParquetColumnSchema> child_schemas;
591
592
0
    idx_t c_idx = 0;
593
0
    while (c_idx < NumericCast<idx_t>(s_ele.num_children)) {
594
0
      next_schema_idx++;
595
596
0
      auto child_schema =
597
0
          ParseSchemaRecursive(depth + 1, max_define, max_repeat, next_schema_idx, next_file_idx, context);
598
0
      child_schemas.push_back(std::move(child_schema));
599
0
      c_idx++;
600
0
    }
601
    // rename child type entries if there are case-insensitive duplicates by appending _1, _2 etc.
602
    // behavior consistent with the CSV reader
603
0
    case_insensitive_map_t<idx_t> name_collision_count;
604
0
    for (auto &child_schema : child_schemas) {
605
0
      auto &col_name = child_schema.name;
606
      // avoid duplicate header names
607
0
      while (name_collision_count.find(col_name) != name_collision_count.end()) {
608
0
        name_collision_count[col_name] += 1;
609
0
        col_name = col_name + "_" + to_string(name_collision_count[col_name]);
610
0
      }
611
0
      child_schema.name = col_name;
612
0
      name_collision_count[col_name] = 0;
613
0
    }
614
615
0
    bool is_repeated = repetition_type == FieldRepetitionType::REPEATED;
616
0
    const bool is_list = s_ele.__isset.converted_type && s_ele.converted_type == ConvertedType::LIST;
617
0
    const bool is_map = s_ele.__isset.converted_type && s_ele.converted_type == ConvertedType::MAP;
618
0
    bool is_map_kv = s_ele.__isset.converted_type && s_ele.converted_type == ConvertedType::MAP_KEY_VALUE;
619
0
    bool is_variant = s_ele.__isset.logicalType && s_ele.logicalType.__isset.VARIANT == true;
620
621
0
    if (!is_map_kv && this_idx > 0) {
622
      // check if the parent node of this is a map
623
0
      auto &p_ele = file_meta_data->schema[this_idx - 1];
624
0
      bool parent_is_map = p_ele.__isset.converted_type && p_ele.converted_type == ConvertedType::MAP;
625
0
      bool parent_has_children = p_ele.__isset.num_children && p_ele.num_children == 1;
626
0
      is_map_kv = parent_is_map && parent_has_children;
627
0
    }
628
629
0
    if (is_map_kv) {
630
0
      if (child_schemas.size() != 2) {
631
0
        throw IOException("MAP_KEY_VALUE requires two children");
632
0
      }
633
0
      if (!is_repeated) {
634
0
        throw IOException("MAP_KEY_VALUE needs to be repeated");
635
0
      }
636
0
      auto result_type = LogicalType::MAP(child_schemas[0].type, child_schemas[1].type);
637
0
      ParquetColumnSchema struct_schema(s_ele.name, ListType::GetChildType(result_type), max_define - 1,
638
0
                                        max_repeat - 1, this_idx, next_file_idx);
639
0
      struct_schema.children = std::move(child_schemas);
640
641
0
      ParquetColumnSchema map_schema(s_ele.name, std::move(result_type), max_define, max_repeat, this_idx,
642
0
                                     next_file_idx);
643
0
      map_schema.children.push_back(std::move(struct_schema));
644
0
      return map_schema;
645
0
    }
646
0
    ParquetColumnSchema result;
647
0
    if (child_schemas.size() > 1 || (!is_list && !is_map && !is_repeated)) {
648
0
      child_list_t<LogicalType> struct_types;
649
0
      for (auto &child_schema : child_schemas) {
650
0
        struct_types.emplace_back(make_pair(child_schema.name, child_schema.type));
651
0
      }
652
653
0
      LogicalType result_type;
654
0
      if (is_variant) {
655
0
        result_type = LogicalType::VARIANT();
656
0
      } else {
657
0
        result_type = LogicalType::STRUCT(std::move(struct_types));
658
0
      }
659
0
      ParquetColumnSchema struct_schema(s_ele.name, std::move(result_type), max_define, max_repeat, this_idx,
660
0
                                        next_file_idx);
661
0
      struct_schema.children = std::move(child_schemas);
662
0
      if (is_variant) {
663
0
        struct_schema.schema_type = ParquetColumnSchemaType::VARIANT;
664
0
      }
665
0
      result = std::move(struct_schema);
666
0
    } else {
667
      // if we have a struct with only a single type, pull up
668
0
      result = std::move(child_schemas[0]);
669
0
      result.name = s_ele.name;
670
0
    }
671
0
    if (is_repeated) {
672
0
      auto list_type = LogicalType::LIST(result.type);
673
0
      ParquetColumnSchema list_schema(s_ele.name, std::move(list_type), max_define, max_repeat, this_idx,
674
0
                                      next_file_idx);
675
0
      list_schema.children.push_back(std::move(result));
676
0
      result = std::move(list_schema);
677
0
    }
678
0
    result.parent_schema_index = this_idx;
679
0
    return result;
680
0
  } else { // leaf node
681
0
    if (!s_ele.__isset.type) {
682
0
      throw InvalidInputException(
683
0
          "Node '%s' has neither num_children nor type set - this violates the Parquet spec (corrupted file)",
684
0
          s_ele.name.c_str());
685
0
    }
686
0
    auto result = ParseColumnSchema(s_ele, max_define, max_repeat, this_idx, next_file_idx++);
687
0
    if (s_ele.repetition_type == FieldRepetitionType::REPEATED) {
688
0
      auto list_type = LogicalType::LIST(result.type);
689
0
      ParquetColumnSchema list_schema(s_ele.name, std::move(list_type), max_define, max_repeat, this_idx,
690
0
                                      next_file_idx);
691
0
      list_schema.children.push_back(std::move(result));
692
0
      return list_schema;
693
0
    }
694
695
0
    return result;
696
0
  }
697
0
}
698
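
The case-insensitive renaming of duplicate child columns mentioned in the comments above can be shown in isolation. A sketch with standard containers (DeduplicateNames is a hypothetical name; DuckDB uses its own case_insensitive_map_t instead of lower-casing keys by hand):

#include <algorithm>
#include <cctype>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

static std::string Lower(std::string s) {
  std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
  return s;
}

static void DeduplicateNames(std::vector<std::string> &names) {
  std::unordered_map<std::string, std::size_t> collision_count;
  for (auto &name : names) {
    // keep appending _1, _2, ... until the (lower-cased) name is unused
    while (collision_count.find(Lower(name)) != collision_count.end()) {
      auto &count = collision_count[Lower(name)];
      ++count;
      name = name + "_" + std::to_string(count);
    }
    collision_count[Lower(name)] = 0;
  }
}

For example, {"ID", "id", "id"} becomes {"ID", "id_1", "id_2"}, matching the behavior of DuckDB's CSV reader.
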
699
0
static ParquetColumnSchema FileRowNumberSchema() {
700
0
  return ParquetColumnSchema("file_row_number", LogicalType::BIGINT, 0, 0, 0, 0,
701
0
                             ParquetColumnSchemaType::FILE_ROW_NUMBER);
702
0
}
703
704
0
unique_ptr<ParquetColumnSchema> ParquetReader::ParseSchema(ClientContext &context) {
705
0
  auto file_meta_data = GetFileMetadata();
706
0
  idx_t next_schema_idx = 0;
707
0
  idx_t next_file_idx = 0;
708
709
0
  if (file_meta_data->schema.empty()) {
710
0
    throw IOException("Failed to read Parquet file \"%s\": no schema elements found", file.path);
711
0
  }
712
0
  if (file_meta_data->schema[0].num_children == 0) {
713
0
    throw IOException("Failed to read Parquet file \"%s\": root schema element has no children", file.path);
714
0
  }
715
0
  auto root = ParseSchemaRecursive(0, 0, 0, next_schema_idx, next_file_idx, context);
716
0
  if (root.type.id() != LogicalTypeId::STRUCT) {
717
0
    throw InvalidInputException("Failed to read Parquet file \"%s\": Root element of Parquet file must be a struct",
718
0
                                file.path);
719
0
  }
720
0
  D_ASSERT(next_schema_idx == file_meta_data->schema.size() - 1);
721
0
  if (!file_meta_data->row_groups.empty() && next_file_idx != file_meta_data->row_groups[0].columns.size()) {
722
0
    throw InvalidInputException("Failed to read Parquet file \"%s\": row group does not have enough columns",
723
0
                                file.path);
724
0
  }
725
0
  if (parquet_options.file_row_number) {
726
0
    for (auto &column : root.children) {
727
0
      auto &name = column.name;
728
0
      if (StringUtil::CIEquals(name, "file_row_number")) {
729
0
        throw BinderException("Failed to read Parquet file \"%s\": Using file_row_number option on file with "
730
0
                              "column named file_row_number is not supported",
731
0
                              file.path);
732
0
      }
733
0
    }
734
0
    root.children.push_back(FileRowNumberSchema());
735
0
  }
736
0
  return make_uniq<ParquetColumnSchema>(root);
737
0
}
738
739
MultiFileColumnDefinition ParquetReader::ParseColumnDefinition(const FileMetaData &file_meta_data,
740
0
                                                               ParquetColumnSchema &element) {
741
0
  MultiFileColumnDefinition result(element.name, element.type);
742
0
  if (element.schema_type == ParquetColumnSchemaType::FILE_ROW_NUMBER) {
743
0
    result.identifier = Value::INTEGER(MultiFileReader::ORDINAL_FIELD_ID);
744
0
    return result;
745
0
  }
746
0
  auto &column_schema = file_meta_data.schema[element.schema_index];
747
748
0
  if (column_schema.__isset.field_id) {
749
0
    result.identifier = Value::INTEGER(column_schema.field_id);
750
0
  } else if (element.parent_schema_index.IsValid()) {
751
0
    auto &parent_column_schema = file_meta_data.schema[element.parent_schema_index.GetIndex()];
752
0
    if (parent_column_schema.__isset.field_id) {
753
0
      result.identifier = Value::INTEGER(parent_column_schema.field_id);
754
0
    }
755
0
  }
756
0
  for (auto &child : element.children) {
757
0
    result.children.push_back(ParseColumnDefinition(file_meta_data, child));
758
0
  }
759
0
  return result;
760
0
}
761
762
0
void ParquetReader::InitializeSchema(ClientContext &context) {
763
0
  auto file_meta_data = GetFileMetadata();
764
765
0
  if (file_meta_data->__isset.encryption_algorithm) {
766
0
    if (file_meta_data->encryption_algorithm.__isset.AES_GCM_CTR_V1) {
767
0
      throw InvalidInputException("File '%s' is encrypted with AES_GCM_CTR_V1, but only AES_GCM_V1 is supported",
768
0
                                  GetFileName());
769
0
    }
770
0
  }
771
  // check if we like this schema
772
0
  if (file_meta_data->schema.size() < 2) {
773
0
    throw InvalidInputException("Failed to read Parquet file '%s': Need at least one non-root column in the file",
774
0
                                GetFileName());
775
0
  }
776
0
  root_schema = ParseSchema(context);
777
0
  for (idx_t i = 0; i < root_schema->children.size(); i++) {
778
0
    auto &element = root_schema->children[i];
779
0
    columns.push_back(ParseColumnDefinition(*file_meta_data, element));
780
0
  }
781
0
}
782
783
0
void ParquetReader::AddVirtualColumn(column_t virtual_column_id) {
784
0
  if (virtual_column_id == MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER) {
785
0
    root_schema->children.push_back(FileRowNumberSchema());
786
0
  } else {
787
0
    throw InternalException("Unsupported virtual column id %d for parquet reader", virtual_column_id);
788
0
  }
789
0
}
790
791
0
ParquetOptions::ParquetOptions(ClientContext &context) {
792
0
  Value lookup_value;
793
0
  if (context.TryGetCurrentSetting("binary_as_string", lookup_value)) {
794
0
    binary_as_string = lookup_value.GetValue<bool>();
795
0
  }
796
0
}
797
798
0
ParquetColumnDefinition ParquetColumnDefinition::FromSchemaValue(ClientContext &context, const Value &column_value) {
799
0
  ParquetColumnDefinition result;
800
0
  auto &identifier = StructValue::GetChildren(column_value)[0];
801
0
  result.identifier = identifier;
802
803
0
  const auto &column_def = StructValue::GetChildren(column_value)[1];
804
0
  D_ASSERT(column_def.type().id() == LogicalTypeId::STRUCT);
805
806
0
  const auto children = StructValue::GetChildren(column_def);
807
0
  result.name = StringValue::Get(children[0]);
808
0
  result.type = TransformStringToLogicalType(StringValue::Get(children[1]));
809
0
  string error_message;
810
0
  if (!children[2].TryCastAs(context, result.type, result.default_value, &error_message)) {
811
0
    throw BinderException("Unable to cast Parquet schema default_value \"%s\" to %s", children[2].ToString(),
812
0
                          result.type.ToString());
813
0
  }
814
815
0
  return result;
816
0
}
817
818
ParquetReader::ParquetReader(ClientContext &context_p, OpenFileInfo file_p, ParquetOptions parquet_options_p,
819
                             shared_ptr<ParquetFileMetadataCache> metadata_p)
820
0
    : BaseFileReader(std::move(file_p)), fs(CachingFileSystem::Get(context_p)),
821
0
      allocator(BufferAllocator::Get(context_p)), parquet_options(std::move(parquet_options_p)) {
822
0
  file_handle = fs.OpenFile(context_p, file, FileFlags::FILE_FLAGS_READ);
823
0
  if (!file_handle->CanSeek()) {
824
0
    throw NotImplementedException(
825
0
        "Reading parquet files from a FIFO stream is not supported and cannot be efficiently supported since "
826
0
        "metadata is located at the end of the file. Write the stream to disk first and read from there instead.");
827
0
  }
828
829
  // read the extended file open info (if any)
830
0
  optional_idx footer_size;
831
0
  if (file.extended_info) {
832
0
    auto &open_options = file.extended_info->options;
833
0
    auto encryption_entry = file.extended_info->options.find("encryption_key");
834
0
    if (encryption_entry != open_options.end()) {
835
0
      parquet_options.encryption_config =
836
0
          make_shared_ptr<ParquetEncryptionConfig>(StringValue::Get(encryption_entry->second));
837
0
    }
838
0
    auto footer_entry = file.extended_info->options.find("footer_size");
839
0
    if (footer_entry != open_options.end()) {
840
0
      footer_size = UBigIntValue::Get(footer_entry->second);
841
0
    }
842
0
  }
843
  // set pointer to factory method for AES state
844
0
  auto &config = DBConfig::GetConfig(context_p);
845
0
  if (config.encryption_util && parquet_options.debug_use_openssl) {
846
0
    encryption_util = config.encryption_util;
847
0
  } else {
848
0
    encryption_util = make_shared_ptr<duckdb_mbedtls::MbedTlsWrapper::AESStateMBEDTLSFactory>();
849
0
  }
850
  // Load the metadata if the metadata cache is disabled,
851
  // if this file has no cached metadata,
852
  // or if the cached version has already expired
853
0
  if (!metadata_p) {
854
0
    if (!MetadataCacheEnabled(context_p)) {
855
0
      metadata = LoadMetadata(context_p, allocator, *file_handle, parquet_options.encryption_config,
856
0
                              *encryption_util, footer_size);
857
0
    } else {
858
0
      metadata = ObjectCache::GetObjectCache(context_p).Get<ParquetFileMetadataCache>(file.path);
859
0
      if (!metadata || !metadata->IsValid(*file_handle)) {
860
0
        metadata = LoadMetadata(context_p, allocator, *file_handle, parquet_options.encryption_config,
861
0
                                *encryption_util, footer_size);
862
0
        ObjectCache::GetObjectCache(context_p).Put(file.path, metadata);
863
0
      }
864
0
    }
865
0
  } else {
866
0
    metadata = std::move(metadata_p);
867
0
  }
868
0
  InitializeSchema(context_p);
869
0
}
870
871
0
bool ParquetReader::MetadataCacheEnabled(ClientContext &context) {
872
0
  Value metadata_cache = false;
873
0
  context.TryGetCurrentSetting("parquet_metadata_cache", metadata_cache);
874
0
  return metadata_cache.GetValue<bool>();
875
0
}
876
877
shared_ptr<ParquetFileMetadataCache> ParquetReader::GetMetadataCacheEntry(ClientContext &context,
878
0
                                                                          const OpenFileInfo &file) {
879
0
  return ObjectCache::GetObjectCache(context).Get<ParquetFileMetadataCache>(file.path);
880
0
}
881
882
0
ParquetUnionData::~ParquetUnionData() {
883
0
}
884
885
0
unique_ptr<BaseStatistics> ParquetUnionData::GetStatistics(ClientContext &context, const string &name) {
886
0
  if (reader) {
887
0
    return reader->Cast<ParquetReader>().GetStatistics(context, name);
888
0
  }
889
0
  return ParquetReader::ReadStatistics(*this, name);
890
0
}
891
892
ParquetReader::ParquetReader(ClientContext &context_p, ParquetOptions parquet_options_p,
893
                             shared_ptr<ParquetFileMetadataCache> metadata_p)
894
0
    : BaseFileReader(string()), fs(CachingFileSystem::Get(context_p)), allocator(BufferAllocator::Get(context_p)),
895
0
      metadata(std::move(metadata_p)), parquet_options(std::move(parquet_options_p)), rows_read(0) {
896
0
  InitializeSchema(context_p);
897
0
}
898
899
0
ParquetReader::~ParquetReader() {
900
0
}
901
902
0
const FileMetaData *ParquetReader::GetFileMetadata() const {
903
0
  D_ASSERT(metadata);
904
0
  D_ASSERT(metadata->metadata);
905
0
  return metadata->metadata.get();
906
0
}
907
908
static unique_ptr<BaseStatistics> ReadStatisticsInternal(const FileMetaData &file_meta_data,
909
                                                         const ParquetColumnSchema &root_schema,
910
                                                         const ParquetOptions &parquet_options,
911
0
                                                         const idx_t &file_col_idx) {
912
0
  unique_ptr<BaseStatistics> column_stats;
913
0
  auto &column_schema = root_schema.children[file_col_idx];
914
915
0
  for (idx_t row_group_idx = 0; row_group_idx < file_meta_data.row_groups.size(); row_group_idx++) {
916
0
    auto &row_group = file_meta_data.row_groups[row_group_idx];
917
0
    auto chunk_stats = column_schema.Stats(file_meta_data, parquet_options, row_group_idx, row_group.columns);
918
0
    if (!chunk_stats) {
919
0
      return nullptr;
920
0
    }
921
0
    if (!column_stats) {
922
0
      column_stats = std::move(chunk_stats);
923
0
    } else {
924
0
      column_stats->Merge(*chunk_stats);
925
0
    }
926
0
  }
927
0
  return column_stats;
928
0
}
929
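
ReadStatisticsInternal above only produces file-level statistics when every row group contributes usable statistics, merging them pairwise otherwise. A minimal numeric analogue (MergeGroupStats and MinMax are illustrative names):

#include <algorithm>
#include <cstdint>
#include <optional>
#include <vector>

struct MinMax {
  int64_t min;
  int64_t max;
};

static std::optional<MinMax> MergeGroupStats(const std::vector<std::optional<MinMax>> &per_group) {
  std::optional<MinMax> result;
  for (const auto &group : per_group) {
    if (!group) {
      return std::nullopt; // a single group without statistics invalidates the column-level result
    }
    if (!result) {
      result = group;
    } else {
      result->min = std::min(result->min, group->min);
      result->max = std::max(result->max, group->max);
    }
  }
  return result;
}
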
930
0
unique_ptr<BaseStatistics> ParquetReader::ReadStatistics(const string &name) {
931
0
  idx_t file_col_idx;
932
0
  for (file_col_idx = 0; file_col_idx < columns.size(); file_col_idx++) {
933
0
    if (columns[file_col_idx].name == name) {
934
0
      break;
935
0
    }
936
0
  }
937
0
  if (file_col_idx == columns.size()) {
938
0
    return nullptr;
939
0
  }
940
941
0
  return ReadStatisticsInternal(*GetFileMetadata(), *root_schema, parquet_options, file_col_idx);
942
0
}
943
944
unique_ptr<BaseStatistics> ParquetReader::ReadStatistics(ClientContext &context, ParquetOptions parquet_options,
945
                                                         shared_ptr<ParquetFileMetadataCache> metadata,
946
0
                                                         const string &name) {
947
0
  ParquetReader reader(context, std::move(parquet_options), std::move(metadata));
948
0
  return reader.ReadStatistics(name);
949
0
}
950
951
0
unique_ptr<BaseStatistics> ParquetReader::ReadStatistics(const ParquetUnionData &union_data, const string &name) {
952
0
  const auto &col_names = union_data.names;
953
954
0
  idx_t file_col_idx;
955
0
  for (file_col_idx = 0; file_col_idx < col_names.size(); file_col_idx++) {
956
0
    if (col_names[file_col_idx] == name) {
957
0
      break;
958
0
    }
959
0
  }
960
0
  if (file_col_idx == col_names.size()) {
961
0
    return nullptr;
962
0
  }
963
964
0
  return ReadStatisticsInternal(*union_data.metadata->metadata, *union_data.root_schema, union_data.options,
965
0
                                file_col_idx);
966
0
}
967
968
0
uint32_t ParquetReader::Read(duckdb_apache::thrift::TBase &object, TProtocol &iprot) {
969
0
  if (parquet_options.encryption_config) {
970
0
    return ParquetCrypto::Read(object, iprot, parquet_options.encryption_config->GetFooterKey(), *encryption_util);
971
0
  } else {
972
0
    return object.read(&iprot);
973
0
  }
974
0
}
975
976
uint32_t ParquetReader::ReadData(duckdb_apache::thrift::protocol::TProtocol &iprot, const data_ptr_t buffer,
977
0
                                 const uint32_t buffer_size) {
978
0
  if (parquet_options.encryption_config) {
979
0
    return ParquetCrypto::ReadData(iprot, buffer, buffer_size, parquet_options.encryption_config->GetFooterKey(),
980
0
                                   *encryption_util);
981
0
  } else {
982
0
    return iprot.getTransport()->read(buffer, buffer_size);
983
0
  }
984
0
}
985
986
0
static idx_t GetRowGroupOffset(ParquetReader &reader, idx_t group_idx) {
987
0
  idx_t row_group_offset = 0;
988
0
  auto &row_groups = reader.GetFileMetadata()->row_groups;
989
0
  for (idx_t i = 0; i < group_idx; i++) {
990
0
    row_group_offset += row_groups[i].num_rows;
991
0
  }
992
0
  return row_group_offset;
993
0
}
994
995
0
const ParquetRowGroup &ParquetReader::GetGroup(ParquetReaderScanState &state) {
996
0
  auto file_meta_data = GetFileMetadata();
997
0
  D_ASSERT(state.current_group >= 0 && (idx_t)state.current_group < state.group_idx_list.size());
998
0
  D_ASSERT(state.group_idx_list[state.current_group] < file_meta_data->row_groups.size());
999
0
  return file_meta_data->row_groups[state.group_idx_list[state.current_group]];
1000
0
}
1001
1002
0
uint64_t ParquetReader::GetGroupCompressedSize(ParquetReaderScanState &state) {
1003
0
  const auto &group = GetGroup(state);
1004
0
  int64_t total_compressed_size = group.__isset.total_compressed_size ? group.total_compressed_size : 0;
1005
1006
0
  idx_t calc_compressed_size = 0;
1007
1008
  // If the global total_compressed_size is not set, we can still calculate it
1009
0
  if (group.total_compressed_size == 0) {
1010
0
    for (auto &column_chunk : group.columns) {
1011
0
      calc_compressed_size += column_chunk.meta_data.total_compressed_size;
1012
0
    }
1013
0
  }
1014
1015
0
  if (total_compressed_size != 0 && calc_compressed_size != 0 &&
1016
0
      (idx_t)total_compressed_size != calc_compressed_size) {
1017
0
    throw InvalidInputException(
1018
0
        "Failed to read file \"%s\": mismatch between calculated compressed size and reported compressed size",
1019
0
        GetFileName());
1020
0
  }
1021
1022
0
  return total_compressed_size ? total_compressed_size : calc_compressed_size;
1023
0
}
1024
1025
0
uint64_t ParquetReader::GetGroupSpan(ParquetReaderScanState &state) {
1026
0
  auto &group = GetGroup(state);
1027
0
  idx_t min_offset = NumericLimits<idx_t>::Maximum();
1028
0
  idx_t max_offset = NumericLimits<idx_t>::Minimum();
1029
1030
0
  for (auto &column_chunk : group.columns) {
1031
    // Set the min offset
1032
0
    idx_t current_min_offset = NumericLimits<idx_t>::Maximum();
1033
0
    if (column_chunk.meta_data.__isset.dictionary_page_offset) {
1034
0
      current_min_offset = MinValue<idx_t>(current_min_offset, column_chunk.meta_data.dictionary_page_offset);
1035
0
    }
1036
0
    if (column_chunk.meta_data.__isset.index_page_offset) {
1037
0
      current_min_offset = MinValue<idx_t>(current_min_offset, column_chunk.meta_data.index_page_offset);
1038
0
    }
1039
0
    current_min_offset = MinValue<idx_t>(current_min_offset, column_chunk.meta_data.data_page_offset);
1040
0
    min_offset = MinValue<idx_t>(current_min_offset, min_offset);
1041
0
    max_offset = MaxValue<idx_t>(max_offset, column_chunk.meta_data.total_compressed_size + current_min_offset);
1042
0
  }
1043
1044
0
  return max_offset - min_offset;
1045
0
}
1046
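
GetGroupSpan above measures how many bytes of the file a row group actually touches: the distance between the smallest page offset and the end of the furthest compressed chunk. A plain-C++ sketch with a hypothetical ChunkRange struct:

#include <algorithm>
#include <cstdint>
#include <vector>

struct ChunkRange {
  uint64_t first_page_offset;     // min of dictionary/index/data page offsets
  uint64_t total_compressed_size;
};

static uint64_t GroupSpan(const std::vector<ChunkRange> &chunks) {
  if (chunks.empty()) {
    return 0;
  }
  uint64_t min_offset = UINT64_MAX;
  uint64_t max_offset = 0;
  for (const auto &chunk : chunks) {
    min_offset = std::min(min_offset, chunk.first_page_offset);
    max_offset = std::max(max_offset, chunk.first_page_offset + chunk.total_compressed_size);
  }
  return max_offset - min_offset;
}

A span close to the group's total compressed size suggests the group can be fetched with one contiguous read rather than one request per column chunk.
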
1047
0
idx_t ParquetReader::GetGroupOffset(ParquetReaderScanState &state) {
1048
0
  auto &group = GetGroup(state);
1049
0
  idx_t min_offset = NumericLimits<idx_t>::Maximum();
1050
1051
0
  for (auto &column_chunk : group.columns) {
1052
0
    if (column_chunk.meta_data.__isset.dictionary_page_offset) {
1053
0
      min_offset = MinValue<idx_t>(min_offset, column_chunk.meta_data.dictionary_page_offset);
1054
0
    }
1055
0
    if (column_chunk.meta_data.__isset.index_page_offset) {
1056
0
      min_offset = MinValue<idx_t>(min_offset, column_chunk.meta_data.index_page_offset);
1057
0
    }
1058
0
    min_offset = MinValue<idx_t>(min_offset, column_chunk.meta_data.data_page_offset);
1059
0
  }
1060
1061
0
  return min_offset;
1062
0
}
1063
1064
static FilterPropagateResult CheckParquetStringFilter(BaseStatistics &stats, const Statistics &pq_col_stats,
1065
0
                                                      TableFilter &filter) {
1066
0
  switch (filter.filter_type) {
1067
0
  case TableFilterType::CONJUNCTION_AND: {
1068
0
    auto &conjunction_filter = filter.Cast<ConjunctionAndFilter>();
1069
0
    auto and_result = FilterPropagateResult::FILTER_ALWAYS_TRUE;
1070
0
    for (auto &child_filter : conjunction_filter.child_filters) {
1071
0
      auto child_prune_result = CheckParquetStringFilter(stats, pq_col_stats, *child_filter);
1072
0
      if (child_prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) {
1073
0
        return FilterPropagateResult::FILTER_ALWAYS_FALSE;
1074
0
      }
1075
0
      if (child_prune_result != and_result) {
1076
0
        and_result = FilterPropagateResult::NO_PRUNING_POSSIBLE;
1077
0
      }
1078
0
    }
1079
0
    return and_result;
1080
0
  }
1081
0
  case TableFilterType::CONSTANT_COMPARISON: {
1082
0
    auto &constant_filter = filter.Cast<ConstantFilter>();
1083
0
    auto &min_value = pq_col_stats.min_value;
1084
0
    auto &max_value = pq_col_stats.max_value;
1085
0
    return StringStats::CheckZonemap(const_data_ptr_cast(min_value.c_str()), min_value.size(),
1086
0
                                     const_data_ptr_cast(max_value.c_str()), max_value.size(),
1087
0
                                     constant_filter.comparison_type, StringValue::Get(constant_filter.constant));
1088
0
  }
1089
0
  default:
1090
0
    return filter.CheckStatistics(stats);
1091
0
  }
1092
0
}
1093
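
The special case above exists because DuckDB's in-memory string zonemaps keep only a short prefix of the min/max values, while the Parquet statistics keep the full strings. A small worked example of the difference (the values are made up):

#include <iostream>
#include <string>

int main() {
  const std::string parquet_min = "customer_0000001";
  const std::string parquet_max = "customer_0000999";
  const std::string probe       = "customer_0001234"; // constant from a filter like col = '...'

  // An 8-byte prefix cannot distinguish the three values, so no pruning is possible:
  std::cout << parquet_min.substr(0, 8) << " / " << parquet_max.substr(0, 8) << " / " << probe.substr(0, 8) << "\n";

  // With the full Parquet min/max, the probe exceeds the maximum and the row group can be skipped:
  const bool can_prune = probe < parquet_min || probe > parquet_max;
  std::cout << (can_prune ? "row group skipped" : "row group must be read") << "\n";
  return 0;
}

Skipping whole row groups matters most for remote files, where each extra row group costs at least one extra range request.
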
1094
static FilterPropagateResult CheckParquetFloatFilter(ColumnReader &reader, const Statistics &pq_col_stats,
1095
0
                                                     TableFilter &filter) {
1096
  // floating point columns can contain values in the [min, max] domain AND NaN values
1097
  // check both stats against the filter
1098
0
  auto &type = reader.Type();
1099
0
  auto nan_stats = NumericStats::CreateUnknown(type);
1100
0
  auto nan_value = Value("nan").DefaultCastAs(type);
1101
0
  NumericStats::SetMin(nan_stats, nan_value);
1102
0
  NumericStats::SetMax(nan_stats, nan_value);
1103
0
  auto nan_prune = filter.CheckStatistics(nan_stats);
1104
1105
0
  auto min_max_stats = ParquetStatisticsUtils::CreateNumericStats(reader.Type(), reader.Schema(), pq_col_stats);
1106
0
  auto prune = filter.CheckStatistics(*min_max_stats);
1107
1108
  // if EITHER of them cannot be pruned - we cannot prune
1109
0
  if (prune == FilterPropagateResult::NO_PRUNING_POSSIBLE ||
1110
0
      nan_prune == FilterPropagateResult::NO_PRUNING_POSSIBLE) {
1111
0
    return FilterPropagateResult::NO_PRUNING_POSSIBLE;
1112
0
  }
1113
  // if both are the same we can return that value
1114
0
  if (prune == nan_prune) {
1115
0
    return prune;
1116
0
  }
1117
  // if they are different we need to return that we cannot prune
1118
  // e.g. prune = always false, nan_prune = always true -> we don't know
1119
0
  return FilterPropagateResult::NO_PRUNING_POSSIBLE;
1120
0
}
1121
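
The result-combining rule in CheckParquetFloatFilter above can be stated compactly: the row group may contain both the [min, max] range and NaN values, so a filter only propagates when both checks agree, and never when either is inconclusive. A sketch with an illustrative enum:

enum class Prune { ALWAYS_TRUE, ALWAYS_FALSE, NO_PRUNING_POSSIBLE };

static Prune CombineFloatPrune(Prune range_result, Prune nan_result) {
  if (range_result == Prune::NO_PRUNING_POSSIBLE || nan_result == Prune::NO_PRUNING_POSSIBLE) {
    return Prune::NO_PRUNING_POSSIBLE;
  }
  // e.g. the range check says ALWAYS_FALSE but the NaN check says ALWAYS_TRUE: we do not know, so no pruning
  return range_result == nan_result ? range_result : Prune::NO_PRUNING_POSSIBLE;
}
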
1122
0
void ParquetReader::PrepareRowGroupBuffer(ParquetReaderScanState &state, idx_t i) {
1123
0
  auto &group = GetGroup(state);
1124
0
  auto col_idx = MultiFileLocalIndex(i);
1125
0
  auto column_id = column_ids[col_idx];
1126
0
  auto &column_reader = state.root_reader->Cast<StructColumnReader>().GetChildReader(column_id);
1127
1128
0
  if (filters) {
1129
0
    auto stats = column_reader.Stats(state.group_idx_list[state.current_group], group.columns);
1130
    // filters contain output chunk index, not file col idx!
1131
0
    auto filter_entry = filters->filters.find(col_idx);
1132
0
    if (stats && filter_entry != filters->filters.end()) {
1133
0
      auto &filter = *filter_entry->second;
1134
1135
0
      FilterPropagateResult prune_result;
1136
0
      bool is_generated_column = column_reader.ColumnIndex() >= group.columns.size();
1137
0
      bool is_column = column_reader.Schema().schema_type == ParquetColumnSchemaType::COLUMN;
1138
0
      bool is_expression = column_reader.Schema().schema_type == ParquetColumnSchemaType::EXPRESSION;
1139
0
      bool has_min_max = false;
1140
0
      if (!is_generated_column) {
1141
0
        has_min_max = group.columns[column_reader.ColumnIndex()].meta_data.statistics.__isset.min_value &&
1142
0
                      group.columns[column_reader.ColumnIndex()].meta_data.statistics.__isset.max_value;
1143
0
      }
1144
0
      if (is_expression) {
1145
        // no pruning possible for expressions
1146
0
        prune_result = FilterPropagateResult::NO_PRUNING_POSSIBLE;
1147
0
      } else if (!is_generated_column && has_min_max && column_reader.Type().id() == LogicalTypeId::VARCHAR) {
1148
        // our StringStats only store the first 8 bytes of strings (even if Parquet has longer string stats)
1149
        // however, when reading remote Parquet files, skipping row groups is really important
1150
        // here, we implement a special case that checks the full-length Parquet statistics for string filters
1151
0
        prune_result = CheckParquetStringFilter(
1152
0
            *stats, group.columns[column_reader.ColumnIndex()].meta_data.statistics, filter);
1153
0
      } else if (!is_generated_column && has_min_max &&
1154
0
                 (column_reader.Type().id() == LogicalTypeId::FLOAT ||
1155
0
                  column_reader.Type().id() == LogicalTypeId::DOUBLE) &&
1156
0
                 parquet_options.can_have_nan) {
1157
        // floating point columns can have NaN values in addition to the min/max bounds defined in the file
1158
        // in order to do optimal pruning - we prune based on the [min, max] of the file followed by pruning
1159
        // based on NaN
1160
0
        prune_result = CheckParquetFloatFilter(
1161
0
            column_reader, group.columns[column_reader.ColumnIndex()].meta_data.statistics, filter);
1162
0
      } else {
1163
0
        prune_result = filter.CheckStatistics(*stats);
1164
0
      }
1165
      // check the bloom filter if present
1166
0
      if (prune_result == FilterPropagateResult::NO_PRUNING_POSSIBLE && !column_reader.Type().IsNested() &&
1167
0
          is_column && ParquetStatisticsUtils::BloomFilterSupported(column_reader.Type().id()) &&
1168
0
          ParquetStatisticsUtils::BloomFilterExcludes(filter,
1169
0
                                                      group.columns[column_reader.ColumnIndex()].meta_data,
1170
0
                                                      *state.thrift_file_proto, allocator)) {
1171
0
        prune_result = FilterPropagateResult::FILTER_ALWAYS_FALSE;
1172
0
      }
1173
1174
0
      if (prune_result == FilterPropagateResult::FILTER_ALWAYS_FALSE) {
1175
        // this effectively skips the entire row group
1176
0
        state.offset_in_group = group.num_rows;
1177
0
        return;
1178
0
      }
1179
0
    }
1180
0
  }
1181
1182
0
  state.root_reader->InitializeRead(state.group_idx_list[state.current_group], group.columns,
1183
0
                                    *state.thrift_file_proto);
1184
0
}
1185
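As the comment above notes, a row group that a filter proves empty is skipped simply by setting the in-group offset to the group's row count, so the scan loop treats the group as exhausted and advances. Below is a minimal, self-contained sketch of that mechanism; RowGroup, ScanState, and the hard-coded "pruned" decision are hypothetical placeholders, not DuckDB types.

#include <cstdint>
#include <iostream>
#include <vector>

struct RowGroup {
  int64_t num_rows;
};

struct ScanState {
  int64_t current_group = -1;
  int64_t offset_in_group = 0;
};

int main() {
  std::vector<RowGroup> groups {{100}, {200}, {50}};
  ScanState state;
  while (true) {
    // mirrors the "switch to the next row group" check at the top of Scan()
    if (state.current_group < 0 || state.offset_in_group >= groups[(size_t)state.current_group].num_rows) {
      state.current_group++;
      state.offset_in_group = 0;
      if (state.current_group == (int64_t)groups.size()) {
        break;
      }
      bool filter_always_false = state.current_group == 1; // pretend group 1 is pruned by a filter
      if (filter_always_false) {
        // setting the offset to the group's row count makes the group look exhausted
        state.offset_in_group = groups[(size_t)state.current_group].num_rows;
        continue;
      }
    }
    std::cout << "reading row group " << state.current_group << "\n";
    state.offset_in_group = groups[(size_t)state.current_group].num_rows; // consume the group
  }
  return 0;
}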
1186
0
idx_t ParquetReader::NumRows() const {
1187
0
  return GetFileMetadata()->num_rows;
1188
0
}
1189
1190
0
idx_t ParquetReader::NumRowGroups() const {
1191
0
  return GetFileMetadata()->row_groups.size();
1192
0
}
1193
1194
ParquetScanFilter::ParquetScanFilter(ClientContext &context, idx_t filter_idx, TableFilter &filter)
1195
0
    : filter_idx(filter_idx), filter(filter) {
1196
0
  filter_state = TableFilterState::Initialize(context, filter);
1197
0
}
1198
1199
0
ParquetScanFilter::~ParquetScanFilter() {
1200
0
}
1201
1202
void ParquetReader::InitializeScan(ClientContext &context, ParquetReaderScanState &state,
1203
0
                                   vector<idx_t> groups_to_read) {
1204
0
  state.current_group = -1;
1205
0
  state.finished = false;
1206
0
  state.offset_in_group = 0;
1207
0
  state.group_idx_list = std::move(groups_to_read);
1208
0
  state.sel.Initialize(STANDARD_VECTOR_SIZE);
1209
0
  if (!state.file_handle || state.file_handle->GetPath() != file_handle->GetPath()) {
1210
0
    auto flags = FileFlags::FILE_FLAGS_READ;
1211
0
    if (ShouldAndCanPrefetch(context, *file_handle)) {
1212
0
      state.prefetch_mode = true;
1213
0
      if (file_handle->IsRemoteFile()) {
1214
0
        flags |= FileFlags::FILE_FLAGS_DIRECT_IO;
1215
0
      }
1216
0
    } else {
1217
0
      state.prefetch_mode = false;
1218
0
    }
1219
1220
0
    state.file_handle = fs.OpenFile(context, file, flags);
1221
0
  }
1222
0
  state.adaptive_filter.reset();
1223
0
  state.scan_filters.clear();
1224
0
  if (filters) {
1225
0
    state.adaptive_filter = make_uniq<AdaptiveFilter>(*filters);
1226
0
    for (auto &entry : filters->filters) {
1227
0
      state.scan_filters.emplace_back(context, entry.first, *entry.second);
1228
0
    }
1229
0
  }
1230
1231
0
  state.thrift_file_proto = CreateThriftFileProtocol(context, *state.file_handle, state.prefetch_mode);
1232
0
  state.root_reader = CreateReader(context);
1233
0
  state.define_buf.resize(allocator, STANDARD_VECTOR_SIZE);
1234
0
  state.repeat_buf.resize(allocator, STANDARD_VECTOR_SIZE);
1235
0
}
1236
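InitializeScan builds one ParquetScanFilter per table filter together with an AdaptiveFilter; during the scan these filters are evaluated in a permuted order and evaluation stops early once no rows survive. The following is a small, self-contained sketch of that idea using plain std::function predicates; the names and the fixed permutation are illustrative only and do not reflect DuckDB's API.

#include <cstddef>
#include <functional>
#include <iostream>
#include <vector>

int main() {
  // three "filters" expressed as predicates over row ids, applied to 8 rows
  std::vector<std::function<bool(size_t)>> filters {
      [](size_t row) { return row % 2 == 0; }, // keep even rows
      [](size_t row) { return row < 4; },      // keep the first four rows
      [](size_t row) { return row != 2; },     // drop row 2
  };
  std::vector<size_t> permutation {1, 0, 2}; // evaluation order (chosen adaptively in DuckDB)

  std::vector<size_t> sel {0, 1, 2, 3, 4, 5, 6, 7};
  for (auto filter_pos : permutation) {
    if (sel.empty()) {
      break; // no rows left, skip the remaining filters
    }
    std::vector<size_t> next;
    for (auto row : sel) {
      if (filters[filter_pos](row)) {
        next.push_back(row);
      }
    }
    sel = std::move(next);
  }
  for (auto row : sel) {
    std::cout << row << " "; // only row 0 survives all three filters
  }
  std::cout << "\n";
  return 0;
}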
1237
0
void ParquetReader::GetPartitionStats(vector<PartitionStatistics> &result) {
1238
0
  GetPartitionStats(*GetFileMetadata(), result);
1239
0
}
1240
1241
void ParquetReader::GetPartitionStats(const duckdb_parquet::FileMetaData &metadata,
1242
0
                                      vector<PartitionStatistics> &result) {
1243
0
  idx_t offset = 0;
1244
0
  for (auto &row_group : metadata.row_groups) {
1245
0
    PartitionStatistics partition_stats;
1246
0
    partition_stats.row_start = offset;
1247
0
    partition_stats.count = row_group.num_rows;
1248
0
    partition_stats.count_type = CountType::COUNT_EXACT;
1249
0
    offset += row_group.num_rows;
1250
0
    result.push_back(partition_stats);
1251
0
  }
1252
0
}
1253
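GetPartitionStats accumulates row-group row counts into running start offsets: for example, groups of 100, 200, and 50 rows start at rows 0, 100, and 300. A minimal, self-contained sketch of that accumulation, with PartitionInfo standing in for PartitionStatistics:

#include <cstdint>
#include <iostream>
#include <vector>

struct PartitionInfo {
  uint64_t row_start;
  uint64_t count;
};

int main() {
  std::vector<uint64_t> row_group_sizes {100, 200, 50};
  std::vector<PartitionInfo> result;
  uint64_t offset = 0;
  for (auto num_rows : row_group_sizes) {
    result.push_back({offset, num_rows});
    offset += num_rows;
  }
  for (auto &p : result) {
    // prints: 0/100, 100/200, 300/50
    std::cout << p.row_start << "/" << p.count << "\n";
  }
  return 0;
}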
1254
0
AsyncResult ParquetReader::Scan(ClientContext &context, ParquetReaderScanState &state, DataChunk &result) {
1255
0
  result.Reset();
1256
0
  if (state.finished) {
1257
0
    return SourceResultType::FINISHED;
1258
0
  }
1259
1260
  // see if we have to switch to the next row group in the parquet file
1261
0
  if (state.current_group < 0 || (int64_t)state.offset_in_group >= GetGroup(state).num_rows) {
1262
0
    state.current_group++;
1263
0
    state.offset_in_group = 0;
1264
1265
0
    auto &trans = reinterpret_cast<ThriftFileTransport &>(*state.thrift_file_proto->getTransport());
1266
0
    trans.ClearPrefetch();
1267
0
    state.current_group_prefetched = false;
1268
1269
0
    if ((idx_t)state.current_group == state.group_idx_list.size()) {
1270
0
      state.finished = true;
1271
0
      return SourceResultType::FINISHED;
1272
0
    }
1273
1274
    // TODO: only need this if we have a deletion vector?
1275
0
    state.group_offset = GetRowGroupOffset(state.root_reader->Reader(), state.group_idx_list[state.current_group]);
1276
1277
0
    uint64_t to_scan_compressed_bytes = 0;
1278
0
    for (idx_t i = 0; i < column_ids.size(); i++) {
1279
0
      auto col_idx = MultiFileLocalIndex(i);
1280
0
      PrepareRowGroupBuffer(state, col_idx);
1281
1282
0
      auto file_col_idx = column_ids[col_idx];
1283
1284
0
      auto &root_reader = state.root_reader->Cast<StructColumnReader>();
1285
0
      to_scan_compressed_bytes += root_reader.GetChildReader(file_col_idx).TotalCompressedSize();
1286
0
    }
1287
1288
0
    auto &group = GetGroup(state);
1289
0
    if (state.op) {
1290
0
      DUCKDB_LOG(context, PhysicalOperatorLogType, *state.op, "ParquetReader",
1291
0
                 state.offset_in_group == (idx_t)group.num_rows ? "SkipRowGroup" : "ReadRowGroup",
1292
0
                 {{"file", file.path}, {"row_group_id", to_string(state.group_idx_list[state.current_group])}});
1293
0
    }
1294
1295
0
    if (state.prefetch_mode && state.offset_in_group != (idx_t)group.num_rows) {
1296
0
      uint64_t total_row_group_span = GetGroupSpan(state);
1297
1298
0
      double scan_percentage = (double)(to_scan_compressed_bytes) / static_cast<double>(total_row_group_span);
1299
1300
0
      if (to_scan_compressed_bytes > total_row_group_span) {
1301
0
        throw IOException(
1302
0
            "The parquet file '%s' seems to have incorrectly set page offsets. This interferes with DuckDB's "
1303
0
            "prefetching optimization. DuckDB may still be able to scan this file by manually disabling the "
1304
0
            "prefetching mechanism using: 'SET disable_parquet_prefetching=true'.",
1305
0
            GetFileName());
1306
0
      }
1307
1308
0
      if (!filters && scan_percentage > ParquetReaderPrefetchConfig::WHOLE_GROUP_PREFETCH_MINIMUM_SCAN) {
1309
        // Prefetch the whole row group
1310
0
        if (!state.current_group_prefetched) {
1311
0
          auto total_compressed_size = GetGroupCompressedSize(state);
1312
0
          if (total_compressed_size > 0) {
1313
0
            trans.Prefetch(GetGroupOffset(state), total_row_group_span);
1314
0
          }
1315
0
          state.current_group_prefetched = true;
1316
0
        }
1317
0
      } else {
1318
        // lazy fetching is used when all tuples in a column might be skipped. With lazy fetching, the buffer is only
1319
        // fetched on the first read from that buffer.
1320
0
        bool lazy_fetch = filters != nullptr;
1321
1322
        // Prefetch column-wise
1323
0
        for (idx_t i = 0; i < column_ids.size(); i++) {
1324
0
          auto col_idx = MultiFileLocalIndex(i);
1325
0
          auto file_col_idx = column_ids[col_idx];
1326
0
          auto &root_reader = state.root_reader->Cast<StructColumnReader>();
1327
1328
0
          bool has_filter = false;
1329
0
          if (filters) {
1330
0
            auto entry = filters->filters.find(col_idx);
1331
0
            has_filter = entry != filters->filters.end();
1332
0
          }
1333
0
          root_reader.GetChildReader(file_col_idx).RegisterPrefetch(trans, !(lazy_fetch && !has_filter));
1334
0
        }
1335
1336
0
        trans.FinalizeRegistration();
1337
1338
0
        if (!lazy_fetch) {
1339
0
          trans.PrefetchRegistered();
1340
0
        }
1341
0
      }
1342
0
    }
1343
0
    result.Reset();
1344
0
    return SourceResultType::HAVE_MORE_OUTPUT;
1345
0
  }
1346
1347
0
  auto scan_count = MinValue<idx_t>(STANDARD_VECTOR_SIZE, GetGroup(state).num_rows - state.offset_in_group);
1348
0
  result.SetCardinality(scan_count);
1349
1350
0
  if (scan_count == 0) {
1351
0
    state.finished = true;
1352
    // end of last group, we are done
1353
0
    return SourceResultType::FINISHED;
1354
0
  }
1355
1356
0
  auto &deletion_filter = state.root_reader->Reader().deletion_filter;
1357
1358
0
  state.define_buf.zero();
1359
0
  state.repeat_buf.zero();
1360
1361
0
  auto define_ptr = (uint8_t *)state.define_buf.ptr;
1362
0
  auto repeat_ptr = (uint8_t *)state.repeat_buf.ptr;
1363
1364
0
  auto &root_reader = state.root_reader->Cast<StructColumnReader>();
1365
1366
0
  if (filters || deletion_filter) {
1367
0
    idx_t filter_count = result.size();
1368
0
    D_ASSERT(filter_count == scan_count);
1369
0
    vector<bool> need_to_read(column_ids.size(), true);
1370
1371
0
    state.sel.Initialize(nullptr);
1372
0
    D_ASSERT(!filters || state.scan_filters.size() == filters->filters.size());
1373
1374
0
    bool is_first_filter = true;
1375
0
    if (deletion_filter) {
1376
0
      auto row_start = UnsafeNumericCast<row_t>(state.offset_in_group + state.group_offset);
1377
0
      filter_count = deletion_filter->Filter(row_start, scan_count, state.sel);
1378
      //! FIXME: does this need to be set?
1379
      //! As part of 'DirectFilter' we also initialize reads of the child readers
1380
0
      is_first_filter = false;
1381
0
    }
1382
1383
0
    if (filters) {
1384
      // first load the columns that are used in filters
1385
0
      auto filter_state = state.adaptive_filter->BeginFilter();
1386
0
      for (idx_t i = 0; i < state.scan_filters.size(); i++) {
1387
0
        if (filter_count == 0) {
1388
          // if no rows are left we can stop checking filters
1389
0
          break;
1390
0
        }
1391
0
        auto &scan_filter = state.scan_filters[state.adaptive_filter->permutation[i]];
1392
0
        auto local_idx = MultiFileLocalIndex(scan_filter.filter_idx);
1393
0
        auto column_id = column_ids[local_idx];
1394
1395
0
        auto &result_vector = result.data[local_idx.GetIndex()];
1396
0
        auto &child_reader = root_reader.GetChildReader(column_id);
1397
0
        child_reader.Filter(scan_count, define_ptr, repeat_ptr, result_vector, scan_filter.filter,
1398
0
                            *scan_filter.filter_state, state.sel, filter_count, is_first_filter);
1399
0
        need_to_read[local_idx.GetIndex()] = false;
1400
0
        is_first_filter = false;
1401
0
      }
1402
0
      state.adaptive_filter->EndFilter(filter_state);
1403
0
    }
1404
1405
    // we may still have to read some columns
1406
0
    for (idx_t i = 0; i < column_ids.size(); i++) {
1407
0
      auto col_idx = MultiFileLocalIndex(i);
1408
0
      if (!need_to_read[col_idx]) {
1409
0
        continue;
1410
0
      }
1411
0
      auto file_col_idx = column_ids[col_idx];
1412
0
      if (filter_count == 0) {
1413
0
        root_reader.GetChildReader(file_col_idx).Skip(result.size());
1414
0
        continue;
1415
0
      }
1416
0
      auto &result_vector = result.data[i];
1417
0
      auto &child_reader = root_reader.GetChildReader(file_col_idx);
1418
0
      child_reader.Select(result.size(), define_ptr, repeat_ptr, result_vector, state.sel, filter_count);
1419
0
    }
1420
0
    if (scan_count != filter_count) {
1421
0
      result.Slice(state.sel, filter_count);
1422
0
    }
1423
0
  } else {
1424
0
    for (idx_t i = 0; i < column_ids.size(); i++) {
1425
0
      auto col_idx = MultiFileLocalIndex(i);
1426
0
      auto file_col_idx = column_ids[col_idx];
1427
0
      auto &result_vector = result.data[i];
1428
0
      auto &child_reader = root_reader.GetChildReader(file_col_idx);
1429
0
      auto rows_read = child_reader.Read(scan_count, define_ptr, repeat_ptr, result_vector);
1430
0
      if (rows_read != scan_count) {
1431
0
        throw InvalidInputException("Mismatch in parquet read for column %llu, expected %llu rows, got %llu",
1432
0
                                    file_col_idx, scan_count, rows_read);
1433
0
      }
1434
0
    }
1435
0
  }
1436
1437
0
  rows_read += scan_count;
1438
0
  state.offset_in_group += scan_count;
1439
0
  return SourceResultType::HAVE_MORE_OUTPUT;
1440
0
}
1441
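The prefetch choice inside Scan can be summarized on its own: with no filters and a scan covering a large enough share of the row group's compressed bytes, the whole group is prefetched in one request; otherwise columns are registered individually so filtered-out ones can stay lazy. Below is a minimal, self-contained sketch of that decision; the 0.95 threshold is a placeholder, the real value lives in ParquetReaderPrefetchConfig::WHOLE_GROUP_PREFETCH_MINIMUM_SCAN.

#include <cstdint>
#include <iostream>

// placeholder threshold; the actual constant is ParquetReaderPrefetchConfig::WHOLE_GROUP_PREFETCH_MINIMUM_SCAN
static constexpr double WHOLE_GROUP_PREFETCH_MINIMUM_SCAN = 0.95;

enum class PrefetchStrategy { WHOLE_GROUP, COLUMN_WISE };

static PrefetchStrategy ChoosePrefetch(bool has_filters, uint64_t to_scan_compressed_bytes,
                                       uint64_t total_row_group_span) {
  double scan_percentage = (double)to_scan_compressed_bytes / (double)total_row_group_span;
  if (!has_filters && scan_percentage > WHOLE_GROUP_PREFETCH_MINIMUM_SCAN) {
    return PrefetchStrategy::WHOLE_GROUP; // one large read covering the whole row group
  }
  return PrefetchStrategy::COLUMN_WISE; // per-column registration, fetched lazily when filters allow skipping
}

int main() {
  // scanning nearly the whole group with no filters -> one large prefetch
  std::cout << (ChoosePrefetch(false, 980, 1000) == PrefetchStrategy::WHOLE_GROUP) << "\n";
  // with filters, columns are prefetched individually so skipped ones stay lazy
  std::cout << (ChoosePrefetch(true, 980, 1000) == PrefetchStrategy::COLUMN_WISE) << "\n";
  return 0;
}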
1442
} // namespace duckdb