Coverage Report

Created: 2026-03-31 07:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/duckdb/extension/parquet/column_reader.cpp
Line
Count
Source
1
#include "column_reader.hpp"
2
3
#include "reader/boolean_column_reader.hpp"
4
#include "brotli/decode.h"
5
#include "reader/callback_column_reader.hpp"
6
#include "reader/decimal_column_reader.hpp"
7
#include "duckdb.hpp"
8
#include "reader/expression_column_reader.hpp"
9
#include "reader/interval_column_reader.hpp"
10
#include "reader/list_column_reader.hpp"
11
#include "lz4.hpp"
12
#include "miniz_wrapper.hpp"
13
#include "reader/null_column_reader.hpp"
14
#include "parquet_reader.hpp"
15
#include "parquet_timestamp.hpp"
16
#include "parquet_float16.hpp"
17
18
#include "reader/row_number_column_reader.hpp"
19
#include "snappy.h"
20
#include "reader/string_column_reader.hpp"
21
#include "reader/struct_column_reader.hpp"
22
#include "reader/templated_column_reader.hpp"
23
#include "reader/uuid_column_reader.hpp"
24
25
#include "zstd.h"
26
27
#include "duckdb/common/helper.hpp"
28
#include "duckdb/common/numeric_utils.hpp"
29
#include "duckdb/common/types/bit.hpp"
30
#include "duckdb/storage/table/column_segment.hpp"
31
32
#include "parquet_crypto.hpp"
33
34
namespace duckdb {
35
36
using duckdb_parquet::CompressionCodec;
37
using duckdb_parquet::ConvertedType;
38
using duckdb_parquet::Encoding;
39
using duckdb_parquet::PageType;
40
using duckdb_parquet::Type;
41
42
const uint64_t ParquetDecodeUtils::BITPACK_MASKS[] = {0,
43
                                                      1,
44
                                                      3,
45
                                                      7,
46
                                                      15,
47
                                                      31,
48
                                                      63,
49
                                                      127,
50
                                                      255,
51
                                                      511,
52
                                                      1023,
53
                                                      2047,
54
                                                      4095,
55
                                                      8191,
56
                                                      16383,
57
                                                      32767,
58
                                                      65535,
59
                                                      131071,
60
                                                      262143,
61
                                                      524287,
62
                                                      1048575,
63
                                                      2097151,
64
                                                      4194303,
65
                                                      8388607,
66
                                                      16777215,
67
                                                      33554431,
68
                                                      67108863,
69
                                                      134217727,
70
                                                      268435455,
71
                                                      536870911,
72
                                                      1073741823,
73
                                                      2147483647,
74
                                                      4294967295,
75
                                                      8589934591,
76
                                                      17179869183,
77
                                                      34359738367,
78
                                                      68719476735,
79
                                                      137438953471,
80
                                                      274877906943,
81
                                                      549755813887,
82
                                                      1099511627775,
83
                                                      2199023255551,
84
                                                      4398046511103,
85
                                                      8796093022207,
86
                                                      17592186044415,
87
                                                      35184372088831,
88
                                                      70368744177663,
89
                                                      140737488355327,
90
                                                      281474976710655,
91
                                                      562949953421311,
92
                                                      1125899906842623,
93
                                                      2251799813685247,
94
                                                      4503599627370495,
95
                                                      9007199254740991,
96
                                                      18014398509481983,
97
                                                      36028797018963967,
98
                                                      72057594037927935,
99
                                                      144115188075855871,
100
                                                      288230376151711743,
101
                                                      576460752303423487,
102
                                                      1152921504606846975,
103
                                                      2305843009213693951,
104
                                                      4611686018427387903,
105
                                                      9223372036854775807,
106
                                                      18446744073709551615ULL};
107
108
const uint64_t ParquetDecodeUtils::BITPACK_MASKS_SIZE = sizeof(ParquetDecodeUtils::BITPACK_MASKS) / sizeof(uint64_t);
109
110
const uint8_t ParquetDecodeUtils::BITPACK_DLEN = 8;
111
112
ColumnReader::ColumnReader(const ParquetReader &reader, const ParquetColumnSchema &schema_p)
113
0
    : column_schema(schema_p), reader(reader), page_rows_available(0), dictionary_decoder(*this),
114
0
      delta_binary_packed_decoder(*this), rle_decoder(*this), delta_length_byte_array_decoder(*this),
115
0
      delta_byte_array_decoder(*this), byte_stream_split_decoder(*this), aad_crypto_metadata(reader.allocator) {
116
0
}
117
118
0
ColumnReader::~ColumnReader() {
119
0
}
120
121
0
Allocator &ColumnReader::GetAllocator() {
122
0
  return reader.allocator;
123
0
}
124
125
0
const ParquetReader &ColumnReader::Reader() {
126
0
  return reader;
127
0
}
128
129
0
void ColumnReader::RegisterPrefetch(ThriftFileTransport &transport, bool allow_merge) {
130
0
  if (chunk) {
131
0
    uint64_t size = chunk->meta_data.total_compressed_size;
132
0
    transport.RegisterPrefetch(FileOffset(), size, allow_merge);
133
0
  }
134
0
}
135
136
0
unique_ptr<BaseStatistics> ColumnReader::Stats(idx_t row_group_idx_p, const vector<ColumnChunk> &columns) {
137
0
  return Schema().Stats(*reader.GetFileMetadata(), reader.parquet_options, row_group_idx_p, columns);
138
0
}
139
140
0
uint64_t ColumnReader::TotalCompressedSize() {
141
0
  if (!chunk) {
142
0
    return 0;
143
0
  }
144
145
0
  return chunk->meta_data.total_compressed_size;
146
0
}
147
148
// Note: It's not trivial to determine where all Column data is stored. Chunk->file_offset
149
// apparently is not the first page of the data. Therefore we determine the address of the first page by taking the
150
// minimum of all page offsets.
151
0
idx_t ColumnReader::FileOffset() const {
152
0
  if (!chunk) {
153
0
    throw std::runtime_error("FileOffset called on ColumnReader with no chunk");
154
0
  }
155
0
  auto min_offset = NumericLimits<idx_t>::Maximum();
156
0
  if (chunk->meta_data.__isset.dictionary_page_offset) {
157
0
    min_offset = MinValue<idx_t>(min_offset, chunk->meta_data.dictionary_page_offset);
158
0
  }
159
0
  if (chunk->meta_data.__isset.index_page_offset) {
160
0
    min_offset = MinValue<idx_t>(min_offset, chunk->meta_data.index_page_offset);
161
0
  }
162
0
  min_offset = MinValue<idx_t>(min_offset, chunk->meta_data.data_page_offset);
163
164
0
  return min_offset;
165
0
}
166
167
0
idx_t ColumnReader::GroupRowsAvailable() {
168
0
  return group_rows_available;
169
0
}
170
171
0
void ColumnReader::PlainSkip(ByteBuffer &plain_data, uint8_t *defines, idx_t num_values) {
172
0
  throw NotImplementedException("PlainSkip not implemented");
173
0
}
174
175
void ColumnReader::Plain(ByteBuffer &plain_data, uint8_t *defines, idx_t num_values, // NOLINT
176
0
                         idx_t result_offset, Vector &result) {
177
0
  throw NotImplementedException("Plain not implemented");
178
0
}
179
180
void ColumnReader::Plain(shared_ptr<ResizeableBuffer> &plain_data, uint8_t *defines, idx_t num_values,
181
0
                         idx_t result_offset, Vector &result) {
182
0
  Plain(*plain_data, defines, num_values, result_offset, result);
183
0
}
184
185
void ColumnReader::PlainSelect(shared_ptr<ResizeableBuffer> &plain_data, uint8_t *defines, idx_t num_values,
186
0
                               Vector &result, const SelectionVector &sel, idx_t count) {
187
0
  throw NotImplementedException("PlainSelect not implemented");
188
0
}
189
190
0
void ColumnReader::InitializeRead(idx_t row_group_idx_p, const vector<ColumnChunk> &columns, TProtocol &protocol_p) {
191
0
  D_ASSERT(ColumnIndex() < columns.size());
192
0
  chunk = &columns[ColumnIndex()];
193
0
  protocol = &protocol_p;
194
0
  D_ASSERT(chunk);
195
0
  D_ASSERT(chunk->__isset.meta_data);
196
197
0
  if (chunk->__isset.file_path) {
198
0
    throw InvalidInputException("Failed to read file \"%s\": Only inlined data files are supported (no references)",
199
0
                                Reader().GetFileName());
200
0
  }
201
202
  // ugh. sometimes there is an extra offset for the dict. sometimes it's wrong.
203
0
  chunk_read_offset = chunk->meta_data.data_page_offset;
204
0
  if (chunk->meta_data.__isset.dictionary_page_offset && chunk->meta_data.dictionary_page_offset >= 4) {
205
    // this assumes the data pages follow the dict pages directly.
206
0
    chunk_read_offset = chunk->meta_data.dictionary_page_offset;
207
0
  }
208
0
  group_rows_available = chunk->meta_data.num_values;
209
0
}
210
211
0
bool ColumnReader::PageIsFilteredOut(PageHeader &page_hdr) {
212
0
  if (!dictionary_decoder.HasFilteredOutAllValues()) {
213
0
    return false;
214
0
  }
215
0
  if (page_hdr.type != PageType::DATA_PAGE && page_hdr.type != PageType::DATA_PAGE_V2) {
216
    // we can only filter out data pages
217
0
    return false;
218
0
  }
219
0
  bool is_v1 = page_hdr.type == PageType::DATA_PAGE;
220
0
  auto &v1_header = page_hdr.data_page_header;
221
0
  auto &v2_header = page_hdr.data_page_header_v2;
222
0
  auto page_encoding = is_v1 ? v1_header.encoding : v2_header.encoding;
223
0
  if (page_encoding != Encoding::PLAIN_DICTIONARY && page_encoding != Encoding::RLE_DICTIONARY) {
224
    // not a dictionary page
225
0
    return false;
226
0
  }
227
  // the page has been filtered out!
228
  // skip forward
229
0
  auto &trans = reinterpret_cast<ThriftFileTransport &>(*protocol->getTransport());
230
0
  trans.Skip(page_hdr.compressed_page_size);
231
232
0
  page_rows_available = is_v1 ? v1_header.num_values : v2_header.num_values;
233
0
  encoding = ColumnEncoding::DICTIONARY;
234
0
  page_is_filtered_out = true;
235
0
  return true;
236
0
}
237
238
0
void ColumnReader::ReadEncrypted(duckdb_apache::thrift::TBase &object) {
239
0
  aad_crypto_metadata.module = ParquetCrypto::GetModuleHeader(*chunk, aad_crypto_metadata.page_ordinal);
240
0
  aad_crypto_metadata.page_ordinal =
241
0
      ParquetCrypto::GetFinalPageOrdinal(*chunk, aad_crypto_metadata.module, aad_crypto_metadata.page_ordinal);
242
0
  reader.ReadEncrypted(object, *protocol, aad_crypto_metadata);
243
0
}
244
245
0
void ColumnReader::ReadDataEncrypted(const data_ptr_t buffer, const uint32_t buffer_size, PageType::type page_type) {
246
0
  aad_crypto_metadata.module = ParquetCrypto::GetModule(*chunk, page_type, aad_crypto_metadata.page_ordinal);
247
0
  aad_crypto_metadata.page_ordinal =
248
0
      ParquetCrypto::GetFinalPageOrdinal(*chunk, aad_crypto_metadata.module, aad_crypto_metadata.page_ordinal);
249
0
  reader.ReadDataEncrypted(*protocol, buffer, buffer_size, aad_crypto_metadata);
250
0
}
251
252
0
void ColumnReader::Read(PageHeader &page_hdr) {
253
0
  if (reader.parquet_options.encryption_config) {
254
0
    ReadEncrypted(page_hdr);
255
0
  } else {
256
0
    reader.Read(page_hdr, *protocol);
257
0
  }
258
0
}
259
260
0
void ColumnReader::ReadData(const data_ptr_t buffer, const uint32_t buffer_size, PageType::type page_type) {
261
0
  if (reader.parquet_options.encryption_config) {
262
0
    ReadDataEncrypted(buffer, buffer_size, page_type);
263
0
  } else {
264
0
    reader.ReadData(*protocol, buffer, buffer_size);
265
0
  }
266
0
}
267
268
0
void ColumnReader::PrepareRead(optional_ptr<const TableFilter> filter, optional_ptr<TableFilterState> filter_state) {
269
0
  encoding = ColumnEncoding::INVALID;
270
0
  defined_decoder.reset();
271
0
  page_is_filtered_out = false;
272
0
  block.reset();
273
0
  PageHeader page_hdr;
274
0
  auto &trans = reinterpret_cast<ThriftFileTransport &>(*protocol->getTransport());
275
276
0
  if (trans.HasPrefetch()) {
277
    // Already has some data prefetched, let's not mess with it
278
0
    Read(page_hdr);
279
0
  } else {
280
    // No prefetch yet, prefetch the full header in one go (so thrift won't read byte-by-byte from storage)
281
    // 256 bytes should cover almost all headers (unless it's a V2 header with really LONG string statistics)
282
0
    static constexpr idx_t ASSUMED_HEADER_SIZE = 256;
283
0
    const auto prefetch_size = MinValue(trans.GetSize() - trans.GetLocation(), ASSUMED_HEADER_SIZE);
284
0
    trans.Prefetch(trans.GetLocation(), prefetch_size);
285
0
    Read(page_hdr);
286
0
    trans.ClearPrefetch();
287
0
  }
288
  // some basic sanity check
289
0
  if (page_hdr.compressed_page_size < 0 || page_hdr.uncompressed_page_size < 0) {
290
0
    throw InvalidInputException("Failed to read file \"%s\": Page sizes can't be < 0", Reader().GetFileName());
291
0
  }
292
293
0
  if (PageIsFilteredOut(page_hdr)) {
294
    // this page has been filtered out so we don't need to read it
295
0
    return;
296
0
  }
297
298
0
  switch (page_hdr.type) {
299
0
  case PageType::DATA_PAGE_V2:
300
0
    PreparePageV2(page_hdr);
301
0
    PrepareDataPage(page_hdr);
302
0
    break;
303
0
  case PageType::DATA_PAGE:
304
0
    PreparePage(page_hdr);
305
0
    PrepareDataPage(page_hdr);
306
0
    break;
307
0
  case PageType::DICTIONARY_PAGE: {
308
0
    PreparePage(page_hdr);
309
0
    auto dictionary_size = page_hdr.dictionary_page_header.num_values;
310
0
    if (dictionary_size < 0) {
311
0
      throw InvalidInputException("Failed to read file \"%s\": Invalid dictionary page header (num_values < 0)",
312
0
                                  Reader().GetFileName());
313
0
    }
314
0
    dictionary_decoder.InitializeDictionary(dictionary_size, filter, filter_state, HasDefines());
315
0
    break;
316
0
  }
317
0
  default:
318
0
    break; // ignore INDEX page type and any other custom extensions
319
0
  }
320
0
  ResetPage();
321
0
}
322
323
0
void ColumnReader::ResetPage() {
324
0
}
325
326
0
void ColumnReader::PreparePageV2(PageHeader &page_hdr) {
327
0
  D_ASSERT(page_hdr.type == PageType::DATA_PAGE_V2);
328
329
0
  AllocateBlock(page_hdr.uncompressed_page_size + 1);
330
0
  bool uncompressed = false;
331
0
  if (page_hdr.data_page_header_v2.__isset.is_compressed && !page_hdr.data_page_header_v2.is_compressed) {
332
0
    uncompressed = true;
333
0
  }
334
0
  if (chunk->meta_data.codec == CompressionCodec::UNCOMPRESSED) {
335
0
    if (page_hdr.compressed_page_size != page_hdr.uncompressed_page_size) {
336
0
      throw InvalidInputException("Failed to read file \"%s\": Page size mismatch", Reader().GetFileName());
337
0
    }
338
0
    uncompressed = true;
339
0
  }
340
0
  if (uncompressed) {
341
0
    ReadData(block->ptr, page_hdr.compressed_page_size, page_hdr.type);
342
0
    return;
343
0
  }
344
345
  // copy repeats & defines as-is because FOR SOME REASON they are uncompressed
346
0
  auto uncompressed_bytes = page_hdr.data_page_header_v2.repetition_levels_byte_length +
347
0
                            page_hdr.data_page_header_v2.definition_levels_byte_length;
348
0
  if (uncompressed_bytes > page_hdr.uncompressed_page_size) {
349
0
    throw InvalidInputException(
350
0
        "Failed to read file \"%s\": header inconsistency, uncompressed_page_size needs to be larger than "
351
0
        "repetition_levels_byte_length + definition_levels_byte_length",
352
0
        Reader().GetFileName());
353
0
  }
354
355
0
  ReadData(block->ptr, uncompressed_bytes, page_hdr.type);
356
357
0
  auto compressed_bytes = page_hdr.compressed_page_size - uncompressed_bytes;
358
359
0
  if (compressed_bytes > 0) {
360
0
    ResizeableBuffer compressed_buffer;
361
0
    compressed_buffer.resize(GetAllocator(), compressed_bytes);
362
363
0
    ReadData(compressed_buffer.ptr, compressed_bytes, page_hdr.type);
364
365
0
    DecompressInternal(chunk->meta_data.codec, compressed_buffer.ptr, compressed_bytes,
366
0
                       block->ptr + uncompressed_bytes, page_hdr.uncompressed_page_size - uncompressed_bytes);
367
0
  }
368
0
}
369
370
0
void ColumnReader::AllocateBlock(idx_t size) {
371
0
  if (!block) {
372
0
    block = make_shared_ptr<ResizeableBuffer>(GetAllocator(), size);
373
0
  } else {
374
0
    block->resize(GetAllocator(), size);
375
0
  }
376
0
}
377
378
0
void ColumnReader::PreparePage(PageHeader &page_hdr) {
379
0
  AllocateBlock(page_hdr.uncompressed_page_size + 1);
380
0
  uint32_t compressed_page_size = page_hdr.compressed_page_size;
381
382
0
  if (chunk->__isset.crypto_metadata) {
383
0
    auto const file_aad = reader.GetUniqueFileIdentifier(reader.metadata->crypto_metadata->encryption_algorithm);
384
0
    if (!file_aad.empty()) {
385
      // If there is a file aad (identifier), this means that the Encrypted file is written by Arrow
386
      // Arrow adds the bytes for encryption (len + nonce + tag)
387
      // to the compressed page size
388
0
      compressed_page_size -=
389
0
          (ParquetCrypto::LENGTH_BYTES + ParquetCrypto::NONCE_BYTES + ParquetCrypto::TAG_BYTES);
390
0
    }
391
0
  }
392
393
0
  if (chunk->meta_data.codec == CompressionCodec::UNCOMPRESSED) {
394
0
    if (compressed_page_size != NumericCast<uint32_t>(page_hdr.uncompressed_page_size)) {
395
0
      throw std::runtime_error("Page size mismatch");
396
0
    }
397
0
    ReadData(block->ptr, compressed_page_size, page_hdr.type);
398
0
    return;
399
0
  }
400
401
0
  ResizeableBuffer compressed_buffer;
402
0
  compressed_buffer.resize(GetAllocator(), compressed_page_size + 1);
403
0
  ReadData(compressed_buffer.ptr, compressed_page_size, page_hdr.type);
404
405
0
  DecompressInternal(chunk->meta_data.codec, compressed_buffer.ptr, compressed_page_size, block->ptr,
406
0
                     page_hdr.uncompressed_page_size);
407
0
}
408
409
void ColumnReader::DecompressInternal(CompressionCodec::type codec, const_data_ptr_t src, idx_t src_size,
410
0
                                      data_ptr_t dst, idx_t dst_size) {
411
0
  switch (codec) {
412
0
  case CompressionCodec::UNCOMPRESSED:
413
0
    throw InternalException("Parquet data unexpectedly uncompressed");
414
0
  case CompressionCodec::GZIP: {
415
0
    MiniZStream s;
416
0
    s.Decompress(const_char_ptr_cast(src), src_size, char_ptr_cast(dst), dst_size);
417
0
    break;
418
0
  }
419
0
  case CompressionCodec::LZ4_RAW: {
420
0
    auto res =
421
0
        duckdb_lz4::LZ4_decompress_safe(const_char_ptr_cast(src), char_ptr_cast(dst),
422
0
                                        UnsafeNumericCast<int32_t>(src_size), UnsafeNumericCast<int32_t>(dst_size));
423
0
    if (res != NumericCast<int>(dst_size)) {
424
0
      throw InvalidInputException("Failed to read file \"%s\": LZ4 decompression failure",
425
0
                                  Reader().GetFileName());
426
0
    }
427
0
    break;
428
0
  }
429
0
  case CompressionCodec::SNAPPY: {
430
0
    {
431
0
      size_t uncompressed_size = 0;
432
0
      auto res = duckdb_snappy::GetUncompressedLength(const_char_ptr_cast(src), src_size, &uncompressed_size);
433
0
      if (!res) {
434
0
        throw InvalidInputException("Failed to read file \"%s\": Snappy decompression failure",
435
0
                                    Reader().GetFileName());
436
0
      }
437
0
      if (uncompressed_size != dst_size) {
438
0
        throw InvalidInputException(
439
0
            "Failed to read file \"%s\": Snappy decompression failure: Uncompressed data size mismatch",
440
0
            Reader().GetFileName());
441
0
      }
442
0
    }
443
0
    auto res = duckdb_snappy::RawUncompress(const_char_ptr_cast(src), src_size, char_ptr_cast(dst));
444
0
    if (!res) {
445
0
      throw InvalidInputException("Failed to read file \"%s\": Snappy decompression failure",
446
0
                                  Reader().GetFileName());
447
0
    }
448
0
    break;
449
0
  }
450
0
  case CompressionCodec::ZSTD: {
451
0
    auto res = duckdb_zstd::ZSTD_decompress(dst, dst_size, src, src_size);
452
0
    if (duckdb_zstd::ZSTD_isError(res) || res != dst_size) {
453
0
      throw InvalidInputException("Failed to read file \"%s\": ZSTD Decompression failure",
454
0
                                  Reader().GetFileName());
455
0
    }
456
0
    break;
457
0
  }
458
0
  case CompressionCodec::BROTLI: {
459
0
    auto state = duckdb_brotli::BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
460
0
    size_t total_out = 0;
461
0
    auto src_size_size_t = NumericCast<size_t>(src_size);
462
0
    auto dst_size_size_t = NumericCast<size_t>(dst_size);
463
464
0
    auto res = duckdb_brotli::BrotliDecoderDecompressStream(state, &src_size_size_t, &src, &dst_size_size_t, &dst,
465
0
                                                            &total_out);
466
0
    if (res != duckdb_brotli::BROTLI_DECODER_RESULT_SUCCESS) {
467
0
      throw InvalidInputException("Failed to read file \"%s\": Brotli Decompression failure",
468
0
                                  Reader().GetFileName());
469
0
    }
470
0
    duckdb_brotli::BrotliDecoderDestroyInstance(state);
471
0
    break;
472
0
  }
473
474
0
  default: {
475
0
    duckdb::stringstream codec_name;
476
0
    codec_name << codec;
477
0
    throw InvalidInputException("Failed to read file \"%s\": Unsupported compression codec \"%s\". Supported "
478
0
                                "options are uncompressed, brotli, gzip, lz4_raw, snappy or zstd",
479
0
                                Reader().GetFileName(), codec_name.str());
480
0
  }
481
0
  }
482
0
}
483
484
0
void ColumnReader::PrepareDataPage(PageHeader &page_hdr) {
485
0
  if (page_hdr.type == PageType::DATA_PAGE && !page_hdr.__isset.data_page_header) {
486
0
    throw InvalidInputException("Failed to read file \"%s\": Missing data page header from data page",
487
0
                                Reader().GetFileName());
488
0
  }
489
0
  if (page_hdr.type == PageType::DATA_PAGE_V2 && !page_hdr.__isset.data_page_header_v2) {
490
0
    throw InvalidInputException("Failed to read file \"%s\": Missing data page header from data page v2",
491
0
                                Reader().GetFileName());
492
0
  }
493
494
0
  bool is_v1 = page_hdr.type == PageType::DATA_PAGE;
495
0
  bool is_v2 = page_hdr.type == PageType::DATA_PAGE_V2;
496
0
  auto &v1_header = page_hdr.data_page_header;
497
0
  auto &v2_header = page_hdr.data_page_header_v2;
498
499
0
  page_rows_available = is_v1 ? v1_header.num_values : v2_header.num_values;
500
0
  auto page_encoding = is_v1 ? v1_header.encoding : v2_header.encoding;
501
502
0
  if (HasRepeats()) {
503
0
    uint32_t rep_length = is_v1 ? block->read<uint32_t>() : v2_header.repetition_levels_byte_length;
504
0
    block->available(rep_length);
505
0
    repeated_decoder =
506
0
        make_uniq<RleBpDecoder>(block->ptr, rep_length, RleBpDecoder::ComputeBitWidthFromMaxValue(MaxRepeat()));
507
0
    block->inc(rep_length);
508
0
  } else if (is_v2 && v2_header.repetition_levels_byte_length > 0) {
509
0
    block->inc(v2_header.repetition_levels_byte_length);
510
0
  }
511
512
0
  if (HasDefines()) {
513
0
    uint32_t def_length = is_v1 ? block->read<uint32_t>() : v2_header.definition_levels_byte_length;
514
0
    block->available(def_length);
515
0
    defined_decoder =
516
0
        make_uniq<RleBpDecoder>(block->ptr, def_length, RleBpDecoder::ComputeBitWidthFromMaxValue(MaxDefine()));
517
0
    block->inc(def_length);
518
0
  } else if (is_v2 && v2_header.definition_levels_byte_length > 0) {
519
0
    block->inc(v2_header.definition_levels_byte_length);
520
0
  }
521
522
0
  switch (page_encoding) {
523
0
  case Encoding::RLE_DICTIONARY:
524
0
  case Encoding::PLAIN_DICTIONARY: {
525
0
    encoding = ColumnEncoding::DICTIONARY;
526
0
    dictionary_decoder.InitializePage();
527
0
    break;
528
0
  }
529
0
  case Encoding::RLE: {
530
0
    encoding = ColumnEncoding::RLE;
531
0
    rle_decoder.InitializePage();
532
0
    break;
533
0
  }
534
0
  case Encoding::DELTA_BINARY_PACKED: {
535
0
    encoding = ColumnEncoding::DELTA_BINARY_PACKED;
536
0
    delta_binary_packed_decoder.InitializePage();
537
0
    break;
538
0
  }
539
0
  case Encoding::DELTA_LENGTH_BYTE_ARRAY: {
540
0
    encoding = ColumnEncoding::DELTA_LENGTH_BYTE_ARRAY;
541
0
    delta_length_byte_array_decoder.InitializePage();
542
0
    break;
543
0
  }
544
0
  case Encoding::DELTA_BYTE_ARRAY: {
545
0
    encoding = ColumnEncoding::DELTA_BYTE_ARRAY;
546
0
    delta_byte_array_decoder.InitializePage();
547
0
    break;
548
0
  }
549
0
  case Encoding::BYTE_STREAM_SPLIT: {
550
0
    encoding = ColumnEncoding::BYTE_STREAM_SPLIT;
551
0
    byte_stream_split_decoder.InitializePage();
552
0
    break;
553
0
  }
554
0
  case Encoding::PLAIN:
555
    // nothing to do here, will be read directly below
556
0
    encoding = ColumnEncoding::PLAIN;
557
0
    break;
558
559
0
  default:
560
0
    throw InvalidInputException("Failed to read file \"%s\": Unsupported page encoding", Reader().GetFileName());
561
0
  }
562
0
}
563
564
0
void ColumnReader::BeginRead(data_ptr_t define_out, data_ptr_t repeat_out) {
565
  // we need to reset the location because multiple column readers share the same protocol
566
0
  auto &trans = reinterpret_cast<ThriftFileTransport &>(*protocol->getTransport());
567
0
  trans.SetLocation(chunk_read_offset);
568
569
  // Perform any skips that were not applied yet.
570
0
  if (define_out && repeat_out) {
571
0
    ApplyPendingSkips(define_out, repeat_out);
572
0
  }
573
0
}
574
575
idx_t ColumnReader::ReadPageHeaders(idx_t max_read, optional_ptr<const TableFilter> filter,
576
0
                                    optional_ptr<TableFilterState> filter_state) {
577
0
  int8_t page_ordinal = 0;
578
0
  while (page_rows_available == 0) {
579
0
    aad_crypto_metadata.page_ordinal = page_ordinal;
580
0
    PrepareRead(filter, filter_state);
581
0
    page_ordinal++;
582
0
  }
583
0
  return MinValue<idx_t>(MinValue<idx_t>(max_read, page_rows_available), STANDARD_VECTOR_SIZE);
584
0
}
585
586
0
bool ColumnReader::PrepareRead(idx_t read_now, data_ptr_t define_out, data_ptr_t repeat_out, idx_t result_offset) {
587
0
  D_ASSERT(block);
588
589
0
  D_ASSERT(read_now + result_offset <= STANDARD_VECTOR_SIZE);
590
0
  D_ASSERT(!page_is_filtered_out);
591
592
0
  if (HasRepeats()) {
593
0
    D_ASSERT(repeated_decoder);
594
0
    repeated_decoder->GetBatch<uint8_t>(repeat_out + result_offset, read_now);
595
0
  }
596
597
0
  if (HasDefines()) {
598
0
    D_ASSERT(defined_decoder);
599
0
    const auto max_define = NumericCast<uint8_t>(MaxDefine());
600
0
    if (!HasRepeats() && defined_decoder->HasRepeatedBatch<uint8_t>(read_now, max_define)) {
601
      // Fast path: no repeats and all valid
602
0
      defined_decoder->GetRepeatedBatch<uint8_t>(read_now, max_define);
603
0
      return true;
604
0
    }
605
0
    defined_decoder->GetBatch<uint8_t>(define_out + result_offset, read_now);
606
0
    return false;
607
0
  }
608
609
0
  return true; // No defines, so everything is valid
610
0
}
611
612
void ColumnReader::ReadData(idx_t read_now, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result,
613
0
                            idx_t result_offset) {
614
  // flatten the result vector if required
615
0
  if (result_offset != 0 && result.GetVectorType() != VectorType::FLAT_VECTOR) {
616
0
    result.Flatten(result_offset);
617
0
    result.Resize(result_offset, STANDARD_VECTOR_SIZE);
618
0
  }
619
0
  if (page_is_filtered_out) {
620
    // page is filtered out - emit NULL for any rows
621
0
    auto &validity = FlatVector::Validity(result);
622
0
    for (idx_t i = 0; i < read_now; i++) {
623
0
      validity.SetInvalid(result_offset + i);
624
0
    }
625
0
    page_rows_available -= read_now;
626
0
    return;
627
0
  }
628
  // read the defines/repeats
629
0
  const auto all_valid = PrepareRead(read_now, define_out, repeat_out, result_offset);
630
  // read the data according to the encoder
631
0
  const auto define_ptr = all_valid ? nullptr : static_cast<uint8_t *>(define_out);
632
0
  switch (encoding) {
633
0
  case ColumnEncoding::DICTIONARY:
634
0
    dictionary_decoder.Read(define_ptr, read_now, result, result_offset);
635
0
    break;
636
0
  case ColumnEncoding::DELTA_BINARY_PACKED:
637
0
    delta_binary_packed_decoder.Read(define_ptr, read_now, result, result_offset);
638
0
    break;
639
0
  case ColumnEncoding::RLE:
640
0
    rle_decoder.Read(define_ptr, read_now, result, result_offset);
641
0
    break;
642
0
  case ColumnEncoding::DELTA_LENGTH_BYTE_ARRAY:
643
0
    delta_length_byte_array_decoder.Read(block, define_ptr, read_now, result, result_offset);
644
0
    break;
645
0
  case ColumnEncoding::DELTA_BYTE_ARRAY:
646
0
    delta_byte_array_decoder.Read(define_ptr, read_now, result, result_offset);
647
0
    break;
648
0
  case ColumnEncoding::BYTE_STREAM_SPLIT:
649
0
    byte_stream_split_decoder.Read(define_ptr, read_now, result, result_offset);
650
0
    break;
651
0
  default:
652
0
    Plain(block, define_ptr, read_now, result_offset, result);
653
0
    break;
654
0
  }
655
0
  page_rows_available -= read_now;
656
0
}
657
658
0
void ColumnReader::FinishRead(idx_t read_count) {
659
0
  auto &trans = reinterpret_cast<ThriftFileTransport &>(*protocol->getTransport());
660
0
  chunk_read_offset = trans.GetLocation();
661
662
0
  group_rows_available -= read_count;
663
0
}
664
665
0
idx_t ColumnReader::ReadInternal(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result) {
666
0
  idx_t result_offset = 0;
667
0
  auto to_read = num_values;
668
0
  D_ASSERT(to_read <= STANDARD_VECTOR_SIZE);
669
670
0
  while (to_read > 0) {
671
0
    auto read_now = ReadPageHeaders(to_read);
672
673
0
    ReadData(read_now, define_out, repeat_out, result, result_offset);
674
675
0
    result_offset += read_now;
676
0
    to_read -= read_now;
677
0
  }
678
0
  FinishRead(num_values);
679
680
0
  return num_values;
681
0
}
682
683
0
idx_t ColumnReader::Read(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result) {
684
0
  BeginRead(define_out, repeat_out);
685
0
  return ReadInternal(num_values, define_out, repeat_out, result);
686
0
}
687
688
void ColumnReader::Select(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result_out,
689
0
                          const SelectionVector &sel, idx_t approved_tuple_count) {
690
0
  if (SupportsDirectSelect() && approved_tuple_count < num_values) {
691
0
    DirectSelect(num_values, define_out, repeat_out, result_out, sel, approved_tuple_count);
692
0
    return;
693
0
  }
694
0
  Read(num_values, define_out, repeat_out, result_out);
695
0
}
696
697
void ColumnReader::DirectSelect(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result,
698
0
                                const SelectionVector &sel, idx_t approved_tuple_count) {
699
0
  auto to_read = num_values;
700
701
  // prepare the first read if we haven't yet
702
0
  BeginRead(define_out, repeat_out);
703
0
  auto read_now = ReadPageHeaders(num_values);
704
705
  // we can only push the filter into the decoder if we are reading the ENTIRE vector in one go
706
0
  if (read_now == to_read && encoding == ColumnEncoding::PLAIN) {
707
0
    const auto all_valid = PrepareRead(read_now, define_out, repeat_out, 0);
708
0
    const auto define_ptr = all_valid ? nullptr : static_cast<uint8_t *>(define_out);
709
0
    PlainSelect(block, define_ptr, read_now, result, sel, approved_tuple_count);
710
711
0
    page_rows_available -= read_now;
712
0
    FinishRead(num_values);
713
0
    return;
714
0
  }
715
  // fallback to regular read + filter
716
0
  ReadInternal(num_values, define_out, repeat_out, result);
717
0
}
718
719
void ColumnReader::Filter(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result,
720
                          const TableFilter &filter, TableFilterState &filter_state, SelectionVector &sel,
721
0
                          idx_t &approved_tuple_count, bool is_first_filter) {
722
0
  if (SupportsDirectFilter() && is_first_filter) {
723
0
    DirectFilter(num_values, define_out, repeat_out, result, filter, filter_state, sel, approved_tuple_count);
724
0
    return;
725
0
  }
726
0
  Select(num_values, define_out, repeat_out, result, sel, approved_tuple_count);
727
0
  ApplyFilter(result, filter, filter_state, num_values, sel, approved_tuple_count);
728
0
}
729
730
void ColumnReader::DirectFilter(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result,
731
                                const TableFilter &filter, TableFilterState &filter_state, SelectionVector &sel,
732
0
                                idx_t &approved_tuple_count) {
733
0
  auto to_read = num_values;
734
735
  // prepare the first read if we haven't yet
736
0
  BeginRead(define_out, repeat_out);
737
0
  auto read_now = ReadPageHeaders(num_values, &filter, &filter_state);
738
739
  // we can only push the filter into the decoder if we are reading the ENTIRE vector in one go
740
0
  if (encoding == ColumnEncoding::DICTIONARY && read_now == to_read && dictionary_decoder.HasFilter()) {
741
0
    if (page_is_filtered_out) {
742
      // the page has been filtered out entirely - skip
743
0
      approved_tuple_count = 0;
744
0
    } else {
745
      // Push filter into dictionary directly
746
      // read the defines/repeats
747
0
      const auto all_valid = PrepareRead(read_now, define_out, repeat_out, 0);
748
0
      const auto define_ptr = all_valid ? nullptr : static_cast<uint8_t *>(define_out);
749
0
      dictionary_decoder.Filter(define_ptr, read_now, result, sel, approved_tuple_count);
750
0
    }
751
0
    page_rows_available -= read_now;
752
0
    FinishRead(num_values);
753
0
    return;
754
0
  }
755
  // fallback to regular read + filter
756
0
  ReadInternal(num_values, define_out, repeat_out, result);
757
0
  ApplyFilter(result, filter, filter_state, num_values, sel, approved_tuple_count);
758
0
}
759
760
void ColumnReader::ApplyFilter(Vector &v, const TableFilter &filter, TableFilterState &filter_state, idx_t scan_count,
761
0
                               SelectionVector &sel, idx_t &approved_tuple_count) {
762
0
  UnifiedVectorFormat vdata;
763
0
  v.ToUnifiedFormat(scan_count, vdata);
764
0
  ColumnSegment::FilterSelection(sel, v, vdata, filter, filter_state, scan_count, approved_tuple_count);
765
0
}
766
767
0
void ColumnReader::Skip(idx_t num_values) {
	// Skips are deferred: we only accumulate the count here; ApplyPendingSkips does the actual work.
	pending_skips = pending_skips + num_values;
}
770
771
0
// Materialize any skips accumulated via Skip(): advance through pages, decoding (and discarding)
// defines/repeats and skipping values in the active decoder, without producing output values.
void ColumnReader::ApplyPendingSkips(data_ptr_t define_out, data_ptr_t repeat_out) {
	if (pending_skips == 0) {
		return;
	}
	// reset the pending count up-front so re-entrant calls (e.g. via BeginRead) don't re-apply it
	idx_t num_values = pending_skips;
	pending_skips = 0;

	auto to_skip = num_values;
	// scratch buffers for defines/repeats while skipping; defines are zero-initialized
	// NOTE(review): these hold at most STANDARD_VECTOR_SIZE entries — assumes ReadPageHeaders never
	// returns a batch larger than that; TODO confirm against ReadPageHeaders
	data_t skip_defines[STANDARD_VECTOR_SIZE] = {};
	data_t skip_repeats[STANDARD_VECTOR_SIZE];
	// only use the scratch buffers when this reader actually produces defines/repeats
	data_ptr_t skip_define_out = HasDefines() ? skip_defines : define_out;
	data_ptr_t skip_repeat_out = HasRepeats() ? skip_repeats : repeat_out;
	// start reading but do not apply skips (we are skipping now)
	BeginRead(nullptr, nullptr);

	while (to_skip > 0) {
		auto skip_now = ReadPageHeaders(to_skip);
		if (page_is_filtered_out) {
			// the page has been filtered out entirely - skip
			page_rows_available -= skip_now;
			to_skip -= skip_now;
			continue;
		}
		// decode defines/repeats for the skipped values; all_valid means no NULLs in this batch
		const auto all_valid = PrepareRead(skip_now, skip_define_out, skip_repeat_out, 0);

		const auto define_ptr = all_valid ? nullptr : static_cast<uint8_t *>(skip_define_out);
		// dispatch the skip to whichever decoder matches the current page encoding
		switch (encoding) {
		case ColumnEncoding::DICTIONARY:
			dictionary_decoder.Skip(define_ptr, skip_now);
			break;
		case ColumnEncoding::DELTA_BINARY_PACKED:
			delta_binary_packed_decoder.Skip(define_ptr, skip_now);
			break;
		case ColumnEncoding::RLE:
			rle_decoder.Skip(define_ptr, skip_now);
			break;
		case ColumnEncoding::DELTA_LENGTH_BYTE_ARRAY:
			delta_length_byte_array_decoder.Skip(define_ptr, skip_now);
			break;
		case ColumnEncoding::DELTA_BYTE_ARRAY:
			delta_byte_array_decoder.Skip(define_ptr, skip_now);
			break;
		case ColumnEncoding::BYTE_STREAM_SPLIT:
			byte_stream_split_decoder.Skip(define_ptr, skip_now);
			break;
		default:
			// plain encoding has no dedicated decoder object; skip directly in the data block
			PlainSkip(*block, define_ptr, skip_now);
			break;
		}
		page_rows_available -= skip_now;
		to_skip -= skip_now;
	}
	FinishRead(num_values);
}
825
826
//===--------------------------------------------------------------------===//
827
// Create Column Reader
828
//===--------------------------------------------------------------------===//
829
template <class T>
830
0
static unique_ptr<ColumnReader> CreateDecimalReader(const ParquetReader &reader, const ParquetColumnSchema &schema) {
831
0
  switch (schema.type.InternalType()) {
832
0
  case PhysicalType::INT16:
833
0
    return make_uniq<TemplatedColumnReader<int16_t, TemplatedParquetValueConversion<T>>>(reader, schema);
834
0
  case PhysicalType::INT32:
835
0
    return make_uniq<TemplatedColumnReader<int32_t, TemplatedParquetValueConversion<T>>>(reader, schema);
836
0
  case PhysicalType::INT64:
837
0
    return make_uniq<TemplatedColumnReader<int64_t, TemplatedParquetValueConversion<T>>>(reader, schema);
838
0
  case PhysicalType::INT128:
839
0
    return make_uniq<TemplatedColumnReader<hugeint_t, TemplatedParquetValueConversion<T>>>(reader, schema);
840
0
  default:
841
0
    throw NotImplementedException("Unimplemented internal type for CreateDecimalReader");
842
0
  }
843
0
}
Unexecuted instantiation: column_reader.cpp:duckdb::unique_ptr<duckdb::ColumnReader, std::__1::default_delete<duckdb::ColumnReader>, true> duckdb::CreateDecimalReader<int>(duckdb::ParquetReader const&, duckdb::ParquetColumnSchema const&)
Unexecuted instantiation: column_reader.cpp:duckdb::unique_ptr<duckdb::ColumnReader, std::__1::default_delete<duckdb::ColumnReader>, true> duckdb::CreateDecimalReader<long>(duckdb::ParquetReader const&, duckdb::ParquetColumnSchema const&)
844
845
0
// Factory: construct the concrete ColumnReader matching the schema's logical type
// (and, where relevant, its ParquetExtraTypeInfo, e.g. the timestamp/time unit).
// Throws NotImplementedException for unsupported types and InternalException when a
// type requires extra type info that is missing.
// Fix: the TIME_NS default branch previously threw "TIME requires type info",
// mislabeling the failing type; it now reports TIME_NS (consistent with TIMESTAMP_NS).
unique_ptr<ColumnReader> ColumnReader::CreateReader(const ParquetReader &reader, const ParquetColumnSchema &schema) {
	switch (schema.type.id()) {
	case LogicalTypeId::BOOLEAN:
		return make_uniq<BooleanColumnReader>(reader, schema);
	// integers: narrow DuckDB types are widened from the 32/64-bit Parquet storage types
	case LogicalTypeId::UTINYINT:
		return make_uniq<TemplatedColumnReader<uint8_t, TemplatedParquetValueConversion<uint32_t>>>(reader, schema);
	case LogicalTypeId::USMALLINT:
		return make_uniq<TemplatedColumnReader<uint16_t, TemplatedParquetValueConversion<uint32_t>>>(reader, schema);
	case LogicalTypeId::UINTEGER:
		return make_uniq<TemplatedColumnReader<uint32_t, TemplatedParquetValueConversion<uint32_t>>>(reader, schema);
	case LogicalTypeId::UBIGINT:
		return make_uniq<TemplatedColumnReader<uint64_t, TemplatedParquetValueConversion<uint64_t>>>(reader, schema);
	case LogicalTypeId::TINYINT:
		return make_uniq<TemplatedColumnReader<int8_t, TemplatedParquetValueConversion<int32_t>>>(reader, schema);
	case LogicalTypeId::SMALLINT:
		return make_uniq<TemplatedColumnReader<int16_t, TemplatedParquetValueConversion<int32_t>>>(reader, schema);
	case LogicalTypeId::INTEGER:
		return make_uniq<TemplatedColumnReader<int32_t, TemplatedParquetValueConversion<int32_t>>>(reader, schema);
	case LogicalTypeId::BIGINT:
		return make_uniq<TemplatedColumnReader<int64_t, TemplatedParquetValueConversion<int64_t>>>(reader, schema);
	case LogicalTypeId::FLOAT:
		// half-precision floats are stored as uint16 and converted on read
		if (schema.type_info == ParquetExtraTypeInfo::FLOAT16) {
			return make_uniq<CallbackColumnReader<uint16_t, float, Float16ToFloat32>>(reader, schema);
		}
		return make_uniq<TemplatedColumnReader<float, TemplatedParquetValueConversion<float>>>(reader, schema);
	case LogicalTypeId::DOUBLE:
		// byte-array decimals read as DOUBLE go through the decimal reader
		if (schema.type_info == ParquetExtraTypeInfo::DECIMAL_BYTE_ARRAY) {
			return ParquetDecimalUtils::CreateReader(reader, schema);
		}
		return make_uniq<TemplatedColumnReader<double, TemplatedParquetValueConversion<double>>>(reader, schema);
	case LogicalTypeId::TIMESTAMP:
	case LogicalTypeId::TIMESTAMP_TZ:
		// the stored unit determines the conversion callback
		switch (schema.type_info) {
		case ParquetExtraTypeInfo::IMPALA_TIMESTAMP:
			return make_uniq<CallbackColumnReader<Int96, timestamp_t, ImpalaTimestampToTimestamp>>(reader, schema);
		case ParquetExtraTypeInfo::UNIT_MS:
			return make_uniq<CallbackColumnReader<int64_t, timestamp_t, ParquetTimestampMsToTimestamp>>(reader, schema);
		case ParquetExtraTypeInfo::UNIT_MICROS:
			return make_uniq<CallbackColumnReader<int64_t, timestamp_t, ParquetTimestampMicrosToTimestamp>>(reader,
			                                                                                                schema);
		case ParquetExtraTypeInfo::UNIT_NS:
			return make_uniq<CallbackColumnReader<int64_t, timestamp_t, ParquetTimestampNsToTimestamp>>(reader, schema);
		default:
			throw InternalException("TIMESTAMP requires type info");
		}
	case LogicalTypeId::TIMESTAMP_NS:
		switch (schema.type_info) {
		case ParquetExtraTypeInfo::IMPALA_TIMESTAMP:
			return make_uniq<CallbackColumnReader<Int96, timestamp_ns_t, ImpalaTimestampToTimestampNS>>(reader, schema);
		case ParquetExtraTypeInfo::UNIT_MS:
			return make_uniq<CallbackColumnReader<int64_t, timestamp_ns_t, ParquetTimestampMsToTimestampNs>>(reader,
			                                                                                                 schema);
		case ParquetExtraTypeInfo::UNIT_MICROS:
			return make_uniq<CallbackColumnReader<int64_t, timestamp_ns_t, ParquetTimestampUsToTimestampNs>>(reader,
			                                                                                                 schema);
		case ParquetExtraTypeInfo::UNIT_NS:
			return make_uniq<CallbackColumnReader<int64_t, timestamp_ns_t, ParquetTimestampNsToTimestampNs>>(reader,
			                                                                                                 schema);
		default:
			throw InternalException("TIMESTAMP_NS requires type info");
		}
	case LogicalTypeId::DATE:
		return make_uniq<CallbackColumnReader<int32_t, date_t, ParquetIntToDate>>(reader, schema);
	case LogicalTypeId::TIME:
		switch (schema.type_info) {
		case ParquetExtraTypeInfo::UNIT_MS:
			return make_uniq<CallbackColumnReader<int32_t, dtime_t, ParquetMsIntToTime>>(reader, schema);
		case ParquetExtraTypeInfo::UNIT_MICROS:
			return make_uniq<CallbackColumnReader<int64_t, dtime_t, ParquetIntToTime>>(reader, schema);
		case ParquetExtraTypeInfo::UNIT_NS:
			return make_uniq<CallbackColumnReader<int64_t, dtime_t, ParquetNsIntToTime>>(reader, schema);
		default:
			throw InternalException("TIME requires type info");
		}
	case LogicalTypeId::TIME_NS:
		switch (schema.type_info) {
		case ParquetExtraTypeInfo::UNIT_MS:
			return make_uniq<CallbackColumnReader<int32_t, dtime_ns_t, ParquetMsIntToTimeNs>>(reader, schema);
		case ParquetExtraTypeInfo::UNIT_MICROS:
			return make_uniq<CallbackColumnReader<int64_t, dtime_ns_t, ParquetUsIntToTimeNs>>(reader, schema);
		case ParquetExtraTypeInfo::UNIT_NS:
			return make_uniq<CallbackColumnReader<int64_t, dtime_ns_t, ParquetIntToTimeNs>>(reader, schema);
		default:
			// was "TIME requires type info" - report the actual failing type
			throw InternalException("TIME_NS requires type info");
		}
	case LogicalTypeId::TIME_TZ:
		switch (schema.type_info) {
		case ParquetExtraTypeInfo::UNIT_MS:
			return make_uniq<CallbackColumnReader<int32_t, dtime_tz_t, ParquetIntToTimeMsTZ>>(reader, schema);
		case ParquetExtraTypeInfo::UNIT_MICROS:
			return make_uniq<CallbackColumnReader<int64_t, dtime_tz_t, ParquetIntToTimeTZ>>(reader, schema);
		case ParquetExtraTypeInfo::UNIT_NS:
			return make_uniq<CallbackColumnReader<int64_t, dtime_tz_t, ParquetIntToTimeNsTZ>>(reader, schema);
		default:
			throw InternalException("TIME_TZ requires type info");
		}
	case LogicalTypeId::BLOB:
	case LogicalTypeId::VARCHAR:
		return make_uniq<StringColumnReader>(reader, schema);
	case LogicalTypeId::DECIMAL:
		// we have to figure out what kind of int we need
		switch (schema.type_info) {
		case ParquetExtraTypeInfo::DECIMAL_INT32:
			return CreateDecimalReader<int32_t>(reader, schema);
		case ParquetExtraTypeInfo::DECIMAL_INT64:
			return CreateDecimalReader<int64_t>(reader, schema);
		case ParquetExtraTypeInfo::DECIMAL_BYTE_ARRAY:
			return ParquetDecimalUtils::CreateReader(reader, schema);
		default:
			throw NotImplementedException("Unrecognized Parquet type for Decimal");
		}
	case LogicalTypeId::UUID:
		return make_uniq<UUIDColumnReader>(reader, schema);
	case LogicalTypeId::INTERVAL:
		return make_uniq<IntervalColumnReader>(reader, schema);
	case LogicalTypeId::SQLNULL:
		return make_uniq<NullColumnReader>(reader, schema);
	default:
		break;
	}
	throw NotImplementedException(schema.type.ToString());
}
967
968
} // namespace duckdb