/src/duckdb/extension/parquet/include/parquet_reader.hpp
Line | Count | Source |
1 | | //===----------------------------------------------------------------------===// |
2 | | // DuckDB |
3 | | // |
4 | | // parquet_reader.hpp |
5 | | // |
6 | | // |
7 | | //===----------------------------------------------------------------------===// |
8 | | |
9 | | #pragma once |
10 | | |
11 | | #include <stdint.h> |
12 | | #include <exception> |
13 | | #include <atomic> |
14 | | #include <memory> |
15 | | #include <string> |
16 | | #include <utility> |
17 | | |
18 | | #include "duckdb.hpp" |
19 | | #include "duckdb/common/enum_util.hpp" |
20 | | #include "duckdb/common/helper.hpp" |
21 | | #include "duckdb/storage/external_file_cache/caching_file_system.hpp" |
22 | | #include "duckdb/common/common.hpp" |
23 | | #include "duckdb/common/encryption_functions.hpp" |
24 | | #include "duckdb/common/encryption_state.hpp" |
25 | | #include "duckdb/common/exception.hpp" |
26 | | #include "duckdb/common/multi_file/base_file_reader.hpp" |
27 | | #include "duckdb/common/multi_file/multi_file_adaptive_filter_cache.hpp" |
28 | | #include "duckdb/common/multi_file/multi_file_options.hpp" |
29 | | #include "duckdb/common/string_util.hpp" |
30 | | #include "duckdb/common/types/data_chunk.hpp" |
31 | | #include "column_reader.hpp" |
32 | | #include "parquet_prefetch_cost_model.hpp" |
33 | | #include "parquet_file_metadata_cache.hpp" |
34 | | #include "parquet_rle_bp_decoder.hpp" |
35 | | #include "parquet_types.h" |
36 | | #include "resizable_buffer.hpp" |
37 | | #include "duckdb/execution/adaptive_filter.hpp" |
38 | | #include "duckdb/common/column_index.hpp" |
39 | | #include "duckdb/common/multi_file/multi_file_data.hpp" |
40 | | #include "duckdb/common/open_file_info.hpp" |
41 | | #include "duckdb/common/optional_idx.hpp" |
42 | | #include "duckdb/common/optional_ptr.hpp" |
43 | | #include "duckdb/common/projection_index.hpp" |
44 | | #include "duckdb/common/shared_ptr_ipp.hpp" |
45 | | #include "duckdb/common/string.hpp" |
46 | | #include "duckdb/common/typedefs.hpp" |
47 | | #include "duckdb/common/types.hpp" |
48 | | #include "duckdb/common/types/selection_vector.hpp" |
49 | | #include "duckdb/common/types/value.hpp" |
50 | | #include "duckdb/common/unique_ptr.hpp" |
51 | | #include "duckdb/common/vector.hpp" |
52 | | #include "duckdb/parallel/async_result.hpp" |
53 | | #include "parquet_column_schema.hpp" |
54 | | #include "thrift/protocol/TProtocol.h" |
55 | | |
56 | | namespace duckdb_apache { |
57 | | namespace thrift { |
58 | | class TBase; |
59 | | } // namespace thrift |
60 | | } // namespace duckdb_apache |
61 | | |
62 | | namespace duckdb_parquet { |
63 | | class EncryptionAlgorithm; |
64 | | class FileMetaData; |
65 | | class RowGroup; |
66 | | class SchemaElement; |
67 | | |
68 | | namespace format { |
69 | | class FileMetaData; |
70 | | } |
71 | | } // namespace duckdb_parquet |
72 | | |
73 | | namespace duckdb { |
74 | | class Allocator; |
75 | | class ClientContext; |
76 | | class BaseStatistics; |
77 | | class TableFilterSet; |
78 | | class ParquetEncryptionConfig; |
79 | | class ParquetReader; |
80 | | class DataChunk; |
81 | | class Deserializer; |
82 | | class EncryptionUtil; |
83 | | class PhysicalOperator; |
84 | | class Serializer; |
85 | | class TableFilter; |
86 | | class ThriftFileTransport; |
87 | | struct CryptoMetaData; |
88 | | struct GlobalTableFunctionState; |
89 | | struct LocalTableFunctionState; |
90 | | struct PartitionStatistics; |
91 | | struct TableFilterState; |
92 | | |
93 | | struct ParquetReaderPrefetchConfig { |
94 | | //! Minimum fraction (0.95 = 95%) of the data in a row group span that should be scanned for enabling whole group prefetch |
95 | | static constexpr double WHOLE_GROUP_PREFETCH_MINIMUM_SCAN = 0.95; |
96 | | //! Minimum ratio of row groups that need to produce at least one surviving row (from filtering) |
97 | | static constexpr double PREFETCH_FILTER_MINIMUM_MATCH_RATIO = 0.9; |
98 | | }; |
99 | | |
100 | | enum class ParquetPrefetchStrategy : uint8_t { |
101 | | NONE, |
102 | | WHOLE_GROUP, //! Prefetches the whole row group |
103 | | PREFETCH_FILTERS, //! Used when we have fully selective filters, so they are prefetched earlier |
104 | | COLUMN_WISE_EAGER //! Used when we have projections and optional-only filters |
105 | | }; |
106 | | |
107 | | const char *ParquetPrefetchStrategyToString(ParquetPrefetchStrategy strategy); |
108 | | |
109 | | enum class ParquetPrefetchStrategyOption : uint8_t { |
110 | | AUTO, //! Picks a ParquetPrefetchStrategy at runtime |
111 | | WHOLE_GROUP, //! Always prefetches the whole row group |
112 | | }; |
113 | | |
114 | | ParquetPrefetchStrategyOption ParquetPrefetchStrategyOptionFromString(const string &value); |
115 | | |
116 | | template <> |
117 | | const char *EnumUtil::ToChars<ParquetPrefetchStrategyOption>(ParquetPrefetchStrategyOption value); |
118 | | |
119 | | template <> |
120 | | ParquetPrefetchStrategyOption EnumUtil::FromString<ParquetPrefetchStrategyOption>(const char *value); |
121 | | |
122 | | struct ParquetScanFilter { |
123 | | ParquetScanFilter(ClientContext &context, ProjectionIndex filter_idx, TableFilter &filter); |
124 | | ~ParquetScanFilter(); |
125 | 0 | ParquetScanFilter(ParquetScanFilter &&) = default; |
126 | | |
127 | | ProjectionIndex filter_idx; |
128 | | TableFilter &filter; |
129 | | unique_ptr<TableFilterState> filter_state; |
130 | | }; |
131 | | |
132 | | struct ParquetPrefetchColumn { |
133 | | string name; |
134 | | idx_t offset; |
135 | | |
136 | 0 | ParquetPrefetchColumn(string name_p, idx_t offset_p) : name(std::move(name_p)), offset(offset_p) { |
137 | 0 | } |
138 | | |
139 | 0 | bool operator<(const ParquetPrefetchColumn &other) const { |
140 | 0 | return offset < other.offset; |
141 | 0 | } |
142 | | }; |
143 | | //! Logger-only prefetch metrics: populated only when ParquetPrefetch logging is enabled. |
144 | | struct ParquetLoggerPrefetchMetrics { |
145 | | //! Physical prefetch groups (column-name batches) issued for the current row group, in order |
146 | | vector<vector<string>> prefetch_groups; |
147 | | //! Tracks which scan_filters were evaluated in the current row group (indexed by scan_filter position) |
148 | | vector<bool> filters_used; |
149 | | //! Prefetch strategy chosen for the current row group |
150 | | ParquetPrefetchStrategy strategy = ParquetPrefetchStrategy::NONE; |
151 | | //! Accepted column gap (bytes) |
152 | | uint64_t accepted_column_gap = 0; |
153 | | |
154 | 0 | void Reset() { |
155 | 0 | prefetch_groups.clear(); |
156 | 0 | std::fill(filters_used.begin(), filters_used.end(), false); |
157 | 0 | strategy = ParquetPrefetchStrategy::NONE; |
158 | 0 | accepted_column_gap = 0; |
159 | 0 | } |
160 | | |
161 | | //! Build a prefetch group |
162 | | void GeneratePrefetchGroup(ThriftFileTransport &trans, vector<ParquetPrefetchColumn> &requested_columns, |
163 | | const vector<duckdb_parquet::ColumnChunk> &all_chunks); |
164 | | }; |
165 | | |
166 | | struct ParquetPrefetchMetrics { |
167 | | //! Whether any filter was evaluated against the current row group |
168 | | bool filter_ran = false; |
169 | | //! Whether the current row group produced at least one surviving row after filtering |
170 | | bool had_match = false; |
171 | | //! Total number of row groups for which filters were evaluated across the scan |
172 | | idx_t row_groups_executed = 0; |
173 | | //! Number of those row groups that produced at least one surviving row |
174 | | idx_t row_groups_with_matches = 0; |
175 | | |
176 | | ParquetLoggerPrefetchMetrics logger; |
177 | | |
178 | 0 | void FinalizeRowGroupSelectivity() { |
179 | 0 | if (filter_ran) { |
180 | 0 | row_groups_executed++; |
181 | 0 | if (had_match) { |
182 | 0 | row_groups_with_matches++; |
183 | 0 | } |
184 | 0 | } |
185 | 0 | filter_ran = false; |
186 | 0 | had_match = false; |
187 | 0 | logger.Reset(); |
188 | 0 | } |
189 | | }; |
190 | | |
191 | | struct ParquetReaderScanState { |
192 | | public: |
193 | | ColumnReader &GetColumnReader(idx_t i); |
194 | | |
195 | | public: |
196 | | //! The row group index this scan state decodes; the index/offset fields below are zero-initialized |
197 | | idx_t group_index = 0; |
198 | | idx_t offset_in_group = 0; |
199 | | idx_t group_offset = 0; |
200 | | shared_ptr<CachingFileHandle> file_handle; |
201 | | vector<unique_ptr<ColumnReader>> column_readers; |
202 | | duckdb_base_std::unique_ptr<duckdb_apache::thrift::protocol::TProtocol> thrift_file_proto; |
203 | | |
204 | | //! Set while resuming payload-column decode after the filter-column I/O blocked (vs a fresh row-group pass) |
205 | | bool resuming_payload = false; |
206 | | SelectionVector sel; |
207 | | |
208 | | ResizeableBuffer define_buf; |
209 | | ResizeableBuffer repeat_buf; |
210 | | |
211 | | bool prefetch_mode = false; |
212 | | //! Number of filter head counts, used for prefetching |
213 | | idx_t filter_head_count = 0; |
214 | | //! Surviving row count |
215 | | idx_t filter_count = 0; |
216 | | |
217 | | ParquetPrefetchMetrics prefetch_metrics; |
218 | | |
219 | | //! Per-thread adaptive filter cache |
220 | | MultiFileAdaptiveFilterCache adaptive_filter_cache; |
221 | | //! Table filter list |
222 | | vector<ParquetScanFilter> scan_filters; |
223 | | //! True once the filter at this index has driven the surviving row count to zero |
224 | | vector<bool> filter_eliminated_all_rows; |
225 | | |
226 | | //! (optional) pointer to the PhysicalOperator for logging |
227 | | optional_ptr<const PhysicalOperator> op; |
228 | | |
229 | | //! Per-thread counters for row groups whose data was read / skipped, incremented as row groups are processed. |
230 | | //! Read in ParquetScanGetMetrics and surfaced as the standard row_groups_scanned / total_row_groups_to_scan |
231 | | //! profiling metrics (the profiler sums them across threads). |
232 | | idx_t row_groups_read = 0; |
233 | | idx_t row_groups_skipped = 0; |
234 | | |
235 | | //! Prefetch cost model |
236 | | PrefetchCostModelState cost_model_state; |
237 | | }; |
238 | | |
239 | | struct ParquetColumnDefinition { |
240 | | public: |
241 | | static ParquetColumnDefinition FromSchemaValue(ClientContext &context, const Value &column_value); |
242 | | |
243 | | public: |
244 | | // DEPRECATED, use 'identifier' instead |
245 | | int32_t field_id; |
246 | | string name; |
247 | | LogicalType type; |
248 | | Value default_value; |
249 | | Value identifier; |
250 | | |
251 | | public: |
252 | | void Serialize(Serializer &serializer) const; |
253 | | static ParquetColumnDefinition Deserialize(Deserializer &deserializer); |
254 | | }; |
255 | | |
256 | | struct ParquetOptions { |
257 | 0 | explicit ParquetOptions() { |
258 | 0 | } |
259 | | explicit ParquetOptions(ClientContext &context); |
260 | | |
261 | | bool binary_as_string = false; |
262 | | bool variant_legacy_encoding = false; |
263 | | bool file_row_number = false; |
264 | | shared_ptr<ParquetEncryptionConfig> encryption_config; |
265 | | |
266 | | vector<ParquetColumnDefinition> schema; |
267 | | idx_t explicit_cardinality = 0; |
268 | | bool can_have_nan = false; // if floats or doubles can contain NaN values |
269 | | ParquetPrefetchStrategyOption prefetch_strategy = ParquetPrefetchStrategyOption::AUTO; |
270 | | }; |
271 | | |
272 | | struct ParquetOptionsSerialization { |
273 | 0 | ParquetOptionsSerialization() = default; |
274 | | ParquetOptionsSerialization(ParquetOptions parquet_options_p, MultiFileOptions file_options_p) |
275 | 0 | : parquet_options(std::move(parquet_options_p)), file_options(std::move(file_options_p)) { |
276 | 0 | } |
277 | | |
278 | | ParquetOptions parquet_options; |
279 | | MultiFileOptions file_options; |
280 | | |
281 | | public: |
282 | | void Serialize(Serializer &serializer) const; |
283 | | static ParquetOptionsSerialization Deserialize(Deserializer &deserializer); |
284 | | }; |
285 | | |
286 | | struct ParquetUnionData : public BaseUnionData { |
287 | 0 | explicit ParquetUnionData(OpenFileInfo file_p) : BaseUnionData(std::move(file_p)) { |
288 | 0 | } |
289 | | ~ParquetUnionData() override; |
290 | | |
291 | | optional_idx TryGetCardinalityEstimate() const override; |
292 | | unique_ptr<BaseStatistics> GetStatistics(ClientContext &context, const Identifier &name) override; |
293 | | |
294 | | ParquetOptions options; |
295 | | shared_ptr<ParquetFileMetadataCache> metadata; |
296 | | unique_ptr<ParquetColumnSchema> root_schema; |
297 | | }; |
298 | | |
299 | | class ParquetReader : public BaseFileReader { |
300 | | public: |
301 | | //! Virtual column identifier for the "file_row_group_number" column (the file-relative row group index of each row) |
302 | | static constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_GROUP_NUMBER = UINT64_C(9223372036854775820); |
303 | | |
304 | | public: |
305 | | ParquetReader(ClientContext &context, OpenFileInfo file, ParquetOptions parquet_options, |
306 | | shared_ptr<ParquetFileMetadataCache> metadata = nullptr); |
307 | | ~ParquetReader() override; |
308 | | |
309 | | mutable CachingFileSystem fs; |
310 | | Allocator &allocator; |
311 | | shared_ptr<ParquetFileMetadataCache> metadata; |
312 | | ParquetOptions parquet_options; |
313 | | unique_ptr<ParquetColumnSchema> root_schema; |
314 | | shared_ptr<EncryptionUtil> encryption_util; |
315 | | //! How many rows have been read from this file |
316 | | atomic<idx_t> rows_read; |
317 | | |
318 | | public: |
319 | 0 | string GetReaderType() const override { |
320 | 0 | return "Parquet"; |
321 | 0 | } |
322 | | |
323 | | shared_ptr<BaseUnionData> GetUnionData(idx_t file_idx) override; |
324 | | unique_ptr<BaseStatistics> GetStatistics(ClientContext &context, const Identifier &name) override; |
325 | | |
326 | | bool TryInitializeScan(ClientContext &context, GlobalTableFunctionState &gstate, |
327 | | LocalTableFunctionState &lstate) override; |
328 | | void PrepareScan(ClientContext &context, GlobalTableFunctionState &gstate_p, |
329 | | LocalTableFunctionState &lstate_p) override; |
330 | | AsyncResult ScheduleIO(ClientContext &context, GlobalTableFunctionState &gstate, |
331 | | LocalTableFunctionState &lstate) override; |
332 | | AsyncResult Scan(ClientContext &context, GlobalTableFunctionState &global_state, |
333 | | LocalTableFunctionState &local_state, DataChunk &chunk) override; |
334 | | void FinishFile(ClientContext &context, GlobalTableFunctionState &gstate_p) override; |
335 | | double GetProgressInFile(ClientContext &context) override; |
336 | | |
337 | | public: |
338 | | void InitializeScan(ClientContext &context, ParquetReaderScanState &state, idx_t group_to_read) const; |
339 | | |
340 | | idx_t NumRows() const; |
341 | | idx_t NumRowGroups() const; |
342 | | idx_t GetFileSize() const; |
343 | | idx_t GetDataSize() const; |
344 | | |
345 | | const duckdb_parquet::FileMetaData *GetFileMetadata() const; |
346 | | static string GetUniqueFileIdentifier(const duckdb_parquet::EncryptionAlgorithm &encryption_algorithm); |
347 | | |
348 | | uint32_t Read(duckdb_apache::thrift::TBase &object, TProtocol &iprot) const; |
349 | | uint32_t ReadEncrypted(duckdb_apache::thrift::TBase &object, TProtocol &iprot, |
350 | | CryptoMetaData &aad_crypto_metadata) const; |
351 | | uint32_t ReadData(duckdb_apache::thrift::protocol::TProtocol &iprot, const data_ptr_t buffer, |
352 | | const uint32_t buffer_size) const; |
353 | | uint32_t ReadDataEncrypted(duckdb_apache::thrift::protocol::TProtocol &iprot, const data_ptr_t buffer, |
354 | | const uint32_t buffer_size, CryptoMetaData &aad_crypto_metadata) const; |
355 | | |
356 | | unique_ptr<BaseStatistics> ReadStatistics(const Identifier &name); |
357 | | |
358 | 0 | CachingFileHandle &GetHandle() { |
359 | 0 | return *file_handle; |
360 | 0 | } |
361 | | |
362 | | static unique_ptr<BaseStatistics> ReadStatistics(ClientContext &context, ParquetOptions parquet_options, |
363 | | shared_ptr<ParquetFileMetadataCache> metadata, |
364 | | const Identifier &name); |
365 | | static unique_ptr<BaseStatistics> ReadStatistics(const ParquetUnionData &union_data, const Identifier &name); |
366 | | |
367 | | LogicalType DeriveLogicalType(const SchemaElement &s_ele, ParquetColumnSchema &schema) const; |
368 | | static LogicalType DeriveLogicalType(const SchemaElement &s_ele, const ParquetOptions &options, |
369 | | ParquetColumnSchema &schema); |
370 | | |
371 | | void AddVirtualColumn(column_t virtual_column_id) override; |
372 | | |
373 | | void GetPartitionStats(vector<PartitionStatistics> &result); |
374 | | static void GetPartitionStats(const duckdb_parquet::FileMetaData &metadata, vector<PartitionStatistics> &result, |
375 | | optional_ptr<ParquetColumnSchema> root_schema = nullptr, |
376 | | optional_ptr<ParquetOptions> parquet_options = nullptr); |
377 | | static bool MetadataCacheEnabled(ClientContext &context); |
378 | | static shared_ptr<ParquetFileMetadataCache> GetMetadataCacheEntry(ClientContext &context, const OpenFileInfo &file); |
379 | | |
380 | | private: |
381 | | //! Construct a parquet reader but **do not** open a file, used in ReadStatistics only |
382 | | ParquetReader(ClientContext &context, ParquetOptions parquet_options, |
383 | | shared_ptr<ParquetFileMetadataCache> metadata); |
384 | | |
385 | | void InitializeSchema(ClientContext &context); |
386 | | //! Parse the schema of the file |
387 | | unique_ptr<ParquetColumnSchema> ParseSchema(ClientContext &context); |
388 | | ParquetColumnSchema ParseSchemaRecursive(idx_t depth, idx_t max_define, idx_t max_repeat, idx_t &next_schema_idx, |
389 | | idx_t &next_file_idx, ClientContext &context); |
390 | | |
391 | | unique_ptr<ColumnReader> CreateReaderRecursive(ClientContext &context, const ColumnIndex &index, |
392 | | const ParquetColumnSchema &schema) const; |
393 | | const duckdb_parquet::RowGroup &GetGroup(ParquetReaderScanState &state); |
394 | | uint64_t GetGroupCompressedSize(ParquetReaderScanState &state); |
395 | | idx_t GetGroupOffset(ParquetReaderScanState &state); |
396 | | //! Group span is the distance between the min page offset and the max page offset plus the max page compressed size |
397 | | uint64_t GetGroupSpan(ParquetReaderScanState &state); |
398 | | void PrepareRowGroupBuffer(ClientContext &context, ParquetReaderScanState &state, idx_t out_col_idx); |
399 | | //! Whole-group prefetch strategy. |
400 | | ParquetPrefetchStrategy WholeGroupPrefetch(ParquetReaderScanState &state, ThriftFileTransport &trans, |
401 | | const duckdb_parquet::RowGroup &group, uint64_t total_row_group_span, |
402 | | bool log_prefetch); |
403 | | //! Column-wise prefetch strategy. |
404 | | ParquetPrefetchStrategy ColumnWisePrefetch(ParquetReaderScanState &state, ThriftFileTransport &trans, |
405 | | const duckdb_parquet::RowGroup &group, bool filters_look_unselective, |
406 | | bool log_prefetch) const; |
407 | | //! Register the read-heads to fetch, and select prefetch strategy |
408 | | ParquetPrefetchStrategy RegisterRowGroupReads(ClientContext &context, ParquetReaderScanState &state); |
409 | | //! Build the async I/O tasks for the registered read-heads |
410 | | AsyncResult ScheduleRowGroupReads(ParquetReaderScanState &state, ParquetPrefetchStrategy strategy); |
411 | | //! Process up to STANDARD_VECTOR_SIZE rows of the current row group into result. |
412 | | AsyncResult Process(ClientContext &context, ParquetReaderScanState &state, DataChunk &result); |
413 | | //! Log and finalize the row group's prefetch metrics |
414 | | void FinishRowGroup(ClientContext &context, ParquetReaderScanState &state, bool log_prefetch); |
415 | | //! Process filters |
416 | | AsyncResult ProcessFilters(ParquetReaderScanState &state, DataChunk &result, idx_t scan_count, uint8_t *define_ptr, |
417 | | uint8_t *repeat_ptr, bool log_prefetch); |
418 | | //! Run the filters into state.sel; returns the surviving row count. Advances every filter column. |
419 | | idx_t EvaluateFilters(ParquetReaderScanState &state, DataChunk &result, idx_t scan_count, uint8_t *define_ptr, |
420 | | uint8_t *repeat_ptr, bool log_prefetch); |
421 | | //! Async-fetch the surviving payload columns (stashing the filter columns); empty if no fetch is needed. |
422 | | vector<unique_ptr<AsyncTask>> ScheduleRemainingColumns(ParquetReaderScanState &state, DataChunk &result, |
423 | | idx_t scan_count); |
424 | | //! Read the remaining (non-filter) columns into result. |
425 | | void DecodeRemainingColumns(ParquetReaderScanState &state, DataChunk &result, idx_t filter_count, |
426 | | uint8_t *define_ptr, uint8_t *repeat_ptr); |
427 | | ParquetColumnSchema ParseColumnSchema(const SchemaElement &s_ele, idx_t max_define, idx_t max_repeat, |
428 | | idx_t schema_index, idx_t column_index, |
429 | | ParquetColumnSchemaType type = ParquetColumnSchemaType::COLUMN); |
430 | | |
431 | | MultiFileColumnDefinition ParseColumnDefinition(const duckdb_parquet::FileMetaData &file_meta_data, |
432 | | ParquetColumnSchema &element); |
433 | | unique_ptr<AdditionalAuthenticatedData> GenerateAAD(uint8_t module_type, uint16_t row_group_ordinal, |
434 | | uint16_t column_ordinal, uint16_t page_ordinal) const; |
435 | | |
436 | | private: |
437 | | unique_ptr<CachingFileHandle> file_handle; |
438 | | }; |
439 | | |
440 | | } // namespace duckdb |