/src/duckdb/extension/parquet/writer/variant/analyze_variant.cpp
Line | Count | Source |
1 | | #include <stdint.h> |
2 | | #include <string> |
3 | | #include <unordered_map> |
4 | | #include <utility> |
5 | | |
6 | | #include "writer/variant_column_writer.hpp" |
7 | | #include "parquet_writer.hpp" |
8 | | #include "column_writer.hpp" |
9 | | #include "duckdb/common/assert.hpp" |
10 | | #include "duckdb/common/case_insensitive_map.hpp" |
11 | | #include "duckdb/common/helper.hpp" |
12 | | #include "duckdb/common/typedefs.hpp" |
13 | | #include "duckdb/common/types.hpp" |
14 | | #include "duckdb/common/types/string_type.hpp" |
15 | | #include "duckdb/common/types/variant.hpp" |
16 | | #include "duckdb/common/types/vector.hpp" |
17 | | #include "duckdb/common/unique_ptr.hpp" |
18 | | #include "duckdb/common/vector.hpp" |
19 | | #include "duckdb/common/vector/unified_vector_format.hpp" |
20 | | #include "duckdb/function/scalar/variant_utils.hpp" |
21 | | #include "parquet_column_schema.hpp" |
22 | | |
23 | | namespace duckdb { |
24 | | |
25 | 0 | unique_ptr<ParquetAnalyzeSchemaState> VariantColumnWriter::AnalyzeSchemaInit() { |
26 | 0 | if (child_writers.size() == 2 && !is_analyzed) { |
27 | 0 | return make_uniq<VariantAnalyzeSchemaState>(); |
28 | 0 | } |
29 | | //! Variant is already shredded explicitly, no need to analyze |
30 | 0 | return nullptr; |
31 | 0 | } |
32 | | |
33 | | static void AnalyzeSchemaInternal(VariantAnalyzeData &state, UnifiedVariantVectorData &variant, idx_t row, |
34 | 0 | uint32_t values_index) { |
35 | 0 | state.total_count++; |
36 | 0 | if (!variant.RowIsValid(row)) { |
37 | 0 | state.type_map[static_cast<uint8_t>(VariantLogicalType::VARIANT_NULL)]++; |
38 | 0 | return; |
39 | 0 | } |
40 | | |
41 | 0 | auto type_id = variant.GetTypeId(row, values_index); |
42 | 0 | state.type_map[static_cast<uint8_t>(type_id)]++; |
43 | |
|
44 | 0 | if (type_id == VariantLogicalType::OBJECT) { |
45 | 0 | if (!state.object_data) { |
46 | 0 | state.object_data = make_uniq<ObjectAnalyzeData>(); |
47 | 0 | } |
48 | 0 | auto &object_data = *state.object_data; |
49 | |
|
50 | 0 | auto nested_data = VariantUtils::DecodeNestedData(variant, row, values_index); |
51 | 0 | for (idx_t i = 0; i < nested_data.child_count; i++) { |
52 | 0 | auto child_values_index = variant.GetValuesIndex(row, i + nested_data.children_idx); |
53 | 0 | auto child_key_index = variant.GetKeysIndex(row, i + nested_data.children_idx); |
54 | |
|
55 | 0 | auto &key = variant.GetKey(row, child_key_index); |
56 | 0 | auto &child_state = object_data.fields[key.GetString()]; |
57 | 0 | AnalyzeSchemaInternal(child_state, variant, row, child_values_index); |
58 | 0 | } |
59 | 0 | } else if (type_id == VariantLogicalType::ARRAY) { |
60 | 0 | if (!state.array_data) { |
61 | 0 | state.array_data = make_uniq<ArrayAnalyzeData>(); |
62 | 0 | } |
63 | 0 | auto &array_data = *state.array_data; |
64 | 0 | auto nested_data = VariantUtils::DecodeNestedData(variant, row, values_index); |
65 | 0 | for (idx_t i = 0; i < nested_data.child_count; i++) { |
66 | 0 | auto child_values_index = variant.GetValuesIndex(row, i + nested_data.children_idx); |
67 | 0 | auto &child_state = array_data.child; |
68 | 0 | AnalyzeSchemaInternal(child_state, variant, row, child_values_index); |
69 | 0 | } |
70 | 0 | } else if (type_id == VariantLogicalType::DECIMAL) { |
71 | 0 | auto decimal_data = VariantUtils::DecodeDecimalData(variant, row, values_index); |
72 | 0 | auto decimal_count = state.type_map[static_cast<uint8_t>(VariantLogicalType::DECIMAL)]; |
73 | 0 | decimal_count--; |
74 | 0 | if (!decimal_count) { |
75 | 0 | state.decimal_width = decimal_data.width; |
76 | 0 | state.decimal_scale = decimal_data.scale; |
77 | 0 | state.decimal_consistent = true; |
78 | 0 | return; |
79 | 0 | } |
80 | 0 | if (!state.decimal_consistent) { |
81 | 0 | return; |
82 | 0 | } |
83 | 0 | if (decimal_data.width != state.decimal_width || decimal_data.scale != state.decimal_scale) { |
84 | 0 | state.decimal_consistent = false; |
85 | 0 | } |
86 | 0 | } else if (type_id == VariantLogicalType::BOOL_FALSE) { |
87 | | //! Move it to bool_true to have the counts all in one place |
88 | 0 | state.type_map[static_cast<uint8_t>(VariantLogicalType::BOOL_TRUE)]++; |
89 | 0 | state.type_map[static_cast<uint8_t>(VariantLogicalType::BOOL_FALSE)]--; |
90 | 0 | } |
91 | 0 | } |
92 | | |
93 | 0 | void VariantColumnWriter::AnalyzeSchema(ParquetAnalyzeSchemaState &state_p, Vector &input, idx_t count) { |
94 | 0 | auto &state = state_p.Cast<VariantAnalyzeSchemaState>(); |
95 | |
|
96 | 0 | RecursiveUnifiedVectorFormat recursive_format; |
97 | 0 | Vector::RecursiveToUnifiedFormat(input, recursive_format); |
98 | 0 | UnifiedVariantVectorData variant(recursive_format); |
99 | |
|
100 | 0 | for (idx_t i = 0; i < count; i++) { |
101 | 0 | AnalyzeSchemaInternal(state.analyze_data, variant, i, 0); |
102 | 0 | } |
103 | 0 | } |
104 | | |
105 | | namespace { |
106 | | |
107 | | struct ShredAnalysisState { |
108 | | idx_t highest_count = 0; |
109 | | LogicalType type = LogicalType::INVALID; |
110 | | }; |
111 | | |
112 | | } // namespace |
113 | | |
114 | | template <VariantLogicalType VARIANT_TYPE, LogicalTypeId SHREDDED_TYPE> |
115 | 0 | static void CheckPrimitive(const VariantAnalyzeData &state, ShredAnalysisState &result) { |
116 | 0 | auto count = state.type_map[static_cast<uint8_t>(VARIANT_TYPE)]; |
117 | 0 | if (count <= result.highest_count) { |
118 | 0 | return; |
119 | 0 | } |
120 | 0 | if (VARIANT_TYPE == VariantLogicalType::DECIMAL) { |
121 | 0 | D_ASSERT(count); |
122 | 0 | if (!state.decimal_consistent) { |
123 | 0 | return; |
124 | 0 | } |
125 | 0 | result.highest_count = count; |
126 | 0 | result.type = LogicalType::DECIMAL(state.decimal_width, state.decimal_scale); |
127 | 0 | } else { |
128 | 0 | result.highest_count = count; |
129 | 0 | result.type = SHREDDED_TYPE; |
130 | 0 | } |
131 | 0 | } Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)1, (duckdb::LogicalTypeId)10>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)3, (duckdb::LogicalTypeId)11>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)4, (duckdb::LogicalTypeId)12>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)5, (duckdb::LogicalTypeId)13>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)6, (duckdb::LogicalTypeId)14>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)13, (duckdb::LogicalTypeId)22>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)14, (duckdb::LogicalTypeId)23>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)15, (duckdb::LogicalTypeId)21>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)19, (duckdb::LogicalTypeId)15>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)20, (duckdb::LogicalTypeId)16>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)24, (duckdb::LogicalTypeId)19>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)25, (duckdb::LogicalTypeId)20>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)27, (duckdb::LogicalTypeId)32>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)34, (duckdb::LogicalTypeId)33>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)17, (duckdb::LogicalTypeId)26>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)16, (duckdb::LogicalTypeId)25>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)18, (duckdb::LogicalTypeId)54>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)8, (duckdb::LogicalTypeId)12>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)9, (duckdb::LogicalTypeId)13>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)10, (duckdb::LogicalTypeId)14>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)11, (duckdb::LogicalTypeId)14>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)12, (duckdb::LogicalTypeId)14>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::CheckPrimitive<(duckdb::VariantLogicalType)7, (duckdb::LogicalTypeId)14>(duckdb::VariantAnalyzeData const&, duckdb::(anonymous namespace)::ShredAnalysisState&) |
132 | | |
133 | 0 | static bool ConstructShreddedType(const VariantAnalyzeData &state, LogicalType &out) { |
134 | 0 | ShredAnalysisState result; |
135 | |
|
136 | 0 | if (state.type_map[0] == state.total_count) { |
137 | | //! All NULL, emit INT32 |
138 | 0 | out = LogicalType::INTEGER; |
139 | 0 | return true; |
140 | 0 | } |
141 | | |
142 | 0 | CheckPrimitive<VariantLogicalType::BOOL_TRUE, LogicalTypeId::BOOLEAN>(state, result); |
143 | 0 | CheckPrimitive<VariantLogicalType::INT8, LogicalTypeId::TINYINT>(state, result); |
144 | 0 | CheckPrimitive<VariantLogicalType::INT16, LogicalTypeId::SMALLINT>(state, result); |
145 | 0 | CheckPrimitive<VariantLogicalType::INT32, LogicalTypeId::INTEGER>(state, result); |
146 | 0 | CheckPrimitive<VariantLogicalType::INT64, LogicalTypeId::BIGINT>(state, result); |
147 | 0 | CheckPrimitive<VariantLogicalType::FLOAT, LogicalTypeId::FLOAT>(state, result); |
148 | 0 | CheckPrimitive<VariantLogicalType::DOUBLE, LogicalTypeId::DOUBLE>(state, result); |
149 | 0 | CheckPrimitive<VariantLogicalType::DECIMAL, LogicalTypeId::DECIMAL>(state, result); |
150 | 0 | CheckPrimitive<VariantLogicalType::DATE, LogicalTypeId::DATE>(state, result); |
151 | 0 | CheckPrimitive<VariantLogicalType::TIME_MICROS, LogicalTypeId::TIME>(state, result); |
152 | 0 | CheckPrimitive<VariantLogicalType::TIMESTAMP_MICROS, LogicalTypeId::TIMESTAMP>(state, result); |
153 | 0 | CheckPrimitive<VariantLogicalType::TIMESTAMP_NANOS, LogicalTypeId::TIMESTAMP_NS>(state, result); |
154 | 0 | CheckPrimitive<VariantLogicalType::TIMESTAMP_MICROS_TZ, LogicalTypeId::TIMESTAMP_TZ>(state, result); |
155 | 0 | CheckPrimitive<VariantLogicalType::TIMESTAMP_NANOS_TZ, LogicalTypeId::TIMESTAMP_TZ_NS>(state, result); |
156 | 0 | CheckPrimitive<VariantLogicalType::BLOB, LogicalTypeId::BLOB>(state, result); |
157 | 0 | CheckPrimitive<VariantLogicalType::VARCHAR, LogicalTypeId::VARCHAR>(state, result); |
158 | 0 | CheckPrimitive<VariantLogicalType::UUID, LogicalTypeId::UUID>(state, result); |
159 | | // these types are not natively supported in Parquet - we convert them during write |
160 | | // during analysis map them to the type we convert them into |
161 | 0 | CheckPrimitive<VariantLogicalType::UINT8, LogicalTypeId::SMALLINT>(state, result); |
162 | 0 | CheckPrimitive<VariantLogicalType::UINT16, LogicalTypeId::INTEGER>(state, result); |
163 | 0 | CheckPrimitive<VariantLogicalType::UINT32, LogicalTypeId::BIGINT>(state, result); |
164 | 0 | CheckPrimitive<VariantLogicalType::UINT64, LogicalTypeId::BIGINT>(state, result); |
165 | 0 | CheckPrimitive<VariantLogicalType::UINT128, LogicalTypeId::BIGINT>(state, result); |
166 | 0 | CheckPrimitive<VariantLogicalType::INT128, LogicalTypeId::BIGINT>(state, result); |
167 | |
|
168 | 0 | auto array_count = state.type_map[static_cast<uint8_t>(VariantLogicalType::ARRAY)]; |
169 | 0 | auto object_count = state.type_map[static_cast<uint8_t>(VariantLogicalType::OBJECT)]; |
170 | 0 | if (array_count > object_count) { |
171 | 0 | if (array_count > result.highest_count) { |
172 | 0 | auto &array_data = *state.array_data; |
173 | 0 | LogicalType child_type; |
174 | 0 | if (!ConstructShreddedType(array_data.child, child_type)) { |
175 | 0 | return false; |
176 | 0 | } |
177 | 0 | out = LogicalType::LIST(child_type); |
178 | 0 | return true; |
179 | 0 | } |
180 | 0 | } else { |
181 | 0 | if (object_count > result.highest_count) { |
182 | 0 | auto &object_data = *state.object_data; |
183 | | |
184 | | //! TODO: implement some logic to determine which fields are worth shredding, considering the overhead when |
185 | | //! only 10% of rows make use of the field |
186 | 0 | child_list_t<LogicalType> field_types; |
187 | 0 | for (auto &field : object_data.fields) { |
188 | 0 | LogicalType child_type; |
189 | 0 | if (!ConstructShreddedType(field.second, child_type)) { |
190 | | // cannot shred on this field - skip |
191 | 0 | continue; |
192 | 0 | } |
193 | 0 | field_types.emplace_back(field.first, child_type); |
194 | 0 | } |
195 | 0 | if (field_types.empty()) { |
196 | | // no field types to shred on - avoid shredding |
197 | 0 | return false; |
198 | 0 | } |
199 | 0 | out = LogicalType::STRUCT(field_types); |
200 | 0 | return true; |
201 | 0 | } |
202 | 0 | } |
203 | 0 | if (result.type.id() == LogicalTypeId::INVALID) { |
204 | 0 | return false; |
205 | 0 | } |
206 | 0 | out = result.type; |
207 | 0 | return true; |
208 | 0 | } |
209 | | |
210 | 0 | void VariantColumnWriter::AnalyzeSchemaFinalize(const ParquetAnalyzeSchemaState &state_p) { |
211 | 0 | auto &state = state_p.Cast<VariantAnalyzeSchemaState>(); |
212 | 0 | LogicalType shredded_type; |
213 | 0 | if (!ConstructShreddedType(state.analyze_data, shredded_type)) { |
214 | | //! Can't shred, keep the original children |
215 | 0 | return; |
216 | 0 | } |
217 | 0 | is_analyzed = true; |
218 | 0 | analyzed_shredding_type = ShreddingType(shredded_type); |
219 | 0 | auto typed_value = TransformTypedValueRecursive(shredded_type); |
220 | 0 | auto &schema = Schema(); |
221 | 0 | auto &context = writer.GetContext(); |
222 | 0 | D_ASSERT(child_writers.size() == 2); |
223 | 0 | child_writers.pop_back(); |
224 | | //! Recreate the column writer for 'value' because this is now "optional" |
225 | 0 | child_writers.push_back(ColumnWriter::CreateWriterRecursive(context, writer, schema_path, LogicalType::BLOB, |
226 | 0 | "value", false, nullptr, nullptr, schema.max_repeat, |
227 | 0 | schema.max_define + 1, true)); |
228 | 0 | child_writers.push_back(ColumnWriter::CreateWriterRecursive(context, writer, schema_path, typed_value, |
229 | 0 | "typed_value", false, nullptr, nullptr, |
230 | 0 | schema.max_repeat, schema.max_define + 1, true)); |
231 | 0 | } |
232 | | |
233 | 0 | bool VariantColumnWriter::TryExportPreparedShreddingType(ShreddingType &result) const { |
234 | 0 | if (analyzed_shredding_type.set) { |
235 | 0 | result = analyzed_shredding_type.Copy(); |
236 | 0 | return true; |
237 | 0 | } |
238 | 0 | return StructColumnWriter::TryExportPreparedShreddingType(result); |
239 | 0 | } |
240 | | |
241 | | } // namespace duckdb |