Coverage Report

Created: 2026-06-30 06:38

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/duckdb/extension/parquet/include/parquet_reader.hpp
Line
Count
Source
1
//===----------------------------------------------------------------------===//
2
//                         DuckDB
3
//
4
// parquet_reader.hpp
5
//
6
//
7
//===----------------------------------------------------------------------===//
8
9
#pragma once
10
11
#include <stdint.h>
12
#include <exception>
13
#include <atomic>
14
#include <memory>
15
#include <string>
16
#include <utility>
17
18
#include "duckdb.hpp"
19
#include "duckdb/common/enum_util.hpp"
20
#include "duckdb/common/helper.hpp"
21
#include "duckdb/storage/external_file_cache/caching_file_system.hpp"
22
#include "duckdb/common/common.hpp"
23
#include "duckdb/common/encryption_functions.hpp"
24
#include "duckdb/common/encryption_state.hpp"
25
#include "duckdb/common/exception.hpp"
26
#include "duckdb/common/multi_file/base_file_reader.hpp"
27
#include "duckdb/common/multi_file/multi_file_adaptive_filter_cache.hpp"
28
#include "duckdb/common/multi_file/multi_file_options.hpp"
29
#include "duckdb/common/string_util.hpp"
30
#include "duckdb/common/types/data_chunk.hpp"
31
#include "column_reader.hpp"
32
#include "parquet_prefetch_cost_model.hpp"
33
#include "parquet_file_metadata_cache.hpp"
34
#include "parquet_rle_bp_decoder.hpp"
35
#include "parquet_types.h"
36
#include "resizable_buffer.hpp"
37
#include "duckdb/execution/adaptive_filter.hpp"
38
#include "duckdb/common/column_index.hpp"
39
#include "duckdb/common/multi_file/multi_file_data.hpp"
40
#include "duckdb/common/open_file_info.hpp"
41
#include "duckdb/common/optional_idx.hpp"
42
#include "duckdb/common/optional_ptr.hpp"
43
#include "duckdb/common/projection_index.hpp"
44
#include "duckdb/common/shared_ptr_ipp.hpp"
45
#include "duckdb/common/string.hpp"
46
#include "duckdb/common/typedefs.hpp"
47
#include "duckdb/common/types.hpp"
48
#include "duckdb/common/types/selection_vector.hpp"
49
#include "duckdb/common/types/value.hpp"
50
#include "duckdb/common/unique_ptr.hpp"
51
#include "duckdb/common/vector.hpp"
52
#include "duckdb/parallel/async_result.hpp"
53
#include "parquet_column_schema.hpp"
54
#include "thrift/protocol/TProtocol.h"
55
56
// Thrift runtime base type, forward-declared so ParquetReader::Read / ReadEncrypted can take it by reference.
namespace duckdb_apache {
namespace thrift {

class TBase;

} // namespace thrift
} // namespace duckdb_apache
61
62
// Thrift-generated Parquet metadata types, forward-declared so the declarations below can name them through
// references and pointers. NOTE(review): the complete definitions presumably come from "parquet_types.h".
namespace duckdb_parquet {

namespace format {
class FileMetaData;
} // namespace format

class EncryptionAlgorithm;
class FileMetaData;
class RowGroup;
class SchemaElement;

} // namespace duckdb_parquet
72
73
namespace duckdb {
74
// Forward declarations for types this header names through references, pointers and smart pointers.
// Each name keeps its original class-key (`class` vs `struct`).

// Declared with `class`:
class Allocator;
class BaseStatistics;
class ClientContext;
class DataChunk;
class Deserializer;
class EncryptionUtil;
class ParquetEncryptionConfig;
class ParquetReader;
class PhysicalOperator;
class Serializer;
class TableFilter;
class TableFilterSet;
class ThriftFileTransport;

// Declared with `struct`:
struct CryptoMetaData;
struct GlobalTableFunctionState;
struct LocalTableFunctionState;
struct PartitionStatistics;
struct TableFilterState;
92
93
//! Tuning thresholds for the Parquet prefetch heuristics.
//! Both values are fractions in [0, 1] (0.95 means 95%), not percentages.
struct ParquetReaderPrefetchConfig {
  //! Minimum fraction of a row group's span that must be scanned to enable whole-group prefetch
  static constexpr double WHOLE_GROUP_PREFETCH_MINIMUM_SCAN = 0.95;
  //! Minimum fraction of filter-evaluated row groups that must produce at least one surviving row.
  //! NOTE(review): presumably compared against ParquetPrefetchMetrics::row_groups_with_matches /
  //! row_groups_executed; confirm in the .cpp.
  static constexpr double PREFETCH_FILTER_MINIMUM_MATCH_RATIO = 0.9;
};
99
100
//! Prefetch strategy selected at runtime for a row group.
//! Each description sits above its enumerator because Doxygen attaches a plain `//!` comment to the entity that
//! follows it (a trailing comment would need `//!<`).
enum class ParquetPrefetchStrategy : uint8_t {
  //! No prefetch strategy; the value ParquetLoggerPrefetchMetrics::strategy starts with and is reset to
  NONE,
  //! Prefetches the whole group
  WHOLE_GROUP,
  //! Used when we have fully selective filters, so they are prefetched earlier
  PREFETCH_FILTERS,
  //! Used when we have projections and optional only filters
  COLUMN_WISE_EAGER
};

//! Returns a printable name for `strategy`. NOTE(review): exact strings are defined in the .cpp.
const char *ParquetPrefetchStrategyToString(ParquetPrefetchStrategy strategy);
108
109
//! Prefetch strategy option, as stored in ParquetOptions::prefetch_strategy.
//! Descriptions precede their enumerators (a plain trailing `//!` would document the next enumerator instead).
enum class ParquetPrefetchStrategyOption : uint8_t {
  //! Uses the runtime strategy to pick between the ParquetPrefetchStrategy values
  AUTO,
  //! Always do the whole row group
  WHOLE_GROUP,
};
113
114
//! Converts a textual option value into a ParquetPrefetchStrategyOption.
//! NOTE(review): accepted spellings and the behavior on unknown values are defined in the .cpp; confirm there.
ParquetPrefetchStrategyOption ParquetPrefetchStrategyOptionFromString(const string &value);

//! EnumUtil specialization: ParquetPrefetchStrategyOption -> C string.
template <>
const char *EnumUtil::ToChars<ParquetPrefetchStrategyOption>(ParquetPrefetchStrategyOption value);

//! EnumUtil specialization: C string -> ParquetPrefetchStrategyOption.
template <>
ParquetPrefetchStrategyOption EnumUtil::FromString<ParquetPrefetchStrategyOption>(const char *value);
121
122
//! A table filter paired with the evaluation state used while scanning.
struct ParquetScanFilter {
  //! NOTE(review): presumably creates filter_state for `filter` using `context`; see the .cpp.
  ParquetScanFilter(ClientContext &context, ProjectionIndex filter_idx, TableFilter &filter);
  //! Declared out of line; likely because TableFilterState is only forward-declared in this header, and
  //! destroying a unique_ptr<TableFilterState> needs the complete type.
  ~ParquetScanFilter();
  //! Move-only. Declaring the move constructor implicitly deletes the copy constructor, and moving is what
  //! allows storage in vector<ParquetScanFilter> (ParquetReaderScanState::scan_filters).
  ParquetScanFilter(ParquetScanFilter &&) = default;

