/src/duckdb/extension/parquet/reader/variant_column_reader.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | #include "reader/variant_column_reader.hpp" |
2 | | #include "reader/variant/variant_binary_decoder.hpp" |
3 | | #include "reader/variant/variant_shredded_conversion.hpp" |
4 | | |
5 | | namespace duckdb { |
6 | | |
7 | | //===--------------------------------------------------------------------===// |
8 | | // Variant Column Reader |
9 | | //===--------------------------------------------------------------------===// |
10 | | VariantColumnReader::VariantColumnReader(ClientContext &context, ParquetReader &reader, |
11 | | const ParquetColumnSchema &schema, |
12 | | vector<unique_ptr<ColumnReader>> child_readers_p) |
13 | 0 | : ColumnReader(reader, schema), context(context), child_readers(std::move(child_readers_p)) { |
14 | 0 | D_ASSERT(Type().InternalType() == PhysicalType::VARCHAR); |
15 | |
|
16 | 0 | if (child_readers[0]->Schema().name == "metadata" && child_readers[1]->Schema().name == "value") { |
17 | 0 | metadata_reader_idx = 0; |
18 | 0 | value_reader_idx = 1; |
19 | 0 | } else if (child_readers[1]->Schema().name == "metadata" && child_readers[0]->Schema().name == "value") { |
20 | 0 | metadata_reader_idx = 1; |
21 | 0 | value_reader_idx = 0; |
22 | 0 | } else { |
23 | 0 | throw InternalException("The Variant column must have 'metadata' and 'value' as the first two columns"); |
24 | 0 | } |
25 | 0 | } |
26 | | |
27 | 0 | ColumnReader &VariantColumnReader::GetChildReader(idx_t child_idx) { |
28 | 0 | if (!child_readers[child_idx]) { |
29 | 0 | throw InternalException("VariantColumnReader::GetChildReader(%d) - but this child reader is not set", |
30 | 0 | child_idx); |
31 | 0 | } |
32 | 0 | return *child_readers[child_idx].get(); |
33 | 0 | } |
34 | | |
35 | | void VariantColumnReader::InitializeRead(idx_t row_group_idx_p, const vector<ColumnChunk> &columns, |
36 | 0 | TProtocol &protocol_p) { |
37 | 0 | for (auto &child : child_readers) { |
38 | 0 | if (!child) { |
39 | 0 | continue; |
40 | 0 | } |
41 | 0 | child->InitializeRead(row_group_idx_p, columns, protocol_p); |
42 | 0 | } |
43 | 0 | } |
44 | | |
45 | 0 | static LogicalType GetIntermediateGroupType(optional_ptr<ColumnReader> typed_value) { |
46 | 0 | child_list_t<LogicalType> children; |
47 | 0 | children.emplace_back("value", LogicalType::BLOB); |
48 | 0 | if (typed_value) { |
49 | 0 | children.emplace_back("typed_value", typed_value->Type()); |
50 | 0 | } |
51 | 0 | return LogicalType::STRUCT(std::move(children)); |
52 | 0 | } |
53 | | |
54 | 0 | idx_t VariantColumnReader::Read(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result) { |
55 | 0 | if (pending_skips > 0) { |
56 | 0 | throw InternalException("VariantColumnReader cannot have pending skips"); |
57 | 0 | } |
58 | 0 | optional_ptr<ColumnReader> typed_value_reader = child_readers.size() == 3 ? child_readers[2].get() : nullptr; |
59 | | |
60 | | // If the child reader values are all valid, "define_out" may not be initialized at all |
61 | | // So, we just initialize them to all be valid beforehand |
62 | 0 | std::fill_n(define_out, num_values, MaxDefine()); |
63 | |
|
64 | 0 | optional_idx read_count; |
65 | |
|
66 | 0 | Vector metadata_intermediate(LogicalType::BLOB, num_values); |
67 | 0 | Vector intermediate_group(GetIntermediateGroupType(typed_value_reader), num_values); |
68 | 0 | auto &group_entries = StructVector::GetEntries(intermediate_group); |
69 | 0 | auto &value_intermediate = *group_entries[0]; |
70 | |
|
71 | 0 | auto metadata_values = |
72 | 0 | child_readers[metadata_reader_idx]->Read(num_values, define_out, repeat_out, metadata_intermediate); |
73 | 0 | auto value_values = child_readers[value_reader_idx]->Read(num_values, define_out, repeat_out, value_intermediate); |
74 | |
|
75 | 0 | D_ASSERT(child_readers[metadata_reader_idx]->Schema().name == "metadata"); |
76 | 0 | D_ASSERT(child_readers[value_reader_idx]->Schema().name == "value"); |
77 | |
|
78 | 0 | if (metadata_values != value_values) { |
79 | 0 | throw InvalidInputException( |
80 | 0 | "The Variant column did not contain the same amount of values for 'metadata' and 'value'"); |
81 | 0 | } |
82 | | |
83 | 0 | auto result_data = FlatVector::GetData<string_t>(result); |
84 | 0 | auto &result_validity = FlatVector::Validity(result); |
85 | |
|
86 | 0 | vector<VariantValue> conversion_result; |
87 | 0 | if (typed_value_reader) { |
88 | 0 | auto typed_values = typed_value_reader->Read(num_values, define_out, repeat_out, *group_entries[1]); |
89 | 0 | if (typed_values != value_values) { |
90 | 0 | throw InvalidInputException( |
91 | 0 | "The shredded Variant column did not contain the same amount of values for 'typed_value' and 'value'"); |
92 | 0 | } |
93 | 0 | } |
94 | 0 | conversion_result = |
95 | 0 | VariantShreddedConversion::Convert(metadata_intermediate, intermediate_group, 0, num_values, num_values); |
96 | |
|
97 | 0 | for (idx_t i = 0; i < conversion_result.size(); i++) { |
98 | 0 | auto &variant = conversion_result[i]; |
99 | 0 | if (variant.IsNull()) { |
100 | 0 | result_validity.SetInvalid(i); |
101 | 0 | continue; |
102 | 0 | } |
103 | | |
104 | | //! Write the result to a string |
105 | 0 | VariantDecodeResult decode_result; |
106 | 0 | decode_result.doc = yyjson_mut_doc_new(nullptr); |
107 | 0 | auto json_val = variant.ToJSON(context, decode_result.doc); |
108 | |
|
109 | 0 | size_t len; |
110 | 0 | decode_result.data = |
111 | 0 | yyjson_mut_val_write_opts(json_val, YYJSON_WRITE_ALLOW_INF_AND_NAN, nullptr, &len, nullptr); |
112 | 0 | if (!decode_result.data) { |
113 | 0 | throw InvalidInputException("Could not serialize the JSON to string, yyjson failed"); |
114 | 0 | } |
115 | 0 | result_data[i] = StringVector::AddString(result, decode_result.data, static_cast<idx_t>(len)); |
116 | 0 | } |
117 | | |
118 | 0 | read_count = value_values; |
119 | 0 | return read_count.GetIndex(); |
120 | 0 | } |
121 | | |
122 | 0 | void VariantColumnReader::Skip(idx_t num_values) { |
123 | 0 | for (auto &child : child_readers) { |
124 | 0 | if (!child) { |
125 | 0 | continue; |
126 | 0 | } |
127 | 0 | child->Skip(num_values); |
128 | 0 | } |
129 | 0 | } |
130 | | |
131 | 0 | void VariantColumnReader::RegisterPrefetch(ThriftFileTransport &transport, bool allow_merge) { |
132 | 0 | for (auto &child : child_readers) { |
133 | 0 | if (!child) { |
134 | 0 | continue; |
135 | 0 | } |
136 | 0 | child->RegisterPrefetch(transport, allow_merge); |
137 | 0 | } |
138 | 0 | } |
139 | | |
140 | 0 | uint64_t VariantColumnReader::TotalCompressedSize() { |
141 | 0 | uint64_t size = 0; |
142 | 0 | for (auto &child : child_readers) { |
143 | 0 | if (!child) { |
144 | 0 | continue; |
145 | 0 | } |
146 | 0 | size += child->TotalCompressedSize(); |
147 | 0 | } |
148 | 0 | return size; |
149 | 0 | } |
150 | | |
151 | 0 | idx_t VariantColumnReader::GroupRowsAvailable() { |
152 | 0 | for (auto &child : child_readers) { |
153 | 0 | if (!child) { |
154 | 0 | continue; |
155 | 0 | } |
156 | 0 | return child->GroupRowsAvailable(); |
157 | 0 | } |
158 | 0 | throw InternalException("No projected columns in struct?"); |
159 | 0 | } |
160 | | |
161 | | } // namespace duckdb |