/src/duckdb/extension/parquet/reader/struct_column_reader.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | #include "reader/struct_column_reader.hpp" |
2 | | |
3 | | namespace duckdb { |
4 | | |
5 | | //===--------------------------------------------------------------------===// |
6 | | // Struct Column Reader |
7 | | //===--------------------------------------------------------------------===// |
8 | | StructColumnReader::StructColumnReader(ParquetReader &reader, const ParquetColumnSchema &schema, |
9 | | vector<unique_ptr<ColumnReader>> child_readers_p) |
10 | 0 | : ColumnReader(reader, schema), child_readers(std::move(child_readers_p)) { |
11 | 0 | D_ASSERT(Type().InternalType() == PhysicalType::STRUCT); |
12 | 0 | } |
13 | | |
14 | 0 | ColumnReader &StructColumnReader::GetChildReader(idx_t child_idx) { |
15 | 0 | if (!child_readers[child_idx]) { |
16 | 0 | throw InternalException("StructColumnReader::GetChildReader(%d) - but this child reader is not set", child_idx); |
17 | 0 | } |
18 | 0 | return *child_readers[child_idx].get(); |
19 | 0 | } |
20 | | |
21 | | void StructColumnReader::InitializeRead(idx_t row_group_idx_p, const vector<ColumnChunk> &columns, |
22 | 0 | TProtocol &protocol_p) { |
23 | 0 | for (auto &child : child_readers) { |
24 | 0 | if (!child) { |
25 | 0 | continue; |
26 | 0 | } |
27 | 0 | child->InitializeRead(row_group_idx_p, columns, protocol_p); |
28 | 0 | } |
29 | 0 | } |
30 | | |
31 | 0 | idx_t StructColumnReader::Read(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result) { |
32 | 0 | auto &struct_entries = StructVector::GetEntries(result); |
33 | 0 | D_ASSERT(StructType::GetChildTypes(Type()).size() == struct_entries.size()); |
34 | |
|
35 | 0 | if (pending_skips > 0) { |
36 | 0 | throw InternalException("StructColumnReader cannot have pending skips"); |
37 | 0 | } |
38 | | |
39 | | // If the child reader values are all valid, "define_out" may not be initialized at all |
40 | | // So, we just initialize them to all be valid beforehand |
41 | 0 | std::fill_n(define_out, num_values, MaxDefine()); |
42 | |
|
43 | 0 | optional_idx read_count; |
44 | 0 | for (idx_t i = 0; i < child_readers.size(); i++) { |
45 | 0 | auto &child = child_readers[i]; |
46 | 0 | auto &target_vector = *struct_entries[i]; |
47 | 0 | if (!child) { |
48 | | // if we are not scanning this vector - set it to NULL |
49 | 0 | target_vector.SetVectorType(VectorType::CONSTANT_VECTOR); |
50 | 0 | ConstantVector::SetNull(target_vector, true); |
51 | 0 | continue; |
52 | 0 | } |
53 | 0 | auto child_num_values = child->Read(num_values, define_out, repeat_out, target_vector); |
54 | 0 | if (!read_count.IsValid()) { |
55 | 0 | read_count = child_num_values; |
56 | 0 | } else if (read_count.GetIndex() != child_num_values) { |
57 | 0 | throw std::runtime_error("Struct child row count mismatch"); |
58 | 0 | } |
59 | 0 | } |
60 | 0 | if (!read_count.IsValid()) { |
61 | 0 | read_count = num_values; |
62 | 0 | } |
63 | | // set the validity mask for this level |
64 | 0 | auto &validity = FlatVector::Validity(result); |
65 | 0 | for (idx_t i = 0; i < read_count.GetIndex(); i++) { |
66 | 0 | if (define_out[i] < MaxDefine()) { |
67 | 0 | validity.SetInvalid(i); |
68 | 0 | } |
69 | 0 | } |
70 | |
|
71 | 0 | return read_count.GetIndex(); |
72 | 0 | } |
73 | | |
74 | 0 | void StructColumnReader::Skip(idx_t num_values) { |
75 | 0 | for (auto &child : child_readers) { |
76 | 0 | if (!child) { |
77 | 0 | continue; |
78 | 0 | } |
79 | 0 | child->Skip(num_values); |
80 | 0 | } |
81 | 0 | } |
82 | | |
83 | 0 | void StructColumnReader::RegisterPrefetch(ThriftFileTransport &transport, bool allow_merge) { |
84 | 0 | for (auto &child : child_readers) { |
85 | 0 | if (!child) { |
86 | 0 | continue; |
87 | 0 | } |
88 | 0 | child->RegisterPrefetch(transport, allow_merge); |
89 | 0 | } |
90 | 0 | } |
91 | | |
92 | 0 | uint64_t StructColumnReader::TotalCompressedSize() { |
93 | 0 | uint64_t size = 0; |
94 | 0 | for (auto &child : child_readers) { |
95 | 0 | if (!child) { |
96 | 0 | continue; |
97 | 0 | } |
98 | 0 | size += child->TotalCompressedSize(); |
99 | 0 | } |
100 | 0 | return size; |
101 | 0 | } |
102 | | |
103 | 0 | static bool TypeHasExactRowCount(const LogicalType &type) { |
104 | 0 | switch (type.id()) { |
105 | 0 | case LogicalTypeId::LIST: |
106 | 0 | case LogicalTypeId::MAP: |
107 | 0 | return false; |
108 | 0 | case LogicalTypeId::STRUCT: |
109 | 0 | for (auto &kv : StructType::GetChildTypes(type)) { |
110 | 0 | if (TypeHasExactRowCount(kv.second)) { |
111 | 0 | return true; |
112 | 0 | } |
113 | 0 | } |
114 | 0 | return false; |
115 | 0 | default: |
116 | 0 | return true; |
117 | 0 | } |
118 | 0 | } |
119 | | |
120 | 0 | idx_t StructColumnReader::GroupRowsAvailable() { |
121 | 0 | for (auto &child : child_readers) { |
122 | 0 | if (!child) { |
123 | 0 | continue; |
124 | 0 | } |
125 | 0 | if (TypeHasExactRowCount(child->Type())) { |
126 | 0 | return child->GroupRowsAvailable(); |
127 | 0 | } |
128 | 0 | } |
129 | 0 | for (auto &child : child_readers) { |
130 | 0 | if (!child) { |
131 | 0 | continue; |
132 | 0 | } |
133 | 0 | return child->GroupRowsAvailable(); |
134 | 0 | } |
135 | 0 | throw InternalException("No projected columns in struct?"); |
136 | 0 | } |
137 | | |
138 | | } // namespace duckdb |