  //! Index of the filtered column. NOTE(review): confirm which index space ProjectionIndex refers to.
  ProjectionIndex filter_idx;
  //! Non-owning reference: the referenced TableFilter must outlive this object.
  TableFilter &filter;
  //! Evaluation state for `filter`, owned by this object.
  unique_ptr<TableFilterState> filter_state;
};
131
132
struct ParquetPrefetchColumn {
133
  string name;
134
  idx_t offset;
135
136
0
  ParquetPrefetchColumn(string name_p, idx_t offset_p) : name(std::move(name_p)), offset(offset_p) {
137
0
  }
138
139
0
  bool operator<(const ParquetPrefetchColumn &other) const {
140
0
    return offset < other.offset;
141
0
  }
142
};
143
//! Logger-only prefetch metrics: populated only when ParquetPrefetch logging is enabled.
144
struct ParquetLoggerPrefetchMetrics {
145
  //! Physical prefetch groups (column-name batches) issued for the current row group, in order
146
  vector<vector<string>> prefetch_groups;
147
  //! Tracks which scan_filters were evaluated in the current row group (indexed by scan_filter position)
148
  vector<bool> filters_used;
149
  //! Prefetch strategy chosen for the current row group
150
  ParquetPrefetchStrategy strategy = ParquetPrefetchStrategy::NONE;
151
  //! Accepted column gap (bytes)
152
  uint64_t accepted_column_gap = 0;
153
154
0
  void Reset() {
155
0
    prefetch_groups.clear();
156
0
    std::fill(filters_used.begin(), filters_used.end(), false);
157
0
    strategy = ParquetPrefetchStrategy::NONE;
158
0
    accepted_column_gap = 0;
159
0
  }
160
161
  //! Build a prefetch group
162
  void GeneratePrefetchGroup(ThriftFileTransport &trans, vector<ParquetPrefetchColumn> &requested_columns,
163
                             const vector<duckdb_parquet::ColumnChunk> &all_chunks);
164
};
165
166
struct ParquetPrefetchMetrics {
167
  //! Whether any filter was evaluated against the current row group
168
  bool filter_ran = false;
169
  //! Whether the current row group produced at least one surviving row after filtering
170
  bool had_match = false;
171
  //! Total number of row groups for which filters were evaluated across the scan
172
  idx_t row_groups_executed = 0;
173
  //! Number of those row groups that produced at least one surviving row
174
  idx_t row_groups_with_matches = 0;
175
176
  ParquetLoggerPrefetchMetrics logger;
177
178
0
  void FinalizeRowGroupSelectivity() {
179
0
    if (filter_ran) {
180
0
      row_groups_executed++;
181
0
      if (had_match) {
182
0
        row_groups_with_matches++;
183
0
      }
184
0
    }
185
0
    filter_ran = false;
186
0
    had_match = false;
187
0
    logger.Reset();
188
0
  }
189
};
190
191
struct ParquetReaderScanState {
192
public:
193
  ColumnReader &GetColumnReader(idx_t i);
194
195
public:
196
  //! The row group index this scan state decodes
197
  idx_t group_index;
198
  idx_t offset_in_group;
199
  idx_t group_offset;
200
  shared_ptr<CachingFileHandle> file_handle;
201
  vector<unique_ptr<ColumnReader>> column_readers;
202
  duckdb_base_std::unique_ptr<duckdb_apache::thrift::protocol::TProtocol> thrift_file_proto;
203
204
  //! Set while resuming payload-column decode after the filter-column I/O blocked (vs a fresh row-group pass)
205
  bool resuming_payload = false;
206
  SelectionVector sel;
207
208
  ResizeableBuffer define_buf;
209
  ResizeableBuffer repeat_buf;
210
211
  bool prefetch_mode = false;
212
  //! Number of filter head counts, used for prefetching
213
  idx_t filter_head_count = 0;
214
  //! Surviving row count
215
  idx_t filter_count = 0;
216
217
  ParquetPrefetchMetrics prefetch_metrics;
218
219
  //! Per-thread adaptive filter cache
220
  MultiFileAdaptiveFilterCache adaptive_filter_cache;
221
  //! Table filter list
222
  vector<ParquetScanFilter> scan_filters;
223
  //! true once the filter at this index has driven the surviving row count to zero
224
  vector<bool> filter_eliminated_all_rows;
225
226
  //! (optional) pointer to the PhysicalOperator for logging
227
  optional_ptr<const PhysicalOperator> op;
228
229
  //! Per-thread counters for row groups whose data was read / skipped, incremented as row groups are processed.
230
  //! Read in ParquetScanGetMetrics and surfaced as the standard row_groups_scanned / total_row_groups_to_scan
231
  //! profiling metrics (the profiler sums them across threads).
232
  idx_t row_groups_read = 0;
233
  idx_t row_groups_skipped = 0;
234
235
  //! Prefetch cost model
236
  PrefetchCostModelState cost_model_state;
237
};
238
239
//! Describes one entry of ParquetOptions::schema (an explicitly specified column).
struct ParquetColumnDefinition {
public:
  //! Builds a definition from a schema Value.
  //! NOTE(review): the expected structure of column_value is defined in the .cpp; confirm there.
  static ParquetColumnDefinition FromSchemaValue(ClientContext &context, const Value &column_value);

public:
  // DEPRECATED, use 'identifier' instead
  // NOTE(review): this member has no default initializer, so it is indeterminate after default-initialization
  // unless explicitly assigned.
  int32_t field_id;
  string name;
  LogicalType type;
  Value default_value;
  Value identifier;

public:
  //! Serialization round-trip; Deserialize reconstructs what Serialize writes.
  void Serialize(Serializer &serializer) const;
  static ParquetColumnDefinition Deserialize(Deserializer &deserializer);
};
255
256
struct ParquetOptions {
257
0
  explicit ParquetOptions() {
258
0
  }
259
  explicit ParquetOptions(ClientContext &context);
260
261
  bool binary_as_string = false;
262
  bool variant_legacy_encoding = false;
263
  bool file_row_number = false;
264
  shared_ptr<ParquetEncryptionConfig> encryption_config;
265
266
  vector<ParquetColumnDefinition> schema;
267
  idx_t explicit_cardinality = 0;
268
  bool can_have_nan = false; // if floats or doubles can contain NaN values
269
  ParquetPrefetchStrategyOption prefetch_strategy = ParquetPrefetchStrategyOption::AUTO;
270
};
271
272
//! Bundles ParquetOptions with MultiFileOptions so that both can be serialized and deserialized together.
struct ParquetOptionsSerialization {
  ParquetOptionsSerialization() = default;
  //! Takes both option sets by value and moves them into the members.
  ParquetOptionsSerialization(ParquetOptions parquet_options_p, MultiFileOptions file_options_p)
      : parquet_options(std::move(parquet_options_p)), file_options(std::move(file_options_p)) {
  }

