Coverage Report

Created: 2026-06-30 06:38

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/duckdb/extension/parquet/column_reader.cpp
Line
Count
Source
1
#include "column_reader.hpp"
2
3
#include "duckdb/common/vector/flat_vector.hpp"
4
#include "duckdb/common/vector/constant_vector.hpp"
5
6
#include <algorithm>
7
#include <memory>
8
#include <sstream>
9
#include <stdexcept>
10
#include <utility>
11
12
#include "parquet_statistics.hpp"
13
14
#include "reader/boolean_column_reader.hpp"
15
#include "brotli/decode.h"
16
#include "reader/callback_column_reader.hpp"
17
#include "reader/interval_column_reader.hpp"
18
#include "lz4.hpp"
19
#include "miniz_wrapper.hpp"
20
#include "reader/null_column_reader.hpp"
21
#include "parquet_reader.hpp"
22
#include "parquet_timestamp.hpp"
23
#include "parquet_float16.hpp"
24
#include "snappy.h"
25
#include "reader/string_column_reader.hpp"
26
#include "reader/templated_column_reader.hpp"
27
#include "reader/uuid_column_reader.hpp"
28
#include "zstd.h"
29
#include "duckdb/common/helper.hpp"
30
#include "duckdb/common/numeric_utils.hpp"
31
#include "duckdb/storage/table/column_segment.hpp"
32
#include "parquet_crypto.hpp"
33
#include "decode_utils.hpp"
34
#include "duckdb/common/enums/vector_type.hpp"
35
#include "duckdb/common/limits.hpp"
36
#include "duckdb/common/string.hpp"
37
#include "duckdb/common/types/date.hpp"
38
#include "duckdb/common/types/datetime.hpp"
39
#include "duckdb/common/types/timestamp.hpp"
40
#include "duckdb/common/types/vector.hpp"
41
#include "duckdb/common/vector/unified_vector_format.hpp"
42
#include "duckdb/common/vector_size.hpp"
43
#include "parquet_decimal_utils.hpp"
44
#include "parquet_file_metadata_cache.hpp"
45
#include "parquet_rle_bp_decoder.hpp"
46
#include "thrift/protocol/TProtocol.h"
47
#include "thrift_tools.hpp"
48
49
namespace duckdb_apache {
50
namespace thrift {
51
class TBase;
52
} // namespace thrift
53
} // namespace duckdb_apache
54
55
namespace duckdb {
56
class Allocator;
57
struct hugeint_t;
58
59
using duckdb_parquet::CompressionCodec;
60
using duckdb_parquet::ConvertedType;
61
using duckdb_parquet::Encoding;
62
using duckdb_parquet::PageType;
63
using duckdb_parquet::Type;
64
65
// Lookup table of bit masks: entry w is the value with the lowest w bits set (2^w - 1), for w = 0 .. 64.
const uint64_t ParquetDecodeUtils::BITPACK_MASKS[] = {
    // widths 0 - 7
    0, 1, 3, 7, 15, 31, 63, 127,
    // widths 8 - 15
    255, 511, 1023, 2047, 4095, 8191, 16383, 32767,
    // widths 16 - 23
    65535, 131071, 262143, 524287, 1048575, 2097151, 4194303, 8388607,
    // widths 24 - 31
    16777215, 33554431, 67108863, 134217727, 268435455, 536870911, 1073741823, 2147483647,
    // widths 32 - 39
    4294967295, 8589934591, 17179869183, 34359738367, 68719476735, 137438953471, 274877906943, 549755813887,
    // widths 40 - 47
    1099511627775, 2199023255551, 4398046511103, 8796093022207, 17592186044415, 35184372088831, 70368744177663,
    140737488355327,
    // widths 48 - 55
    281474976710655, 562949953421311, 1125899906842623, 2251799813685247, 4503599627370495, 9007199254740991,
    18014398509481983, 36028797018963967,
    // widths 56 - 63
    72057594037927935, 144115188075855871, 288230376151711743, 576460752303423487, 1152921504606846975,
    2305843009213693951, 4611686018427387903, 9223372036854775807,
    // width 64
    18446744073709551615ULL};

// Number of entries in BITPACK_MASKS
const uint64_t ParquetDecodeUtils::BITPACK_MASKS_SIZE =
    sizeof(ParquetDecodeUtils::BITPACK_MASKS) / sizeof(ParquetDecodeUtils::BITPACK_MASKS[0]);

const uint8_t ParquetDecodeUtils::BITPACK_DLEN = 8;
134
135
//! Creates a column reader for the given schema element of the given Parquet reader.
//! Every encoding-specific decoder is constructed with a reference back to this reader.
ColumnReader::ColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema_p)
    : column_schema(schema_p),
      reader(reader),
      page_rows_available(0),
      dictionary_decoder(*this),
      delta_binary_packed_decoder(*this),
      rle_decoder(*this),
      delta_length_byte_array_decoder(*this),
      delta_byte_array_decoder(*this),
      byte_stream_split_decoder(*this),
      aad_crypto_metadata(reader.allocator) {
}
140
141
0
//! Nothing to release explicitly: all members clean up after themselves.
ColumnReader::~ColumnReader() = default;
143
144
0
//! Returns the allocator of the Parquet reader this column reader belongs to.
Allocator &ColumnReader::GetAllocator() {
  return Reader().allocator;
}
147
148
0
//! Returns the Parquet reader this column reader was created for.
const ParquetReader &ColumnReader::Reader() {
  return this->reader;
}
151
152
0
//! Registers the byte range of the current column chunk (FileOffset() .. + total_compressed_size) for prefetching
//! on the given transport. Does nothing if no chunk has been set.
void ColumnReader::RegisterPrefetch(ThriftFileTransport &transport, bool allow_merge) {
  if (!chunk) {
    return;
  }
  const uint64_t chunk_bytes = chunk->meta_data.total_compressed_size;
  transport.RegisterPrefetch(FileOffset(), chunk_bytes, allow_merge);
}
158
159
0
//! Returns the statistics for this column within the given row group by delegating to the column schema.
unique_ptr<BaseStatistics> ColumnReader::Stats(idx_t row_group_idx_p, const vector<ColumnChunk> &columns) {
  return Schema().Stats(*reader.GetFileMetadata(), // file-level metadata
                        reader.parquet_options,    // reader options
                        row_group_idx_p, columns);
}
162
163
0
uint64_t ColumnReader::TotalCompressedSize() {
164
0
  if (IsSkipped()) {
165
0
    return 0;
166
0
  }
167
168
0
  return chunk->meta_data.total_compressed_size;
169
0
}
170
171
// Note: It's not trivial to determine where all Column data is stored. Chunk->file_offset
172
// apparently is not the first page of the data. Therefore we determine the address of the first page by taking the
173
// minimum of all page offsets.
174
0
idx_t ColumnReader::FileOffset() const {
175
0
  if (IsSkipped()) {
176
    //! This column reader is skipped
177
0
    return 0;
178
0
  }
179
0
  auto min_offset = NumericLimits<idx_t>::Maximum();
180
0
  if (chunk->meta_data.__isset.dictionary_page_offset) {
181
0
    if (chunk->meta_data.dictionary_page_offset < 0) {
182
0
      throw InvalidInputException("Failed to read file \"%s\": metadata is corrupt. Column has invalid "
183
0
                                  "dictionary page offset (%lld)",
184
0
                                  reader.GetFileName(), chunk->meta_data.dictionary_page_offset);
185
0
    }
186
0
    min_offset = MinValue<idx_t>(min_offset, NumericCast<idx_t>(chunk->meta_data.dictionary_page_offset));
187
0
  }
188
0
  if (chunk->meta_data.__isset.index_page_offset) {
189
0
    if (chunk->meta_data.index_page_offset < 0) {
190
0
      throw InvalidInputException("Failed to read file \"%s\": metadata is corrupt. Column has invalid "
191
0
                                  "index page offset (%lld)",
192
0
                                  reader.GetFileName(), chunk->meta_data.index_page_offset);
193
0
    }
194
0
    min_offset = MinValue<idx_t>(min_offset, NumericCast<idx_t>(chunk->meta_data.index_page_offset));
195
0
  }
196
0
  if (chunk->meta_data.data_page_offset < 0) {
197
0
    throw InvalidInputException("Failed to read file \"%s\": metadata is corrupt. Column has invalid "
198
0
                                "data page offset (%lld)",
199
0
                                reader.GetFileName(), chunk->meta_data.data_page_offset);
200
0
  }
201
0
  min_offset = MinValue<idx_t>(min_offset, NumericCast<idx_t>(chunk->meta_data.data_page_offset));
202
203
0
  return min_offset;
204
0
}
205
206
0
//! Returns the stored group_rows_available counter (set from the chunk's num_values in InitializeRead).
idx_t ColumnReader::GroupRowsAvailable() {
  return this->group_rows_available;
}
209
210
0
bool ColumnReader::AllValuesAreNull() const {
211
  // for repeated columns the null_count/num_values statistics do not reliably indicate that every value is NULL
212
  // (num_values counts leaf slots, not rows), so we only trust this for non-repeated columns
213
0
  if (MaxRepeat() != 0 || !chunk || !chunk->__isset.meta_data) {
214
0
    return false;
215
0
  }
216
0
  auto &chunk_meta = chunk->meta_data;
217
0
  if (!chunk_meta.__isset.statistics || !chunk_meta.statistics.__isset.null_count) {
218
0
    return false;
219
0
  }
220
0
  return chunk_meta.statistics.null_count == chunk_meta.num_values;
221
0
}
222
223
0
//! Base implementation of skipping PLAIN-encoded values: always throws NotImplementedException.
void ColumnReader::PlainSkip(ByteBuffer &plain_data, uint8_t *defines, idx_t num_values) {
  throw NotImplementedException("PlainSkip not implemented");
}
226
227
//! Base implementation of reading PLAIN-encoded values from a ByteBuffer: always throws NotImplementedException.
void ColumnReader::Plain(ByteBuffer &plain_data, uint8_t *defines, idx_t num_values, // NOLINT
                         idx_t result_offset, Vector &result) {
  throw NotImplementedException("Plain not implemented");
}
231
232
//! Shared-buffer overload: dereferences the buffer and forwards to the ByteBuffer overload of Plain.
void ColumnReader::Plain(shared_ptr<ResizeableBuffer> &plain_data, uint8_t *defines, idx_t num_values,
                         idx_t result_offset, Vector &result) {
  auto &buffer = *plain_data;
  Plain(buffer, defines, num_values, result_offset, result);
}
236
237
void ColumnReader::PlainSelect(shared_ptr<ResizeableBuffer> &plain_data, uint8_t *defines, idx_t num_values,
238
0
                               Vector &result, const SelectionVector &sel, idx_t count) {
239
0
  throw NotImplementedException("PlainSelect not implemented");
240
0
}
241
242
0
//! Binds this reader to its column chunk and to the given protocol, and determines the file offset at which
//! reading of the chunk starts.
//! Throws InvalidInputException if the chunk references an external file or has a negative data page offset.
void ColumnReader::InitializeRead(idx_t row_group_idx_p, const vector<ColumnChunk> &columns, TProtocol &protocol_p) {
  D_ASSERT(ColumnIndex() < columns.size());
  chunk = &columns[ColumnIndex()];
  protocol = &protocol_p;
  D_ASSERT(chunk);
  D_ASSERT(chunk->__isset.meta_data);

  if (chunk->__isset.file_path) {
    throw InvalidInputException("Failed to read file \"%s\": Only inlined data files are supported (no references)",
                                Reader().GetFileName());
  }

  const auto &meta = chunk->meta_data;
  if (meta.data_page_offset < 0) {
    throw InvalidInputException("Failed to read file \"%s\": metadata is corrupt. Column has invalid "
                                "data page offset (%lld)",
                                Reader().GetFileName(), meta.data_page_offset);
  }

  // ugh. sometimes there is an extra offset for the dict. sometimes it's wrong.
  // the dictionary offset is only used when it is set and >= 4;
  // this assumes the data pages follow the dict pages directly.
  const bool start_at_dictionary = meta.__isset.dictionary_page_offset && meta.dictionary_page_offset >= 4;
  chunk_read_offset = NumericCast<idx_t>(start_at_dictionary ? meta.dictionary_page_offset : meta.data_page_offset);

  group_rows_available = meta.num_values;
}
267
268
0
//! Decides whether the page described by page_hdr can be skipped entirely.
//! Dictionary-encoded data pages are skipped when the dictionary decoder reports that all values were filtered
//! out; other data pages are skipped when the page statistics show that the filter can never match.
//! When the page is skipped, the transport is advanced past the page payload and page_rows_available is set to
//! the number of values in the page. Returns the resulting value of page_is_filtered_out.
bool ColumnReader::PageIsFilteredOut(PageHeader &page_hdr, optional_ptr<const TableFilter> filter) {
  const bool is_v1 = page_hdr.type == PageType::DATA_PAGE;
  const bool is_v2 = page_hdr.type == PageType::DATA_PAGE_V2;
  if (!is_v1 && !is_v2) {
    // we can only filter out data pages
    return false;
  }
  auto &header_v1 = page_hdr.data_page_header;
  auto &header_v2 = page_hdr.data_page_header_v2;
  const auto page_encoding = is_v1 ? header_v1.encoding : header_v2.encoding;
  const bool is_dictionary_page =
      page_encoding == Encoding::PLAIN_DICTIONARY || page_encoding == Encoding::RLE_DICTIONARY;

  if (is_dictionary_page) {
    if (!dictionary_decoder.HasFilteredOutAllValues()) {
      return false;
    }
    encoding = ColumnEncoding::DICTIONARY;
    page_is_filtered_out = true;
  } else if (filter) {
    // try to use page statistics to skip this page if possible
    const duckdb_parquet::Statistics *page_stats = nullptr;
    if (is_v1) {
      if (header_v1.__isset.statistics) {
        page_stats = &header_v1.statistics;
      }
    } else if (header_v2.__isset.statistics) {
      page_stats = &header_v2.statistics;
    }
    if (!page_stats) {
      return false;
    }
    // both a minimum and a maximum are required (either the new or the legacy field)
    const bool has_min = page_stats->__isset.min_value || page_stats->__isset.min;
    const bool has_max = page_stats->__isset.max_value || page_stats->__isset.max;
    if (!has_min || !has_max) {
      return false;
    }
    auto stats =
        ParquetStatisticsUtils::TransformParquetStatistics(Type(), Schema(), *page_stats, /*can_have_nan=*/true);
    auto &expr_filter = filter->Cast<ExpressionFilter>();
    if (stats && expr_filter.CheckStatistics(*stats) == FilterPropagateResult::FILTER_ALWAYS_FALSE) {
      page_is_filtered_out = true;
    }
  }

  if (page_is_filtered_out) {
    // the page has been filtered out!
    // skip forward
    auto &trans = reinterpret_cast<ThriftFileTransport &>(*protocol->getTransport());
    trans.Skip(page_hdr.compressed_page_size);
    page_rows_available = is_v1 ? header_v1.num_values : header_v2.num_values;
  }
  return page_is_filtered_out;
}
314
315
0
//! Reads an encrypted thrift object (e.g. a page header) through the Parquet reader.
//! The AAD module and the final page ordinal are refreshed from the current chunk before reading.
void ColumnReader::ReadEncrypted(duckdb_apache::thrift::TBase &object) {
  auto &aad = aad_crypto_metadata;
  // the module is derived from the current ordinal; the final ordinal is then derived from the module
  aad.module = ParquetCrypto::GetModuleHeader(*chunk, aad.page_ordinal);
  aad.page_ordinal = ParquetCrypto::GetFinalPageOrdinal(*chunk, aad.module, aad.page_ordinal);
  reader.ReadEncrypted(object, *protocol, aad);
}
321
322
0
void ColumnReader::ReadDataEncrypted(const data_ptr_t buffer, const uint32_t buffer_size, PageType::type page_type) {
323
0
  aad_crypto_metadata.module = ParquetCrypto::GetModule(*chunk, page_type, aad_crypto_metadata.page_ordinal);
324
0
  aad_crypto_metadata.page_ordinal =
325
0
      ParquetCrypto::GetFinalPageOrdinal(*chunk, aad_crypto_metadata.module, aad_crypto_metadata.page_ordinal);
326
0
  reader.ReadDataEncrypted(*protocol, buffer, buffer_size, aad_crypto_metadata);
327
0
}
328
329
0
//! Reads the next page header from the protocol, using the encrypted path when an encryption config is set.
void ColumnReader::Read(PageHeader &page_hdr) {
  if (!reader.parquet_options.encryption_config) {
    reader.Read(page_hdr, *protocol);
    return;
  }
  ReadEncrypted(page_hdr);
}
336
337
0
void ColumnReader::ReadData(const data_ptr_t buffer, const uint32_t buffer_size, PageType::type page_type) {
338
0
  if (reader.parquet_options.encryption_config) {
339
0
    ReadDataEncrypted(buffer, buffer_size, page_type);
340
0
  } else {
341
0
    reader.ReadData(*protocol, buffer, buffer_size);
342
0
  }
343
0
}
344
345
void ColumnReader::PrepareRead(optional_ptr<const TableFilter> filter, optional_ptr<TableFilterState> filter_state,
346
0
                               idx_t rows_to_skip) {
347
0
  encoding = ColumnEncoding::INVALID;
348
0
  defined_decoder.reset();
349
0
  page_is_filtered_out = false;
350
0
  block.reset();
351
0
  PageHeader page_hdr;
352
0
  auto &trans = reinterpret_cast<ThriftFileTransport &>(*protocol->getTransport());
353
354
0
  if (trans.HasPrefetch()) {
355
    // Already has some data prefetched, let's not mess with it
356
0
    Read(page_hdr);
357
0
  } else {
358
    // No prefetch yet, prefetch the full header in one go (so thrift won't read byte-by-byte from storage)
359
    // 256 bytes should cover almost all headers (unless it's a V2 header with really LONG string statistics)
360
0
    static constexpr idx_t ASSUMED_HEADER_SIZE = 256;
361
0
    const auto prefetch_size = MinValue(trans.GetSize() - trans.GetLocation(), ASSUMED_HEADER_SIZE);
362
0
    trans.Prefetch(trans.GetLocation(), prefetch_size);
363
0
    Read(page_hdr);
364
0
    trans.ClearPrefetch();
365
0
  }
366
  // some basic sanity check
367
0
  if (page_hdr.compressed_page_size < 0 || page_hdr.uncompressed_page_size < 0) {
368
0
    throw InvalidInputException("Failed to read file \"%s\": Page sizes must be >= 0", Reader().GetFileName());
369
0
  }
370
371
0
  if (PageIsFilteredOut(page_hdr, filter)) {
372
0
    return;
373
0
  }
374
375
0
  if (rows_to_skip > 0 && (page_hdr.type == PageType::DATA_PAGE || page_hdr.type == PageType::DATA_PAGE_V2)) {
376
0
    bool is_v1 = page_hdr.type == PageType::DATA_PAGE;
377
0
    idx_t page_num_values =
378
0
        NumericCast<idx_t>(is_v1 ? page_hdr.data_page_header.num_values : page_hdr.data_page_header_v2.num_values);
379
0
    if (rows_to_skip >= page_num_values) {
380
0
      trans.Skip(page_hdr.compressed_page_size);
381
0
      page_is_filtered_out = true;
382
0
      page_rows_available = page_num_values;
383
0
      return;
384
0
    }
385
0
  }
386
387
0
  switch (page_hdr.type) {
388
0
  case PageType::DATA_PAGE_V2:
389
0
    PreparePageV2(page_hdr);
390
0
    PrepareDataPage(page_hdr);
391
0
    break;
392
0
  case PageType::DATA_PAGE:
393
0
    PreparePage(page_hdr);
394
0
    PrepareDataPage(page_hdr);
395
0
    break;
396
0
  case PageType::DICTIONARY_PAGE: {
397
0
    PreparePage(page_hdr);
398
0
    auto dictionary_size = page_hdr.dictionary_page_header.num_values;
399
0
    if (dictionary_size < 0) {
400
0
      throw InvalidInputException("Failed to read file \"%s\": Invalid dictionary page header (num_values < 0)",
401
0
                                  Reader().GetFileName());
402
0
    }
403
0
    dictionary_decoder.InitializeDictionary(dictionary_size, filter, filter_state, HasDefines());
404
0
    break;
405
0
  }
406
0
  default:
407
0
    break; // ignore INDEX page type and any other custom extensions
408
0
  }
409
0
  ResetPage();
410
0
}
411
412
0
//! Called at the end of PrepareRead once a page has been prepared; the base implementation does nothing.
void ColumnReader::ResetPage() {
  // intentionally empty
}
414
415
0
//! Loads a DATA_PAGE_V2 page into `block`.
//! The page is read verbatim when it is not compressed (either the column codec is UNCOMPRESSED or the page
//! header says is_compressed == false). Otherwise the repetition/definition levels are copied as-is and only the
//! remainder of the page is decompressed behind them.
//! The caller is expected to have validated that both page sizes in the header are >= 0.
//! Throws InvalidInputException if the sizes in the page header are inconsistent with each other.
void ColumnReader::PreparePageV2(PageHeader &page_hdr) {
  D_ASSERT(page_hdr.type == PageType::DATA_PAGE_V2);

  // widen before adding 1 so that an uncompressed_page_size of INT32_MAX cannot overflow the signed addition
  AllocateBlock(static_cast<idx_t>(page_hdr.uncompressed_page_size) + 1);
  bool uncompressed = false;
  if (page_hdr.data_page_header_v2.__isset.is_compressed && !page_hdr.data_page_header_v2.is_compressed) {
    uncompressed = true;
  }
  if (chunk->meta_data.codec == CompressionCodec::UNCOMPRESSED) {
    if (page_hdr.compressed_page_size != page_hdr.uncompressed_page_size) {
      const auto &file_name = Reader().GetFileName();
      throw InvalidInputException(
          "Parquet file (%s) corrupted: uncompressed page size mismatch (expected %d, actual: %d)", file_name,
          page_hdr.uncompressed_page_size, page_hdr.compressed_page_size);
    }
    uncompressed = true;
  }
  if (uncompressed) {
    // The block was sized from uncompressed_page_size but we read compressed_page_size bytes into it.
    // When only the is_compressed flag marks the page as uncompressed the two sizes have not been compared
    // yet, so reject headers that would make us write past the end of the block.
    if (page_hdr.compressed_page_size > page_hdr.uncompressed_page_size) {
      throw InvalidInputException(
          "Failed to read file \"%s\": header inconsistency, compressed_page_size is larger than "
          "uncompressed_page_size for an uncompressed page",
          Reader().GetFileName());
    }
    ReadData(block->ptr, page_hdr.compressed_page_size, page_hdr.type);
    return;
  }

  // copy repeats & defines as-is because FOR SOME REASON they are uncompressed.
  // the page sizes are already validated >= 0 by the caller, but the level lengths are not, so guard
  // them here. with all four i32 header fields non-negative, the sum below cannot overflow once widened
  // to uint64_t, and the uint64_t casts in the comparisons below are safe.
  if (page_hdr.data_page_header_v2.repetition_levels_byte_length < 0 ||
      page_hdr.data_page_header_v2.definition_levels_byte_length < 0) {
    throw InvalidInputException(
        "Failed to read file \"%s\": header inconsistency, repetition_levels_byte_length and "
        "definition_levels_byte_length must be >= 0",
        Reader().GetFileName());
  }
  uint64_t uncompressed_bytes = static_cast<uint64_t>(page_hdr.data_page_header_v2.repetition_levels_byte_length) +
                                page_hdr.data_page_header_v2.definition_levels_byte_length;
  if (uncompressed_bytes > static_cast<uint64_t>(page_hdr.uncompressed_page_size)) {
    throw InvalidInputException(
        "Failed to read file \"%s\": header inconsistency, uncompressed_page_size needs to be larger than "
        "repetition_levels_byte_length + definition_levels_byte_length",
        Reader().GetFileName());
  }
  if (static_cast<uint64_t>(page_hdr.compressed_page_size) < uncompressed_bytes) {
    throw InvalidInputException(
        "Failed to read file \"%s\": header inconsistency, compressed_page_size is smaller than "
        "repetition_levels_byte_length + definition_levels_byte_length",
        Reader().GetFileName());
  }

  // the levels go to the front of the block unchanged
  ReadData(block->ptr, uncompressed_bytes, page_hdr.type);

  // whatever is left of the page is the compressed value region
  auto compressed_bytes = page_hdr.compressed_page_size - uncompressed_bytes;

  if (compressed_bytes == 0 && static_cast<uint64_t>(page_hdr.uncompressed_page_size) > uncompressed_bytes) {
    throw InvalidInputException(
        "Failed to read file \"%s\": header inconsistency, compressed_page_size is too small for the "
        "declared value region",
        Reader().GetFileName());
  }

  if (compressed_bytes > 0) {
    ResizeableBuffer compressed_buffer;
    compressed_buffer.resize(GetAllocator(), compressed_bytes);

    ReadData(compressed_buffer.ptr, compressed_bytes, page_hdr.type);

    // decompress directly behind the levels that were copied above
    DecompressInternal(chunk->meta_data.codec, compressed_buffer.ptr, compressed_bytes,
                       block->ptr + uncompressed_bytes, page_hdr.uncompressed_page_size - uncompressed_bytes);
  }
}
484
485
0
//! Makes sure `block` refers to a buffer of the requested size: an existing buffer is resized,
//! otherwise a new one is created.
void ColumnReader::AllocateBlock(idx_t size) {
  if (block) {
    block->resize(GetAllocator(), size);
    return;
  }
  block = make_shared_ptr<ResizeableBuffer>(GetAllocator(), size);
}
492
493
0
//! Loads a V1 data page or a dictionary page into `block`, decompressing it if the column codec requires it.
//! The caller is expected to have validated that both page sizes in the header are >= 0.
//! Throws InvalidInputException if the page sizes in the header are inconsistent.
void ColumnReader::PreparePage(PageHeader &page_hdr) {
  // widen before adding 1 so that an uncompressed_page_size of INT32_MAX cannot overflow the signed addition
  AllocateBlock(static_cast<idx_t>(page_hdr.uncompressed_page_size) + 1);
  uint32_t compressed_page_size = page_hdr.compressed_page_size;

  if (chunk->__isset.crypto_metadata) {
    auto const file_aad = reader.GetUniqueFileIdentifier(reader.metadata->crypto_metadata->encryption_algorithm);
    if (!file_aad.empty()) {
      // If there is a file aad (identifier), this means that the Encrypted file is written by Arrow
      // Arrow adds the bytes for encryption (len + nonce + tag)
      // to the compressed page size
      const auto crypto_overhead =
          static_cast<uint32_t>(ParquetCrypto::LENGTH_BYTES + ParquetCrypto::NONCE_BYTES + ParquetCrypto::TAG_BYTES);
      // compressed_page_size is unsigned: a declared size smaller than the overhead would wrap around to a
      // huge value and be used as allocation/read size below
      if (compressed_page_size < crypto_overhead) {
        throw InvalidInputException(
            "Failed to read file \"%s\": header inconsistency, compressed_page_size is smaller than the "
            "encryption overhead",
            Reader().GetFileName());
      }
      compressed_page_size -= crypto_overhead;
    }
  }

  if (chunk->meta_data.codec == CompressionCodec::UNCOMPRESSED) {
    if (compressed_page_size != NumericCast<uint32_t>(page_hdr.uncompressed_page_size)) {
      const auto &file_name = Reader().GetFileName();
      throw InvalidInputException(
          "Parquet file (%s) corrupted: uncompressed page size mismatch (expected %d, actual: %d)", file_name,
          page_hdr.uncompressed_page_size, compressed_page_size);
    }
    // no compression: read the page straight into the block
    ReadData(block->ptr, compressed_page_size, page_hdr.type);
    return;
  }

  // compressed: read into a temporary buffer and decompress into the block
  ResizeableBuffer compressed_buffer;
  compressed_buffer.resize(GetAllocator(), compressed_page_size + 1);
  ReadData(compressed_buffer.ptr, compressed_page_size, page_hdr.type);

  DecompressInternal(chunk->meta_data.codec, compressed_buffer.ptr, compressed_page_size, block->ptr,
                     page_hdr.uncompressed_page_size);
}
526
527
void ColumnReader::DecompressInternal(CompressionCodec::type codec, const_data_ptr_t src, idx_t src_size,
528
0
                                      data_ptr_t dst, idx_t dst_size) {
529
0
  switch (codec) {
530
0
  case CompressionCodec::UNCOMPRESSED:
531
0
    throw InternalException("Parquet data unexpectedly uncompressed");
532
0
  case CompressionCodec::GZIP: {
533
0
    MiniZStream s;
534
0
    s.Decompress(const_char_ptr_cast(src), src_size, char_ptr_cast(dst), dst_size);
535
0
    break;
536
0
  }
537
0
  case CompressionCodec::LZ4_RAW: {
538
0
    auto res =
539
0
        duckdb_lz4::LZ4_decompress_safe(const_char_ptr_cast(src), char_ptr_cast(dst),
540
0
                                        UnsafeNumericCast<int32_t>(src_size), UnsafeNumericCast<int32_t>(dst_size));
541
0
    if (res != NumericCast<int>(dst_size)) {
542
0
      throw InvalidInputException("Failed to read file \"%s\": LZ4 decompression failure",
543
0
                                  Reader().GetFileName());
544
0
    }
545
0
    break;
546
0
  }
547
0
  case CompressionCodec::SNAPPY: {
548
0
    {
549
0
      size_t uncompressed_size = 0;
550
0
      auto res = duckdb_snappy::GetUncompressedLength(const_char_ptr_cast(src), src_size, &uncompressed_size);
551
0
      if (!res) {
552
0
        throw InvalidInputException("Failed to read file \"%s\": Snappy decompression failure",
553
0
                                    Reader().GetFileName());
554
0
      }
555
0
      if (uncompressed_size != dst_size) {
556
0
        throw InvalidInputException(
557
0
            "Failed to read file \"%s\": Snappy decompression failure: Uncompressed data size mismatch",
558
0
            Reader().GetFileName());
559
0
      }
560
0
    }
561
0
    auto res = duckdb_snappy::RawUncompress(const_char_ptr_cast(src), src_size, char_ptr_cast(dst));
562
0
    if (!res) {
563
0
      throw InvalidInputException("Failed to read file \"%s\": Snappy decompression failure",
564
0
                                  Reader().GetFileName());
565
0
    }
566
0
    break;
567
0
  }
568
0
  case CompressionCodec::ZSTD: {
569
0
    auto res = duckdb_zstd::ZSTD_decompress(dst, dst_size, src, src_size);
570
0
    if (duckdb_zstd::ZSTD_isError(res) || res != dst_size) {
571
0
      throw InvalidInputException("Failed to read file \"%s\": ZSTD Decompression failure",
572
0
                                  Reader().GetFileName());
573
0
    }
574
0
    break;
575
0
  }
576
0
  case CompressionCodec::BROTLI: {
577
0
    auto state = duckdb_brotli::BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
578
0
    size_t total_out = 0;
579
0
    auto src_size_size_t = NumericCast<size_t>(src_size);
580
0
    auto dst_size_size_t = NumericCast<size_t>(dst_size);
581
582
0
    auto res = duckdb_brotli::BrotliDecoderDecompressStream(state, &src_size_size_t, &src, &dst_size_size_t, &dst,
583
0
                                                            &total_out);
584
0
    if (res != duckdb_brotli::BROTLI_DECODER_RESULT_SUCCESS) {
585
0
      throw InvalidInputException("Failed to read file \"%s\": Brotli Decompression failure",
586
0
                                  Reader().GetFileName());
587
0
    }
588
0
    duckdb_brotli::BrotliDecoderDestroyInstance(state);
589
0
    break;
590
0
  }
591
592
0
  default: {
593
0
    duckdb::stringstream codec_name;
594
0
    codec_name << codec;
595
0
    throw InvalidInputException("Failed to read file \"%s\": Unsupported compression codec \"%s\". Supported "
596
0
                                "options are uncompressed, brotli, gzip, lz4_raw, snappy or zstd",
597
0
                                Reader().GetFileName(), codec_name.str());
598
0
  }
599
0
  }
600
0
}
601
602
0
//! Sets up decoding of a data page (V1 or V2) that has already been loaded into `block`:
//! records the number of values in the page, creates the repetition/definition level decoders (or steps over
//! the level bytes of a V2 page when they are not needed) and initializes the decoder for the page encoding.
//! Throws InvalidInputException if the matching data page header is missing or the encoding is unsupported.
void ColumnReader::PrepareDataPage(PageHeader &page_hdr) {
  const bool is_v1 = page_hdr.type == PageType::DATA_PAGE;
  const bool is_v2 = page_hdr.type == PageType::DATA_PAGE_V2;

  if (is_v1 && !page_hdr.__isset.data_page_header) {
    throw InvalidInputException("Failed to read file \"%s\": Missing data page header from data page",
                                Reader().GetFileName());
  }
  if (is_v2 && !page_hdr.__isset.data_page_header_v2) {
    throw InvalidInputException("Failed to read file \"%s\": Missing data page header from data page v2",
                                Reader().GetFileName());
  }

  auto &header_v1 = page_hdr.data_page_header;
  auto &header_v2 = page_hdr.data_page_header_v2;

  page_rows_available = is_v1 ? header_v1.num_values : header_v2.num_values;
  const auto page_encoding = is_v1 ? header_v1.encoding : header_v2.encoding;

  // repetition levels: V1 pages store the byte length in the block itself, V2 pages store it in the header
  if (HasRepeats()) {
    uint32_t repeat_bytes = is_v1 ? block->read<uint32_t>() : header_v2.repetition_levels_byte_length;
    block->available(repeat_bytes);
    const auto repeat_width = RleBpDecoder::ComputeBitWidthFromMaxValue(MaxRepeat());
    repeated_decoder = make_uniq<RleBpDecoder>(block->ptr, repeat_bytes, repeat_width);
    block->inc(repeat_bytes);
  } else if (is_v2 && header_v2.repetition_levels_byte_length > 0) {
    // levels are present in the page but not needed: step over them
    block->inc(header_v2.repetition_levels_byte_length);
  }

  // definition levels: same layout rules as the repetition levels
  if (HasDefines()) {
    uint32_t define_bytes = is_v1 ? block->read<uint32_t>() : header_v2.definition_levels_byte_length;
    block->available(define_bytes);
    const auto define_width = RleBpDecoder::ComputeBitWidthFromMaxValue(MaxDefine());
    defined_decoder = make_uniq<RleBpDecoder>(block->ptr, define_bytes, define_width);
    block->inc(define_bytes);
  } else if (is_v2 && header_v2.definition_levels_byte_length > 0) {
    block->inc(header_v2.definition_levels_byte_length);
  }

  switch (page_encoding) {
  case Encoding::RLE_DICTIONARY:
  case Encoding::PLAIN_DICTIONARY:
    encoding = ColumnEncoding::DICTIONARY;
    dictionary_decoder.InitializePage();
    break;
  case Encoding::RLE:
    encoding = ColumnEncoding::RLE;
    rle_decoder.InitializePage();
    break;
  case Encoding::DELTA_BINARY_PACKED:
    encoding = ColumnEncoding::DELTA_BINARY_PACKED;
    delta_binary_packed_decoder.InitializePage();
    break;
  case Encoding::DELTA_LENGTH_BYTE_ARRAY:
    encoding = ColumnEncoding::DELTA_LENGTH_BYTE_ARRAY;
    delta_length_byte_array_decoder.InitializePage();
    break;
  case Encoding::DELTA_BYTE_ARRAY:
    encoding = ColumnEncoding::DELTA_BYTE_ARRAY;
    delta_byte_array_decoder.InitializePage();
    break;
  case Encoding::BYTE_STREAM_SPLIT:
    encoding = ColumnEncoding::BYTE_STREAM_SPLIT;
    byte_stream_split_decoder.InitializePage();
    break;
  case Encoding::PLAIN:
    // no decoder to initialize for plain pages
    encoding = ColumnEncoding::PLAIN;
    break;
  default:
    throw InvalidInputException("Failed to read file \"%s\": Unsupported page encoding", Reader().GetFileName());
  }
}
681
682
0
// Positions the shared transport at this reader's saved offset and, when both level buffers are supplied,
// applies any skips that are still pending.
void ColumnReader::BeginRead(data_ptr_t define_out, data_ptr_t repeat_out) {
  // several column readers share one protocol, so restore our own location before touching it
  auto &transport = reinterpret_cast<ThriftFileTransport &>(*protocol->getTransport());
  transport.SetLocation(chunk_read_offset);

  // pending skips are only applied when BOTH buffers are present
  if (!define_out || !repeat_out) {
    return;
  }
  ApplyPendingSkips(define_out, repeat_out);
}
692
693
// Calls PrepareRead(filter, filter_state, rows_to_skip) until the current page reports available rows, then
// returns how many rows may be read now: the smallest of max_read, page_rows_available and
// STANDARD_VECTOR_SIZE.
idx_t ColumnReader::ReadPageHeaders(idx_t max_read, optional_ptr<const TableFilter> filter,
                                    optional_ptr<TableFilterState> filter_state, idx_t rows_to_skip) {
  // The ordinal of every page attempted by this call is published in aad_crypto_metadata before preparing it.
  // NOTE(review): the counter is an int8_t that restarts at 0 on each call - confirm that this is what the
  // consumer of aad_crypto_metadata.page_ordinal expects.
  for (int8_t page_ordinal = 0; page_rows_available == 0; page_ordinal++) {
    aad_crypto_metadata.page_ordinal = page_ordinal;
    PrepareRead(filter, filter_state, rows_to_skip);
  }
  const idx_t readable = MinValue<idx_t>(max_read, page_rows_available);
  return MinValue<idx_t>(readable, STANDARD_VECTOR_SIZE);
}
703
704
0
// Decodes the repetition/definition levels for the next read_now values into repeat_out / define_out,
// starting at result_offset. Returns true when no definition levels had to be written out: either the column
// has no defines, or (without repeats) the decoder reported a repeated batch of the maximum define level.
// Returns false when definition levels were written to define_out.
bool ColumnReader::PrepareRead(idx_t read_now, data_ptr_t define_out, data_ptr_t repeat_out, idx_t result_offset) {
  D_ASSERT(block);
  D_ASSERT(read_now + result_offset <= STANDARD_VECTOR_SIZE);
  D_ASSERT(!page_is_filtered_out);

  if (HasRepeats()) {
    D_ASSERT(repeated_decoder);
    repeated_decoder->GetBatch<uint8_t>(repeat_out + result_offset, read_now);
  }

  if (!HasDefines()) {
    // no defines, so everything is valid
    return true;
  }

  D_ASSERT(defined_decoder);
  const auto max_define = NumericCast<uint8_t>(MaxDefine());
  const bool fast_path = !HasRepeats() && defined_decoder->HasRepeatedBatch<uint8_t>(read_now, max_define);
  if (fast_path) {
    // no repeats and every value is at the maximum define level: consume the batch without materializing it
    defined_decoder->GetRepeatedBatch<uint8_t>(read_now, max_define);
    return true;
  }
  defined_decoder->GetBatch<uint8_t>(define_out + result_offset, read_now);
  return false;
}
729
730
// Reads read_now values of the current page into result, starting at result_offset, and deducts them from
// page_rows_available. Filtered-out pages and all-NULL nested columns produce NULLs without decoding values;
// otherwise the values are decoded by the decoder matching the page encoding.
void ColumnReader::ReadData(idx_t read_now, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result,
                            idx_t result_offset) {
  // appending behind previously read rows: make sure the result is a flat vector first
  if (result_offset != 0 && result.GetVectorType() != VectorType::FLAT_VECTOR) {
    result.Flatten();
    result.Reserve(STANDARD_VECTOR_SIZE);
  }

  const idx_t end_offset = result_offset + read_now;

  if (page_is_filtered_out) {
    // the page was filtered out - every row of it is emitted as NULL
    auto &validity = FlatVector::ValidityMutable(result);
    for (idx_t row = result_offset; row < end_offset; row++) {
      validity.SetInvalid(row);
    }
    page_rows_available -= read_now;
    return;
  }

  // read the defines/repeats
  const bool all_valid = PrepareRead(read_now, define_out, repeat_out, result_offset);

  if (!IsRoot() && AllValuesAreNull()) {
    // every value is NULL: the levels read above are still needed by the parent, but there is nothing to
    // decode - mark the rows NULL and skip the encoding read
    if (result_offset != 0) {
      for (idx_t row = result_offset; row < end_offset; row++) {
        FlatVector::SetNull(result, row, true);
      }
    } else {
      // the whole vector is ours - emit a constant NULL
      ConstantVector::SetNull(result, count_t(read_now));
    }
    page_rows_available -= read_now;
    return;
  }

  // definition levels are only handed to the decoder when some values may be NULL
  uint8_t *const defines = all_valid ? nullptr : static_cast<uint8_t *>(define_out);
  switch (encoding) {
  case ColumnEncoding::RLE:
    rle_decoder.Read(defines, read_now, result, result_offset);
    break;
  case ColumnEncoding::DICTIONARY:
    dictionary_decoder.Read(defines, read_now, result, result_offset);
    break;
  case ColumnEncoding::BYTE_STREAM_SPLIT:
    byte_stream_split_decoder.Read(defines, read_now, result, result_offset);
    break;
  case ColumnEncoding::DELTA_BINARY_PACKED:
    delta_binary_packed_decoder.Read(defines, read_now, result, result_offset);
    break;
  case ColumnEncoding::DELTA_BYTE_ARRAY:
    delta_byte_array_decoder.Read(defines, read_now, result, result_offset);
    break;
  case ColumnEncoding::DELTA_LENGTH_BYTE_ARRAY:
    delta_length_byte_array_decoder.Read(block, defines, read_now, result, result_offset);
    break;
  default:
    // every remaining encoding is read as plain data
    Plain(block, defines, read_now, result_offset, result);
    break;
  }
  page_rows_available -= read_now;
}
789
790
0
// Concludes a read of read_count rows: stores the transport's current location in chunk_read_offset (the
// value BeginRead restores) and deducts the rows from group_rows_available.
void ColumnReader::FinishRead(idx_t read_count) {
  auto &transport = reinterpret_cast<ThriftFileTransport &>(*protocol->getTransport());
  chunk_read_offset = transport.GetLocation();

  group_rows_available -= read_count;
}
796
797
0
// Reads input.num_values values into result, one page-sized batch at a time, then finishes the read.
// Returns input.num_values.
idx_t ColumnReader::ReadInternal(ColumnReaderInput &input, Vector &result) {
  auto remaining = input.num_values;
  D_ASSERT(remaining <= STANDARD_VECTOR_SIZE);

  idx_t write_offset = 0;
  while (remaining > 0) {
    // how many values the current page can supply for this batch
    const auto batch = ReadPageHeaders(remaining);

    ReadData(batch, input.define_out, input.repeat_out, result, write_offset);

    write_offset += batch;
    remaining -= batch;
  }

  FinishRead(input.num_values);
  return input.num_values;
}
819
820
0
// Reads input.num_values values into result and returns that count.
idx_t ColumnReader::Read(ColumnReaderInput &input, Vector &result) {
  if (!IsRoot() || !AllValuesAreNull()) {
    // regular path (nested all-NULL columns also go here: ReadData handles them, since they still have to
    // emit their define/repeat levels)
    BeginRead(input.define_out, input.repeat_out);
    return ReadInternal(input, result);
  }
  // a top-level column that is entirely NULL: emit a constant NULL vector without reading anything
  ConstantVector::SetNull(result, count_t(input.num_values));
  return input.num_values;
}
830
831
void ColumnReader::Select(ColumnReaderInput &input, Vector &result, const SelectionVector &sel,
832
0
                          idx_t approved_tuple_count) {
833
0
  auto &num_values = input.num_values;
834
0
  if (SupportsDirectSelect() && approved_tuple_count < num_values && !(IsRoot() && AllValuesAreNull())) {
835
0
    DirectSelect(input, result, sel, approved_tuple_count);
836
0
    return;
837
0
  }
838
0
  Read(input, result);
839
0
}
840
841
void ColumnReader::DirectSelect(ColumnReaderInput &input, Vector &result, const SelectionVector &sel,
842
0
                                idx_t approved_tuple_count) {
843
0
  auto &num_values = input.num_values;
844
0
  auto &define_out = input.define_out;
845
0
  auto &repeat_out = input.repeat_out;
846
847
0
  auto to_read = num_values;
848
849
  // prepare the first read if we haven't yet
850
0
  BeginRead(define_out, repeat_out);
851
0
  auto read_now = ReadPageHeaders(to_read);
852
853
  // we can only push the filter into the decoder if we are reading the ENTIRE vector in one go
854
0
  if (read_now == to_read && encoding == ColumnEncoding::PLAIN) {
855
0
    const auto all_valid = PrepareRead(read_now, define_out, repeat_out, 0);
856
0
    const auto define_ptr = all_valid ? nullptr : static_cast<uint8_t *>(define_out);
857
0
    PlainSelect(block, define_ptr, read_now, result, sel, approved_tuple_count);
858
859
0
    page_rows_available -= read_now;
860
0
    FinishRead(to_read);
861
0
    return;
862
0
  }
863
  // fallback to regular read + filter
864
0
  ReadInternal(input, result);
865
0
}
866
867
void ColumnReader::Filter(ColumnReaderInput &input, Vector &result, const TableFilter &filter,
868
                          TableFilterState &filter_state, SelectionVector &sel, idx_t &approved_tuple_count,
869
0
                          bool is_first_filter) {
870
0
  auto &num_values = input.num_values;
871
0
  if (SupportsDirectFilter() && is_first_filter && !(IsRoot() && AllValuesAreNull())) {
872
0
    DirectFilter(input, result, filter, filter_state, sel, approved_tuple_count);
873
0
    return;
874
0
  }
875
0
  Select(input, result, sel, approved_tuple_count);
876
0
  ApplyFilter(result, filter, filter_state, num_values, sel, approved_tuple_count);
877
0
}
878
879
void ColumnReader::DirectFilter(ColumnReaderInput &input, Vector &result, const TableFilter &filter,
880
0
                                TableFilterState &filter_state, SelectionVector &sel, idx_t &approved_tuple_count) {
881
0
  auto &num_values = input.num_values;
882
0
  auto &define_out = input.define_out;
883
0
  auto &repeat_out = input.repeat_out;
884
885
0
  auto to_read = num_values;
886
887
  // prepare the first read if we haven't yet
888
0
  BeginRead(define_out, repeat_out);
889
0
  auto read_now = ReadPageHeaders(to_read, &filter, &filter_state);
890
891
  // we can only push the filter into the decoder if we are reading the ENTIRE vector in one go
892
0
  if (encoding == ColumnEncoding::DICTIONARY && read_now == to_read && dictionary_decoder.HasFilter()) {
893
0
    if (page_is_filtered_out) {
894
      // the page has been filtered out entirely - skip
895
0
      approved_tuple_count = 0;
896
0
    } else {
897
      // Push filter into dictionary directly
898
      // read the defines/repeats
899
0
      const auto all_valid = PrepareRead(read_now, define_out, repeat_out, 0);
900
0
      const auto define_ptr = all_valid ? nullptr : static_cast<uint8_t *>(define_out);
901
0
      dictionary_decoder.Filter(define_ptr, read_now, result, sel, approved_tuple_count);
902
0
    }
903
0
    page_rows_available -= read_now;
904
0
    FinishRead(to_read);
905
0
    return;
906
0
  }
907
  // fallback to regular read + filter
908
0
  ReadInternal(input, result);
909
0
  ApplyFilter(result, filter, filter_state, num_values, sel, approved_tuple_count);
910
0
}
911
912
void ColumnReader::ApplyFilter(Vector &v, const TableFilter &filter, TableFilterState &filter_state, idx_t scan_count,
913
0
                               SelectionVector &sel, idx_t &approved_tuple_count) {
914
0
  FlatVector::SetSize(v, count_t(scan_count));
915
0
  ColumnSegment::FilterSelection(sel, v, filter_state, scan_count, approved_tuple_count);
916
0
}
917
918
0
// Records num_values additional values to skip. Nothing is skipped here; the accumulated count is consumed
// by ApplyPendingSkips.
void ColumnReader::Skip(idx_t num_values) {
  pending_skips = pending_skips + num_values;
}
921
922
0
// Skips all values accumulated in pending_skips by advancing the level decoders and the value decoder of
// the current encoding, page by page. pending_skips is reset to zero before the skipping starts.
void ColumnReader::ApplyPendingSkips(data_ptr_t define_out, data_ptr_t repeat_out) {
  if (pending_skips == 0) {
    return;
  }
  const idx_t total_skips = pending_skips;
  pending_skips = 0;

  auto remaining = total_skips;
  // local scratch space for the levels of skipped values; used instead of the caller's buffers whenever the
  // column has defines / repeats
  data_t scratch_defines[STANDARD_VECTOR_SIZE] = {};
  data_t scratch_repeats[STANDARD_VECTOR_SIZE];
  data_ptr_t skip_define_out = HasDefines() ? scratch_defines : define_out;
  data_ptr_t skip_repeat_out = HasRepeats() ? scratch_repeats : repeat_out;
  // start reading but do not apply skips (we are skipping now)
  BeginRead(nullptr, nullptr);

  while (remaining > 0) {
    const auto skip_now = ReadPageHeaders(remaining, nullptr, nullptr, remaining);
    // a page that was filtered out entirely needs no decoding, only the bookkeeping below
    if (!page_is_filtered_out) {
      const bool all_valid = PrepareRead(skip_now, skip_define_out, skip_repeat_out, 0);
      uint8_t *const defines = all_valid ? nullptr : static_cast<uint8_t *>(skip_define_out);

      switch (encoding) {
      case ColumnEncoding::RLE:
        rle_decoder.Skip(defines, skip_now);
        break;
      case ColumnEncoding::DICTIONARY:
        dictionary_decoder.Skip(defines, skip_now);
        break;
      case ColumnEncoding::BYTE_STREAM_SPLIT:
        byte_stream_split_decoder.Skip(defines, skip_now);
        break;
      case ColumnEncoding::DELTA_BINARY_PACKED:
        delta_binary_packed_decoder.Skip(defines, skip_now);
        break;
      case ColumnEncoding::DELTA_BYTE_ARRAY:
        delta_byte_array_decoder.Skip(defines, skip_now);
        break;
      case ColumnEncoding::DELTA_LENGTH_BYTE_ARRAY:
        delta_length_byte_array_decoder.Skip(defines, skip_now);
        break;
      default:
        // every remaining encoding is skipped as plain data
        PlainSkip(*block, defines, skip_now);
        break;
      }
    }
    page_rows_available -= skip_now;
    remaining -= skip_now;
  }
  FinishRead(total_skips);
}
976
977
//===--------------------------------------------------------------------===//
978
// Create Column Reader
979
//===--------------------------------------------------------------------===//
980
template <class T>
981
0
static unique_ptr<ColumnReader> CreateDecimalReader(const ParquetReader &reader, const ParquetColumnSchema &schema) {
982
0
  switch (schema.type.InternalType()) {
983
0
  case PhysicalType::INT16:
984
0
    return make_uniq<TemplatedColumnReader<int16_t, TemplatedParquetValueConversion<T>>>(reader, schema);
985
0
  case PhysicalType::INT32:
986
0
    return make_uniq<TemplatedColumnReader<int32_t, TemplatedParquetValueConversion<T>>>(reader, schema);
987
0
  case PhysicalType::INT64:
988
0
    return make_uniq<TemplatedColumnReader<int64_t, TemplatedParquetValueConversion<T>>>(reader, schema);
989
0
  case PhysicalType::INT128:
990
0
    return make_uniq<TemplatedColumnReader<hugeint_t, TemplatedParquetValueConversion<T>>>(reader, schema);
991
0
  default:
992
0
    throw NotImplementedException("Unimplemented internal type for CreateDecimalReader");
993
0
  }
994
0
}
Unexecuted instantiation: column_reader.cpp:duckdb::unique_ptr<duckdb::ColumnReader, std::__1::default_delete<duckdb::ColumnReader>, true> duckdb::CreateDecimalReader<int>(duckdb::ParquetReader const&, duckdb::ParquetColumnSchema const&)
Unexecuted instantiation: column_reader.cpp:duckdb::unique_ptr<duckdb::ColumnReader, std::__1::default_delete<duckdb::ColumnReader>, true> duckdb::CreateDecimalReader<long>(duckdb::ParquetReader const&, duckdb::ParquetColumnSchema const&)
995
996
0
// Creates the column reader matching the logical type of schema (schema.type.id()), using schema.type_info
// to pick the concrete variant where one logical type has several parquet representations (timestamps,
// times, decimals, float16).
//
// Throws:
//   InternalException        - a TIMESTAMP / TIMESTAMP_NS / TIME / TIME_NS / TIME_TZ column without a usable
//                              unit in schema.type_info
//   NotImplementedException  - an unrecognized DECIMAL representation, or a logical type without a reader
//                              (the message is the type's string form)
unique_ptr<ColumnReader> ColumnReader::CreateReader(const ParquetReader &reader, const ParquetColumnSchema &schema) {
  switch (schema.type.id()) {
  case LogicalTypeId::BOOLEAN:
    return make_uniq<BooleanColumnReader>(reader, schema);
  case LogicalTypeId::UTINYINT:
    return make_uniq<TemplatedColumnReader<uint8_t, TemplatedParquetValueConversion<uint32_t>>>(reader, schema);
  case LogicalTypeId::USMALLINT:
    return make_uniq<TemplatedColumnReader<uint16_t, TemplatedParquetValueConversion<uint32_t>>>(reader, schema);
  case LogicalTypeId::UINTEGER:
    return make_uniq<TemplatedColumnReader<uint32_t, TemplatedParquetValueConversion<uint32_t>>>(reader, schema);
  case LogicalTypeId::UBIGINT:
    return make_uniq<TemplatedColumnReader<uint64_t, TemplatedParquetValueConversion<uint64_t>>>(reader, schema);
  case LogicalTypeId::TINYINT:
    return make_uniq<TemplatedColumnReader<int8_t, TemplatedParquetValueConversion<int32_t>>>(reader, schema);
  case LogicalTypeId::SMALLINT:
    return make_uniq<TemplatedColumnReader<int16_t, TemplatedParquetValueConversion<int32_t>>>(reader, schema);
  case LogicalTypeId::INTEGER:
    return make_uniq<TemplatedColumnReader<int32_t, TemplatedParquetValueConversion<int32_t>>>(reader, schema);
  case LogicalTypeId::BIGINT:
    return make_uniq<TemplatedColumnReader<int64_t, TemplatedParquetValueConversion<int64_t>>>(reader, schema);
  case LogicalTypeId::FLOAT:
    // half-precision values are read as uint16_t and widened through Float16ToFloat32
    if (schema.type_info == ParquetExtraTypeInfo::FLOAT16) {
      return make_uniq<CallbackColumnReader<uint16_t, float, Float16ToFloat32>>(reader, schema);
    }
    return make_uniq<TemplatedColumnReader<float, TemplatedParquetValueConversion<float>>>(reader, schema);
  case LogicalTypeId::DOUBLE:
    // a DOUBLE column flagged as DECIMAL_BYTE_ARRAY is delegated to the decimal reader factory
    if (schema.type_info == ParquetExtraTypeInfo::DECIMAL_BYTE_ARRAY) {
      return ParquetDecimalUtils::CreateReader(reader, schema);
    }
    return make_uniq<TemplatedColumnReader<double, TemplatedParquetValueConversion<double>>>(reader, schema);
  case LogicalTypeId::TIMESTAMP:
  case LogicalTypeId::TIMESTAMP_TZ:
    switch (schema.type_info) {
    case ParquetExtraTypeInfo::IMPALA_TIMESTAMP:
      return make_uniq<CallbackColumnReader<Int96, timestamp_t, ImpalaTimestampToTimestamp>>(reader, schema);
    case ParquetExtraTypeInfo::UNIT_MS:
      return make_uniq<CallbackColumnReader<int64_t, timestamp_t, ParquetTimestampMsToTimestamp>>(reader, schema);
    case ParquetExtraTypeInfo::UNIT_MICROS:
      return make_uniq<CallbackColumnReader<int64_t, timestamp_t, ParquetTimestampMicrosToTimestamp>>(reader,
                                                                                                      schema);
    case ParquetExtraTypeInfo::UNIT_NS:
      return make_uniq<CallbackColumnReader<int64_t, timestamp_t, ParquetTimestampNsToTimestamp>>(reader, schema);
    default:
      throw InternalException("TIMESTAMP requires type info");
    }
  case LogicalTypeId::TIMESTAMP_NS:
  case LogicalTypeId::TIMESTAMP_TZ_NS:
    switch (schema.type_info) {
    case ParquetExtraTypeInfo::IMPALA_TIMESTAMP:
      return make_uniq<CallbackColumnReader<Int96, timestamp_ns_t, ImpalaTimestampToTimestampNS>>(reader, schema);
    case ParquetExtraTypeInfo::UNIT_MS:
      return make_uniq<CallbackColumnReader<int64_t, timestamp_ns_t, ParquetTimestampMsToTimestampNs>>(reader,
                                                                                                       schema);
    case ParquetExtraTypeInfo::UNIT_MICROS:
      return make_uniq<CallbackColumnReader<int64_t, timestamp_ns_t, ParquetTimestampUsToTimestampNs>>(reader,
                                                                                                       schema);
    case ParquetExtraTypeInfo::UNIT_NS:
      return make_uniq<CallbackColumnReader<int64_t, timestamp_ns_t, ParquetTimestampNsToTimestampNs>>(reader,
                                                                                                       schema);
    default:
      throw InternalException("TIMESTAMP_NS requires type info");
    }
  case LogicalTypeId::DATE:
    return make_uniq<CallbackColumnReader<int32_t, date_t, ParquetIntToDate>>(reader, schema);
  case LogicalTypeId::TIME:
    // millisecond times are stored as int32_t, micro/nanosecond times as int64_t
    switch (schema.type_info) {
    case ParquetExtraTypeInfo::UNIT_MS:
      return make_uniq<CallbackColumnReader<int32_t, dtime_t, ParquetMsIntToTime>>(reader, schema);
    case ParquetExtraTypeInfo::UNIT_MICROS:
      return make_uniq<CallbackColumnReader<int64_t, dtime_t, ParquetIntToTime>>(reader, schema);
    case ParquetExtraTypeInfo::UNIT_NS:
      return make_uniq<CallbackColumnReader<int64_t, dtime_t, ParquetNsIntToTime>>(reader, schema);
    default:
      throw InternalException("TIME requires type info");
    }
  case LogicalTypeId::TIME_NS:
    switch (schema.type_info) {
    case ParquetExtraTypeInfo::UNIT_MS:
      return make_uniq<CallbackColumnReader<int32_t, dtime_ns_t, ParquetMsIntToTimeNs>>(reader, schema);
    case ParquetExtraTypeInfo::UNIT_MICROS:
      return make_uniq<CallbackColumnReader<int64_t, dtime_ns_t, ParquetUsIntToTimeNs>>(reader, schema);
    case ParquetExtraTypeInfo::UNIT_NS:
      return make_uniq<CallbackColumnReader<int64_t, dtime_ns_t, ParquetIntToTimeNs>>(reader, schema);
    default:
      // name the actual logical type, consistent with the TIMESTAMP_NS and TIME_TZ branches
      throw InternalException("TIME_NS requires type info");
    }
  case LogicalTypeId::TIME_TZ:
    switch (schema.type_info) {
    case ParquetExtraTypeInfo::UNIT_MS:
      return make_uniq<CallbackColumnReader<int32_t, dtime_tz_t, ParquetIntToTimeMsTZ>>(reader, schema);
    case ParquetExtraTypeInfo::UNIT_MICROS:
      return make_uniq<CallbackColumnReader<int64_t, dtime_tz_t, ParquetIntToTimeTZ>>(reader, schema);
    case ParquetExtraTypeInfo::UNIT_NS:
      return make_uniq<CallbackColumnReader<int64_t, dtime_tz_t, ParquetIntToTimeNsTZ>>(reader, schema);
    default:
      throw InternalException("TIME_TZ requires type info");
    }
  case LogicalTypeId::BLOB:
  case LogicalTypeId::VARCHAR:
    return make_uniq<StringColumnReader>(reader, schema);
  case LogicalTypeId::DECIMAL:
    // we have to figure out what kind of int we need
    switch (schema.type_info) {
    case ParquetExtraTypeInfo::DECIMAL_INT32:
      return CreateDecimalReader<int32_t>(reader, schema);
    case ParquetExtraTypeInfo::DECIMAL_INT64:
      return CreateDecimalReader<int64_t>(reader, schema);
    case ParquetExtraTypeInfo::DECIMAL_BYTE_ARRAY:
      return ParquetDecimalUtils::CreateReader(reader, schema);
    default:
      throw NotImplementedException("Unrecognized Parquet type for Decimal");
    }
  case LogicalTypeId::UUID:
    return make_uniq<UUIDColumnReader>(reader, schema);
  case LogicalTypeId::INTERVAL:
    return make_uniq<IntervalColumnReader>(reader, schema);
  case LogicalTypeId::SQLNULL:
    return make_uniq<NullColumnReader>(reader, schema);
  default:
    break;
  }
  // no reader exists for this logical type
  throw NotImplementedException(schema.type.ToString());
}
1119
1120
} // namespace duckdb