Coverage Report

Created: 2025-08-28 07:58

/src/duckdb/extension/parquet/include/parquet_reader.hpp
Line
Count
Source (jump to first uncovered line)
1
//===----------------------------------------------------------------------===//
2
//                         DuckDB
3
//
4
// parquet_reader.hpp
5
//
6
//
7
//===----------------------------------------------------------------------===//
8
9
#pragma once
10
11
#include "duckdb.hpp"
12
#include "duckdb/storage/caching_file_system.hpp"
13
#include "duckdb/common/common.hpp"
14
#include "duckdb/common/encryption_state.hpp"
15
#include "duckdb/common/exception.hpp"
16
#include "duckdb/common/multi_file/base_file_reader.hpp"
17
#include "duckdb/common/multi_file/multi_file_options.hpp"
18
#include "duckdb/common/string_util.hpp"
19
#include "duckdb/common/types/data_chunk.hpp"
20
#include "column_reader.hpp"
21
#include "parquet_file_metadata_cache.hpp"
22
#include "parquet_rle_bp_decoder.hpp"
23
#include "parquet_types.h"
24
#include "resizable_buffer.hpp"
25
#include "duckdb/execution/adaptive_filter.hpp"
26
27
#include <exception>
28
29
namespace duckdb_parquet {
30
namespace format {
31
class FileMetaData;
32
}
33
} // namespace duckdb_parquet
34
35
namespace duckdb {
36
class Allocator;
37
class ClientContext;
38
class BaseStatistics;
39
class TableFilterSet;
40
class ParquetEncryptionConfig;
41
class ParquetReader;
42
43
//! Tuning constants for row group prefetching in the Parquet reader.
struct ParquetReaderPrefetchConfig {
  //! Minimum share of a row group's byte span that a scan must cover before the whole group is
  //! prefetched at once. Expressed as a fraction in [0, 1] (0.95 == 95%).
  static constexpr double WHOLE_GROUP_PREFETCH_MINIMUM_SCAN = 0.95;
};
47
48
struct ParquetScanFilter {
49
  ParquetScanFilter(ClientContext &context, idx_t filter_idx, TableFilter &filter);
50
  ~ParquetScanFilter();
51
0
  ParquetScanFilter(ParquetScanFilter &&) = default;
52
53
  idx_t filter_idx;
54
  TableFilter &filter;
55
  unique_ptr<TableFilterState> filter_state;
56
};
57
58
struct ParquetReaderScanState {
59
  vector<idx_t> group_idx_list;
60
  int64_t current_group;
61
  idx_t offset_in_group;
62
  idx_t group_offset;
63
  unique_ptr<CachingFileHandle> file_handle;
64
  unique_ptr<ColumnReader> root_reader;
65
  duckdb_base_std::unique_ptr<duckdb_apache::thrift::protocol::TProtocol> thrift_file_proto;
66
67
  bool finished;
68
  SelectionVector sel;
69
70
  ResizeableBuffer define_buf;
71
  ResizeableBuffer repeat_buf;
72
73
  bool prefetch_mode = false;
74
  bool current_group_prefetched = false;
75
76
  //! Adaptive filter
77
  unique_ptr<AdaptiveFilter> adaptive_filter;
78
  //! Table filter list
79
  vector<ParquetScanFilter> scan_filters;
80
81
  //! (optional) pointer to the PhysicalOperator for logging
82
  optional_ptr<const PhysicalOperator> op;
83
};
84
85
struct ParquetColumnDefinition {
86
public:
87
  static ParquetColumnDefinition FromSchemaValue(ClientContext &context, const Value &column_value);
88
89
public:
90
  // DEPRECATED, use 'identifier' instead
91
  int32_t field_id;
92
  string name;
93
  LogicalType type;
94
  Value default_value;
95
  Value identifier;
96
97
public:
98
  void Serialize(Serializer &serializer) const;
99
  static ParquetColumnDefinition Deserialize(Deserializer &deserializer);
100
};
101
102
struct ParquetOptions {
103
0
  explicit ParquetOptions() {
104
0
  }
105
  explicit ParquetOptions(ClientContext &context);
106
107
  bool binary_as_string = false;
108
  bool variant_legacy_encoding = false;
109
  bool file_row_number = false;
110
  shared_ptr<ParquetEncryptionConfig> encryption_config;
111
  bool debug_use_openssl = true;
112
113
  vector<ParquetColumnDefinition> schema;
114
  idx_t explicit_cardinality = 0;
115
  bool can_have_nan = false; // if floats or doubles can contain NaN values
116
};
117
118
struct ParquetOptionsSerialization {
119
0
  ParquetOptionsSerialization() = default;
120
  ParquetOptionsSerialization(ParquetOptions parquet_options_p, MultiFileOptions file_options_p)
121
0
      : parquet_options(std::move(parquet_options_p)), file_options(std::move(file_options_p)) {
122
0
  }
123
124
  ParquetOptions parquet_options;
125
  MultiFileOptions file_options;
126
127
public:
128
  void Serialize(Serializer &serializer) const;
129
  static ParquetOptionsSerialization Deserialize(Deserializer &deserializer);
130
};
131
132
struct ParquetUnionData : public BaseUnionData {
133
0
  explicit ParquetUnionData(OpenFileInfo file_p) : BaseUnionData(std::move(file_p)) {
134
0
  }
135
  ~ParquetUnionData() override;
136
137
  unique_ptr<BaseStatistics> GetStatistics(ClientContext &context, const string &name) override;
138
139
  ParquetOptions options;
140
  shared_ptr<ParquetFileMetadataCache> metadata;
141
  unique_ptr<ParquetColumnSchema> root_schema;
142
};
143
144
class ParquetReader : public BaseFileReader {
145
public:
146
  ParquetReader(ClientContext &context, OpenFileInfo file, ParquetOptions parquet_options,
147
                shared_ptr<ParquetFileMetadataCache> metadata = nullptr);
148
  ~ParquetReader() override;
149
150
  CachingFileSystem fs;
151
  Allocator &allocator;
152
  shared_ptr<ParquetFileMetadataCache> metadata;
153
  ParquetOptions parquet_options;
154
  unique_ptr<ParquetColumnSchema> root_schema;
155
  shared_ptr<EncryptionUtil> encryption_util;
156
  //! How many rows have been read from this file
157
  atomic<idx_t> rows_read;
158
159
public:
160
0
  string GetReaderType() const override {
161
0
    return "Parquet";
162
0
  }
163
164
  shared_ptr<BaseUnionData> GetUnionData(idx_t file_idx) override;
165
  unique_ptr<BaseStatistics> GetStatistics(ClientContext &context, const string &name) override;
166
167
  bool TryInitializeScan(ClientContext &context, GlobalTableFunctionState &gstate,
168
                         LocalTableFunctionState &lstate) override;
169
  void Scan(ClientContext &context, GlobalTableFunctionState &global_state, LocalTableFunctionState &local_state,
170
            DataChunk &chunk) override;
171
  void FinishFile(ClientContext &context, GlobalTableFunctionState &gstate_p) override;
172
  double GetProgressInFile(ClientContext &context) override;
173
174
public:
175
  void InitializeScan(ClientContext &context, ParquetReaderScanState &state, vector<idx_t> groups_to_read);
176
  void Scan(ClientContext &context, ParquetReaderScanState &state, DataChunk &output);
177
178
  idx_t NumRows() const;
179
  idx_t NumRowGroups() const;
180
181
  const duckdb_parquet::FileMetaData *GetFileMetadata() const;
182
183
  uint32_t Read(duckdb_apache::thrift::TBase &object, TProtocol &iprot);
184
  uint32_t ReadData(duckdb_apache::thrift::protocol::TProtocol &iprot, const data_ptr_t buffer,
185
                    const uint32_t buffer_size);
186
187
  unique_ptr<BaseStatistics> ReadStatistics(const string &name);
188
189
0
  CachingFileHandle &GetHandle() {
190
0
    return *file_handle;
191
0
  }
192
193
  static unique_ptr<BaseStatistics> ReadStatistics(ClientContext &context, ParquetOptions parquet_options,
194
                                                   shared_ptr<ParquetFileMetadataCache> metadata, const string &name);
195
  static unique_ptr<BaseStatistics> ReadStatistics(const ParquetUnionData &union_data, const string &name);
196
197
  LogicalType DeriveLogicalType(const SchemaElement &s_ele, ParquetColumnSchema &schema) const;
198
199
  void AddVirtualColumn(column_t virtual_column_id) override;
200
201
  void GetPartitionStats(vector<PartitionStatistics> &result);
202
  static void GetPartitionStats(const duckdb_parquet::FileMetaData &metadata, vector<PartitionStatistics> &result);
203
  static bool MetadataCacheEnabled(ClientContext &context);
204
  static shared_ptr<ParquetFileMetadataCache> GetMetadataCacheEntry(ClientContext &context, const OpenFileInfo &file);
205
206
private:
207
  //! Construct a parquet reader but **do not** open a file, used in ReadStatistics only
208
  ParquetReader(ClientContext &context, ParquetOptions parquet_options,
209
                shared_ptr<ParquetFileMetadataCache> metadata);
210
211
  void InitializeSchema(ClientContext &context);
212
  bool ScanInternal(ClientContext &context, ParquetReaderScanState &state, DataChunk &output);
213
  //! Parse the schema of the file
214
  unique_ptr<ParquetColumnSchema> ParseSchema();
215
  ParquetColumnSchema ParseSchemaRecursive(idx_t depth, idx_t max_define, idx_t max_repeat, idx_t &next_schema_idx,
216
                                           idx_t &next_file_idx);
217
218
  unique_ptr<ColumnReader> CreateReader(ClientContext &context);
219
220
  unique_ptr<ColumnReader> CreateReaderRecursive(ClientContext &context, const vector<ColumnIndex> &indexes,
221
                                                 const ParquetColumnSchema &schema);
222
  const duckdb_parquet::RowGroup &GetGroup(ParquetReaderScanState &state);
223
  uint64_t GetGroupCompressedSize(ParquetReaderScanState &state);
224
  idx_t GetGroupOffset(ParquetReaderScanState &state);
225
  // Group span is the distance between the min page offset and the max page offset plus the max page compressed size
226
  uint64_t GetGroupSpan(ParquetReaderScanState &state);
227
  void PrepareRowGroupBuffer(ParquetReaderScanState &state, idx_t out_col_idx);
228
  ParquetColumnSchema ParseColumnSchema(const SchemaElement &s_ele, idx_t max_define, idx_t max_repeat,
229
                                        idx_t schema_index, idx_t column_index,
230
                                        ParquetColumnSchemaType type = ParquetColumnSchemaType::COLUMN);
231
232
  MultiFileColumnDefinition ParseColumnDefinition(const duckdb_parquet::FileMetaData &file_meta_data,
233
                                                  ParquetColumnSchema &element);
234
235
private:
236
  unique_ptr<CachingFileHandle> file_handle;
237
};
238
239
} // namespace duckdb