  ParquetOptions parquet_options;
  MultiFileOptions file_options;

public:
  void Serialize(Serializer &serializer) const;
  static ParquetOptionsSerialization Deserialize(Deserializer &deserializer);
};
285
286
//! Parquet implementation of BaseUnionData. It holds a file's options, cached metadata and root schema, and
//! overrides the cardinality and statistics hooks.
//! NOTE(review): presumably retained so these queries can be answered without keeping a ParquetReader open; confirm.
struct ParquetUnionData : public BaseUnionData {
  explicit ParquetUnionData(OpenFileInfo file_p) : BaseUnionData(std::move(file_p)) {
  }
  ~ParquetUnionData() override;

  //! NOTE(review): presumably derived from `metadata`; see the .cpp.
  optional_idx TryGetCardinalityEstimate() const override;
  unique_ptr<BaseStatistics> GetStatistics(ClientContext &context, const Identifier &name) override;

  ParquetOptions options;
  shared_ptr<ParquetFileMetadataCache> metadata;
  unique_ptr<ParquetColumnSchema> root_schema;
};
298
299
class ParquetReader : public BaseFileReader {
300
public:
301
  //! Virtual column identifier for the "file_row_group_number" column (the file-relative row group index of each row)
302
  static constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_GROUP_NUMBER = UINT64_C(9223372036854775820);
303
304
public:
305
  ParquetReader(ClientContext &context, OpenFileInfo file, ParquetOptions parquet_options,
306
                shared_ptr<ParquetFileMetadataCache> metadata = nullptr);
307
  ~ParquetReader() override;
308
309
  mutable CachingFileSystem fs;
310
  Allocator &allocator;
311
  shared_ptr<ParquetFileMetadataCache> metadata;
312
  ParquetOptions parquet_options;
313
  unique_ptr<ParquetColumnSchema> root_schema;
314
  shared_ptr<EncryptionUtil> encryption_util;
315
  //! How many rows have been read from this file
316
  atomic<idx_t> rows_read;
317
318
public:
319
0
  string GetReaderType() const override {
320
0
    return "Parquet";
321
0
  }
322
323
  shared_ptr<BaseUnionData> GetUnionData(idx_t file_idx) override;
324
  unique_ptr<BaseStatistics> GetStatistics(ClientContext &context, const Identifier &name) override;
325
326
  bool TryInitializeScan(ClientContext &context, GlobalTableFunctionState &gstate,
327
                         LocalTableFunctionState &lstate) override;
328
  void PrepareScan(ClientContext &context, GlobalTableFunctionState &gstate_p,
329
                   LocalTableFunctionState &lstate_p) override;
330
  AsyncResult ScheduleIO(ClientContext &context, GlobalTableFunctionState &gstate,
331
                         LocalTableFunctionState &lstate) override;
332
  AsyncResult Scan(ClientContext &context, GlobalTableFunctionState &global_state,
333
                   LocalTableFunctionState &local_state, DataChunk &chunk) override;
334
  void FinishFile(ClientContext &context, GlobalTableFunctionState &gstate_p) override;
335
  double GetProgressInFile(ClientContext &context) override;
336
337
public:
338
  void InitializeScan(ClientContext &context, ParquetReaderScanState &state, idx_t group_to_read) const;
339
340
  idx_t NumRows() const;
341
  idx_t NumRowGroups() const;
342
  idx_t GetFileSize() const;
343
  idx_t GetDataSize() const;
344
345
  const duckdb_parquet::FileMetaData *GetFileMetadata() const;
346
  string static GetUniqueFileIdentifier(const duckdb_parquet::EncryptionAlgorithm &encryption_algorithm);
347
348
  uint32_t Read(duckdb_apache::thrift::TBase &object, TProtocol &iprot) const;
349
  uint32_t ReadEncrypted(duckdb_apache::thrift::TBase &object, TProtocol &iprot,
350
                         CryptoMetaData &aad_crypto_metadata) const;
351
  uint32_t ReadData(duckdb_apache::thrift::protocol::TProtocol &iprot, const data_ptr_t buffer,
352
                    const uint32_t buffer_size) const;
353
  uint32_t ReadDataEncrypted(duckdb_apache::thrift::protocol::TProtocol &iprot, const data_ptr_t buffer,
354
                             const uint32_t buffer_size, CryptoMetaData &aad_crypto_metadata) const;
355
356
  unique_ptr<BaseStatistics> ReadStatistics(const Identifier &name);
357
358
0
  CachingFileHandle &GetHandle() {
359
0
    return *file_handle;
360
0
  }
361
362
  static unique_ptr<BaseStatistics> ReadStatistics(ClientContext &context, ParquetOptions parquet_options,
363
                                                   shared_ptr<ParquetFileMetadataCache> metadata,
364
                                                   const Identifier &name);
365
  static unique_ptr<BaseStatistics> ReadStatistics(const ParquetUnionData &union_data, const Identifier &name);
366
367
  LogicalType DeriveLogicalType(const SchemaElement &s_ele, ParquetColumnSchema &schema) const;
368
  static LogicalType DeriveLogicalType(const SchemaElement &s_ele, const ParquetOptions &options,
369
                                       ParquetColumnSchema &schema);
370
371
  void AddVirtualColumn(column_t virtual_column_id) override;
372
373
  void GetPartitionStats(vector<PartitionStatistics> &result);
374
  static void GetPartitionStats(const duckdb_parquet::FileMetaData &metadata, vector<PartitionStatistics> &result,
375
                                optional_ptr<ParquetColumnSchema> root_schema = nullptr,
376
                                optional_ptr<ParquetOptions> parquet_options = nullptr);
377
  static bool MetadataCacheEnabled(ClientContext &context);
378
  static shared_ptr<ParquetFileMetadataCache> GetMetadataCacheEntry(ClientContext &context, const OpenFileInfo &file);
379
380
private:
381
  //! Construct a parquet reader but **do not** open a file, used in ReadStatistics only
382
  ParquetReader(ClientContext &context, ParquetOptions parquet_options,
383
                shared_ptr<ParquetFileMetadataCache> metadata);
384
385
  void InitializeSchema(ClientContext &context);
386
  //! Parse the schema of the file
387
  unique_ptr<ParquetColumnSchema> ParseSchema(ClientContext &context);
388
  ParquetColumnSchema ParseSchemaRecursive(idx_t depth, idx_t max_define, idx_t max_repeat, idx_t &next_schema_idx,
389
                                           idx_t &next_file_idx, ClientContext &context);
390
391
  unique_ptr<ColumnReader> CreateReaderRecursive(ClientContext &context, const ColumnIndex &index,
392
                                                 const ParquetColumnSchema &schema) const;
393
  const duckdb_parquet::RowGroup &GetGroup(ParquetReaderScanState &state);
394
  uint64_t GetGroupCompressedSize(ParquetReaderScanState &state);
395
  idx_t GetGroupOffset(ParquetReaderScanState &state);
396
  //! Group span is the distance between the min page offset and the max page offset plus the max page compressed size
397
  uint64_t GetGroupSpan(ParquetReaderScanState &state);
398
  void PrepareRowGroupBuffer(ClientContext &context, ParquetReaderScanState &state, idx_t out_col_idx);
399
  //! Whole-group prefetch strategy.
400
  ParquetPrefetchStrategy WholeGroupPrefetch(ParquetReaderScanState &state, ThriftFileTransport &trans,
401
                                             const duckdb_parquet::RowGroup &group, uint64_t total_row_group_span,
402
                                             bool log_prefetch);
403
  //! Column-wise prefetch strategy.
404
  ParquetPrefetchStrategy ColumnWisePrefetch(ParquetReaderScanState &state, ThriftFileTransport &trans,
405
                                             const duckdb_parquet::RowGroup &group, bool filters_look_unselective,
406
                                             bool log_prefetch) const;
407
  //! Register the read-heads to fetch, and select prefetch strategy
408
  ParquetPrefetchStrategy RegisterRowGroupReads(ClientContext &context, ParquetReaderScanState &state);
409
  //! Build the async I/O tasks for the registered read-heads
410
  AsyncResult ScheduleRowGroupReads(ParquetReaderScanState &state, ParquetPrefetchStrategy strategy);
411
  //! Process up to STANDARD_VECTOR_SIZE rows of the current row group into result.
412
  AsyncResult Process(ClientContext &context, ParquetReaderScanState &state, DataChunk &result);
413
  //! Log and finalize the row group's prefetch metrics
414
  void FinishRowGroup(ClientContext &context, ParquetReaderScanState &state, bool log_prefetch);
415
  //! Process filters
416
  AsyncResult ProcessFilters(ParquetReaderScanState &state, DataChunk &result, idx_t scan_count, uint8_t *define_ptr,
417
                             uint8_t *repeat_ptr, bool log_prefetch);
418
  //! Run the filters into state.sel; returns the surviving row count. Advances every filter column.
419
  idx_t EvaluateFilters(ParquetReaderScanState &state, DataChunk &result, idx_t scan_count, uint8_t *define_ptr,
420
                        uint8_t *repeat_ptr, bool log_prefetch);
421
  //! Async-fetch the surviving payload columns (stashing the filter columns); empty if no fetch is needed.
422
  vector<unique_ptr<AsyncTask>> ScheduleRemainingColumns(ParquetReaderScanState &state, DataChunk &result,
423
                                                         idx_t scan_count);
424
  //! Read the remaining (non-filter) columns into result.
425
  void DecodeRemainingColumns(ParquetReaderScanState &state, DataChunk &result, idx_t filter_count,
426
                              uint8_t *define_ptr, uint8_t *repeat_ptr);
427
  ParquetColumnSchema ParseColumnSchema(const SchemaElement &s_ele, idx_t max_define, idx_t max_repeat,
428
                                        idx_t schema_index, idx_t column_index,
429
                                        ParquetColumnSchemaType type = ParquetColumnSchemaType::COLUMN);
430
431
  MultiFileColumnDefinition ParseColumnDefinition(const duckdb_parquet::FileMetaData &file_meta_data,
432
                                                  ParquetColumnSchema &element);
433
  unique_ptr<AdditionalAuthenticatedData> GenerateAAD(uint8_t module_type, uint16_t row_group_ordinal,
434
                                                      uint16_t column_ordinal, uint16_t page_ordinal) const;
435
436
private:
437
  unique_ptr<CachingFileHandle> file_handle;
438
};
439
440
} // namespace duckdb