/src/duckdb/extension/parquet/column_writer.cpp
Line | Count | Source |
1 | | #include "column_writer.hpp" |
2 | | |
3 | | #include "duckdb.hpp" |
4 | | #include "parquet_geometry.hpp" |
5 | | #include "parquet_rle_bp_decoder.hpp" |
6 | | #include "parquet_bss_encoder.hpp" |
7 | | #include "parquet_statistics.hpp" |
8 | | #include "parquet_writer.hpp" |
9 | | #include "writer/array_column_writer.hpp" |
10 | | #include "writer/boolean_column_writer.hpp" |
11 | | #include "writer/decimal_column_writer.hpp" |
12 | | #include "writer/enum_column_writer.hpp" |
13 | | #include "writer/list_column_writer.hpp" |
14 | | #include "writer/primitive_column_writer.hpp" |
15 | | #include "writer/struct_column_writer.hpp" |
16 | | #include "writer/variant_column_writer.hpp" |
17 | | #include "writer/templated_column_writer.hpp" |
18 | | #include "duckdb/common/exception.hpp" |
19 | | #include "duckdb/common/operator/comparison_operators.hpp" |
20 | | #include "duckdb/common/serializer/buffered_file_writer.hpp" |
21 | | #include "duckdb/common/serializer/memory_stream.hpp" |
22 | | #include "duckdb/common/serializer/write_stream.hpp" |
23 | | #include "duckdb/common/string_map_set.hpp" |
24 | | #include "duckdb/common/types/hugeint.hpp" |
25 | | #include "duckdb/common/types/time.hpp" |
26 | | #include "duckdb/common/types/timestamp.hpp" |
27 | | #include "duckdb/execution/expression_executor.hpp" |
28 | | |
29 | | #include "brotli/encode.h" |
30 | | #include "lz4.hpp" |
31 | | #include "miniz_wrapper.hpp" |
32 | | #include "snappy.h" |
33 | | #include "zstd.h" |
34 | | |
35 | | #include <cmath> |
36 | | |
37 | | namespace duckdb { |
38 | | |
39 | | using namespace duckdb_parquet; // NOLINT |
40 | | using namespace duckdb_miniz; // NOLINT |
41 | | |
42 | | using duckdb_parquet::CompressionCodec; |
43 | | using duckdb_parquet::ConvertedType; |
44 | | using duckdb_parquet::Encoding; |
45 | | using duckdb_parquet::FieldRepetitionType; |
46 | | using duckdb_parquet::FileMetaData; |
47 | | using duckdb_parquet::PageHeader; |
48 | | using duckdb_parquet::PageType; |
49 | | using ParquetRowGroup = duckdb_parquet::RowGroup; |
50 | | using duckdb_parquet::Type; |
51 | | |
52 | | constexpr uint16_t ColumnWriter::PARQUET_DEFINE_VALID; |
53 | | |
54 | | //===--------------------------------------------------------------------===// |
55 | | // ColumnWriterStatistics |
56 | | //===--------------------------------------------------------------------===// |
57 | 0 | ColumnWriterStatistics::~ColumnWriterStatistics() { |
58 | 0 | } |
59 | | |
60 | 0 | bool ColumnWriterStatistics::HasStats() { |
61 | 0 | return false; |
62 | 0 | } |
63 | | |
64 | 0 | string ColumnWriterStatistics::GetMin() { |
65 | 0 | return string(); |
66 | 0 | } |
67 | | |
68 | 0 | string ColumnWriterStatistics::GetMax() { |
69 | 0 | return string(); |
70 | 0 | } |
71 | | |
72 | 0 | string ColumnWriterStatistics::GetMinValue() { |
73 | 0 | return string(); |
74 | 0 | } |
75 | | |
76 | 0 | string ColumnWriterStatistics::GetMaxValue() { |
77 | 0 | return string(); |
78 | 0 | } |
79 | | |
80 | 0 | bool ColumnWriterStatistics::CanHaveNaN() { |
81 | 0 | return false; |
82 | 0 | } |
83 | | |
84 | 0 | bool ColumnWriterStatistics::HasNaN() { |
85 | 0 | return false; |
86 | 0 | } |
87 | | |
88 | 0 | bool ColumnWriterStatistics::MinIsExact() { |
89 | 0 | return true; |
90 | 0 | } |
91 | | |
92 | 0 | bool ColumnWriterStatistics::MaxIsExact() { |
93 | 0 | return true; |
94 | 0 | } |
95 | | |
96 | 0 | bool ColumnWriterStatistics::HasGeoStats() { |
97 | 0 | return false; |
98 | 0 | } |
99 | | |
100 | 0 | optional_ptr<GeometryStatsData> ColumnWriterStatistics::GetGeoStats() { |
101 | 0 | return nullptr; |
102 | 0 | } |
103 | | |
104 | 0 | void ColumnWriterStatistics::WriteGeoStats(duckdb_parquet::GeospatialStatistics &stats) { |
105 | 0 | D_ASSERT(false); // this should never be called |
106 | 0 | } |
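//===--------------------------------------------------------------------===//
// Editor's sketch (not part of the source): the no-op defaults above make
// statistics opt-in -- concrete writers override these hooks. Assuming the
// methods are virtual (the class body lives in column_writer.hpp) and that
// min/max are reported as the value's plain-encoded bytes, a hypothetical
// int32 min/max tracker could look like this:
//===--------------------------------------------------------------------===//
class Int32StatsSketch : public ColumnWriterStatistics { // hypothetical
public:
	void Update(int32_t v) { // hypothetical helper, called once per value
		min_val = MinValue<int32_t>(min_val, v);
		max_val = MaxValue<int32_t>(max_val, v);
		seen = true;
	}
	bool HasStats() override {
		return seen;
	}
	string GetMin() override { // assumption: stats are the raw little-endian bytes
		return string(const_char_ptr_cast(&min_val), sizeof(min_val));
	}
	string GetMax() override {
		return string(const_char_ptr_cast(&max_val), sizeof(max_val));
	}

private:
	int32_t min_val = NumericLimits<int32_t>::Maximum();
	int32_t max_val = NumericLimits<int32_t>::Minimum();
	bool seen = false;
};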
107 | | |
108 | | //===--------------------------------------------------------------------===// |
109 | | // ColumnWriter |
110 | | //===--------------------------------------------------------------------===// |
111 | | ColumnWriter::ColumnWriter(ParquetWriter &writer, const ParquetColumnSchema &column_schema, |
112 | | vector<string> schema_path_p, bool can_have_nulls) |
113 | 0 | : writer(writer), column_schema(column_schema), schema_path(std::move(schema_path_p)), |
114 | 0 | can_have_nulls(can_have_nulls) { |
115 | 0 | } |
116 | 0 | ColumnWriter::~ColumnWriter() { |
117 | 0 | } |
118 | | |
119 | 0 | ColumnWriterState::~ColumnWriterState() { |
120 | 0 | } |
121 | | |
122 | | void ColumnWriter::CompressPage(MemoryStream &temp_writer, size_t &compressed_size, data_ptr_t &compressed_data, |
123 | 0 | AllocatedData &compressed_buf) { |
124 | 0 | switch (writer.GetCodec()) { |
125 | 0 | case CompressionCodec::UNCOMPRESSED: |
126 | 0 | compressed_size = temp_writer.GetPosition(); |
127 | 0 | compressed_data = temp_writer.GetData(); |
128 | 0 | break; |
129 | | |
130 | 0 | case CompressionCodec::SNAPPY: { |
131 | 0 | compressed_size = duckdb_snappy::MaxCompressedLength(temp_writer.GetPosition()); |
132 | 0 | compressed_buf = BufferAllocator::Get(writer.GetContext()).Allocate(compressed_size); |
133 | 0 | duckdb_snappy::RawCompress(const_char_ptr_cast(temp_writer.GetData()), temp_writer.GetPosition(), |
134 | 0 | char_ptr_cast(compressed_buf.get()), &compressed_size); |
135 | 0 | compressed_data = compressed_buf.get(); |
136 | 0 | D_ASSERT(compressed_size <= duckdb_snappy::MaxCompressedLength(temp_writer.GetPosition())); |
137 | 0 | break; |
138 | 0 | } |
139 | 0 | case CompressionCodec::LZ4_RAW: { |
140 | 0 | compressed_size = duckdb_lz4::LZ4_compressBound(UnsafeNumericCast<int32_t>(temp_writer.GetPosition())); |
141 | 0 | compressed_buf = BufferAllocator::Get(writer.GetContext()).Allocate(compressed_size); |
142 | 0 | compressed_size = duckdb_lz4::LZ4_compress_default( |
143 | 0 | const_char_ptr_cast(temp_writer.GetData()), char_ptr_cast(compressed_buf.get()), |
144 | 0 | UnsafeNumericCast<int32_t>(temp_writer.GetPosition()), UnsafeNumericCast<int32_t>(compressed_size)); |
145 | 0 | compressed_data = compressed_buf.get(); |
146 | 0 | break; |
147 | 0 | } |
148 | 0 | case CompressionCodec::GZIP: { |
149 | 0 | MiniZStream s; |
150 | 0 | compressed_size = s.MaxCompressedLength(temp_writer.GetPosition()); |
151 | 0 | compressed_buf = BufferAllocator::Get(writer.GetContext()).Allocate(compressed_size); |
152 | 0 | s.Compress(const_char_ptr_cast(temp_writer.GetData()), temp_writer.GetPosition(), |
153 | 0 | char_ptr_cast(compressed_buf.get()), &compressed_size); |
154 | 0 | compressed_data = compressed_buf.get(); |
155 | 0 | break; |
156 | 0 | } |
157 | 0 | case CompressionCodec::ZSTD: { |
158 | 0 | compressed_size = duckdb_zstd::ZSTD_compressBound(temp_writer.GetPosition()); |
159 | 0 | compressed_buf = BufferAllocator::Get(writer.GetContext()).Allocate(compressed_size); |
160 | 0 | compressed_size = duckdb_zstd::ZSTD_compress((void *)compressed_buf.get(), compressed_size, |
161 | 0 | (const void *)temp_writer.GetData(), temp_writer.GetPosition(), |
162 | 0 | UnsafeNumericCast<int32_t>(writer.CompressionLevel())); |
163 | 0 | compressed_data = compressed_buf.get(); |
164 | 0 | break; |
165 | 0 | } |
166 | 0 | case CompressionCodec::BROTLI: { |
167 | 0 | compressed_size = duckdb_brotli::BrotliEncoderMaxCompressedSize(temp_writer.GetPosition()); |
168 | 0 | compressed_buf = BufferAllocator::Get(writer.GetContext()).Allocate(compressed_size); |
169 | 0 | duckdb_brotli::BrotliEncoderCompress(BROTLI_DEFAULT_QUALITY, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, |
170 | 0 | temp_writer.GetPosition(), temp_writer.GetData(), &compressed_size, |
171 | 0 | compressed_buf.get()); |
172 | 0 | compressed_data = compressed_buf.get(); |
173 | 0 | break; |
174 | 0 | } |
175 | 0 | default: |
176 | 0 | throw InternalException("Unsupported codec for Parquet Writer"); |
177 | 0 | } |
178 | | |
179 | 0 | if (compressed_size > idx_t(NumericLimits<int32_t>::Maximum())) { |
180 | 0 | throw InternalException("Parquet writer: compressed page size %d out of range for type integer",
181 | 0 | compressed_size);
182 | 0 | } |
183 | 0 | } |
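//===--------------------------------------------------------------------===//
// Editor's sketch (not part of the source): every codec branch above follows
// the same bound -> allocate -> compress -> keep-actual-size pattern. A
// self-contained zstd illustration; ZSTD_isError/ZSTD_getErrorName are
// standard zstd API assumed to be present in the vendored duckdb_zstd
// namespace alongside the calls the source already uses.
//===--------------------------------------------------------------------===//
static idx_t ZstdCompressSketch(ClientContext &context, const_data_ptr_t src, idx_t src_size, int level) {
	auto bound = duckdb_zstd::ZSTD_compressBound(src_size); // worst-case output size
	auto buf = BufferAllocator::Get(context).Allocate(bound);
	auto written = duckdb_zstd::ZSTD_compress(buf.get(), bound, src, src_size, level);
	if (duckdb_zstd::ZSTD_isError(written)) {
		throw InternalException("ZSTD compression failed: %s", duckdb_zstd::ZSTD_getErrorName(written));
	}
	return written; // only the first `written` bytes of buf are valid page data
}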
184 | | |
185 | 0 | void ColumnWriter::HandleRepeatLevels(ColumnWriterState &state, ColumnWriterState *parent, idx_t count) const { |
186 | 0 | if (!parent) { |
187 | | // no repeat levels without a parent node |
188 | 0 | return; |
189 | 0 | } |
190 | 0 | if (state.repetition_levels.size() >= parent->repetition_levels.size()) { |
191 | 0 | return; |
192 | 0 | } |
193 | 0 | state.repetition_levels.insert(state.repetition_levels.end(), |
194 | 0 | parent->repetition_levels.begin() + state.repetition_levels.size(), |
195 | 0 | parent->repetition_levels.end()); |
196 | 0 | } |
197 | | |
198 | | void ColumnWriter::HandleDefineLevels(ColumnWriterState &state, ColumnWriterState *parent, const ValidityMask &validity, |
199 | 0 | const idx_t count, const uint16_t define_value, const uint16_t null_value) const { |
200 | 0 | if (parent) { |
201 | | // parent node: inherit definition level from the parent |
202 | 0 | idx_t vector_index = 0; |
203 | 0 | while (state.definition_levels.size() < parent->definition_levels.size()) { |
204 | 0 | idx_t current_index = state.definition_levels.size(); |
205 | 0 | if (parent->definition_levels[current_index] != PARQUET_DEFINE_VALID) { |
206 | | //! Inherit nulls from parent |
207 | 0 | state.definition_levels.push_back(parent->definition_levels[current_index]); |
208 | 0 | state.parent_null_count++; |
209 | 0 | } else if (validity.RowIsValid(vector_index)) { |
210 | | //! Produce a non-null define |
211 | 0 | state.definition_levels.push_back(define_value); |
212 | 0 | } else { |
213 | | //! Produce a null define |
214 | 0 | if (!can_have_nulls) { |
215 | 0 | throw IOException("Parquet writer: map key column is not allowed to contain NULL values"); |
216 | 0 | } |
217 | 0 | state.null_count++; |
218 | 0 | state.definition_levels.push_back(null_value); |
219 | 0 | } |
220 | 0 | D_ASSERT(parent->is_empty.empty() || current_index < parent->is_empty.size()); |
221 | 0 | if (parent->is_empty.empty() || !parent->is_empty[current_index]) { |
222 | 0 | vector_index++; |
223 | 0 | } |
224 | 0 | } |
225 | 0 | return; |
226 | 0 | } |
227 | | |
228 | | // no parent: set definition levels only from this validity mask |
229 | 0 | if (validity.AllValid()) { |
230 | 0 | state.definition_levels.insert(state.definition_levels.end(), count, define_value); |
231 | 0 | } else { |
232 | 0 | for (idx_t i = 0; i < count; i++) { |
233 | 0 | const auto is_null = !validity.RowIsValid(i); |
234 | 0 | state.definition_levels.emplace_back(is_null ? null_value : define_value); |
235 | 0 | state.null_count += is_null; |
236 | 0 | } |
237 | 0 | } |
238 | 0 | if (!can_have_nulls && state.null_count != 0) { |
239 | 0 | throw IOException("Parquet writer: map key column is not allowed to contain NULL values"); |
240 | 0 | } |
241 | 0 | } |
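//===--------------------------------------------------------------------===//
// Editor's sketch (not part of the source): with no parent, the logic above
// degenerates to one definition level per row -- `define_value` when the row
// is valid, `null_value` when it is NULL. Minimal illustration:
//===--------------------------------------------------------------------===//
static void FlatDefineLevelsSketch(const ValidityMask &validity, idx_t count, uint16_t define_value,
                                   uint16_t null_value, vector<uint16_t> &levels) {
	for (idx_t i = 0; i < count; i++) {
		levels.push_back(validity.RowIsValid(i) ? define_value : null_value);
	}
}
// e.g. rows {1, NULL, 2} with define_value = 1, null_value = 0 yield levels {1, 0, 1}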
242 | | |
243 | | //===--------------------------------------------------------------------===// |
244 | | // Create Column Writer |
245 | | //===--------------------------------------------------------------------===// |
246 | | |
247 | | ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::SchemaElement> &schemas, |
248 | | const LogicalType &type, const string &name, bool allow_geometry, |
249 | | optional_ptr<const ChildFieldIDs> field_ids, |
250 | | optional_ptr<const ShreddingType> shredding_types, idx_t max_repeat, |
251 | 0 | idx_t max_define, bool can_have_nulls) { |
252 | 0 | auto null_type = can_have_nulls ? FieldRepetitionType::OPTIONAL : FieldRepetitionType::REQUIRED; |
253 | 0 | if (!can_have_nulls) { |
254 | 0 | max_define--; |
255 | 0 | } |
256 | 0 | idx_t schema_idx = schemas.size(); |
257 | |
258 | 0 | optional_ptr<const FieldID> field_id; |
259 | 0 | optional_ptr<const ChildFieldIDs> child_field_ids; |
260 | 0 | if (field_ids) { |
261 | 0 | auto field_id_it = field_ids->ids->find(name); |
262 | 0 | if (field_id_it != field_ids->ids->end()) { |
263 | 0 | field_id = &field_id_it->second; |
264 | 0 | child_field_ids = &field_id->child_field_ids; |
265 | 0 | } |
266 | 0 | } |
267 | 0 | optional_ptr<const ShreddingType> shredding_type; |
268 | 0 | if (shredding_types) { |
269 | 0 | shredding_type = shredding_types->GetChild(name); |
270 | 0 | } |
271 | |
272 | 0 | if (type.id() == LogicalTypeId::STRUCT && type.GetAlias() == "PARQUET_VARIANT") { |
273 | | // variant type |
274 | | // variants are stored as follows: |
275 | | // group <name> VARIANT { |
276 | | // metadata BYTE_ARRAY, |
277 | | // value BYTE_ARRAY, |
278 | | // [<typed_value>] |
279 | | // } |
280 | |
281 | 0 | const bool is_shredded = shredding_type != nullptr; |
282 | |
283 | 0 | child_list_t<LogicalType> child_types; |
284 | 0 | child_types.emplace_back("metadata", LogicalType::BLOB); |
285 | 0 | child_types.emplace_back("value", LogicalType::BLOB); |
286 | 0 | if (is_shredded) { |
287 | 0 | auto &typed_value_type = shredding_type->type; |
288 | 0 | if (typed_value_type.id() != LogicalTypeId::ANY) { |
289 | 0 | child_types.emplace_back("typed_value", |
290 | 0 | VariantColumnWriter::TransformTypedValueRecursive(typed_value_type)); |
291 | 0 | } |
292 | 0 | } |
293 | | |
294 | | // variant group |
295 | 0 | duckdb_parquet::SchemaElement top_element; |
296 | 0 | top_element.repetition_type = null_type; |
297 | 0 | top_element.num_children = UnsafeNumericCast<int32_t>(child_types.size());
298 | 0 | top_element.logicalType.__isset.VARIANT = true; |
299 | 0 | top_element.logicalType.VARIANT.__isset.specification_version = true; |
300 | 0 | top_element.logicalType.VARIANT.specification_version = 1; |
301 | 0 | top_element.__isset.logicalType = true; |
302 | 0 | top_element.__isset.num_children = true; |
303 | 0 | top_element.__isset.repetition_type = true; |
304 | 0 | top_element.name = name; |
305 | 0 | schemas.push_back(std::move(top_element)); |
306 | |
307 | 0 | ParquetColumnSchema variant_column(name, type, max_define, max_repeat, schema_idx, 0); |
308 | 0 | variant_column.children.reserve(child_types.size()); |
309 | 0 | for (auto &child_type : child_types) { |
310 | 0 | auto &child_name = child_type.first; |
311 | 0 | bool is_optional; |
312 | 0 | if (child_name == "metadata") { |
313 | 0 | is_optional = false; |
314 | 0 | } else if (child_name == "value") { |
315 | 0 | if (is_shredded) { |
316 | | //! When shredding the variant, the 'value' becomes optional |
317 | 0 | is_optional = true; |
318 | 0 | } else { |
319 | 0 | is_optional = false; |
320 | 0 | } |
321 | 0 | } else { |
322 | 0 | D_ASSERT(child_name == "typed_value"); |
323 | 0 | is_optional = true; |
324 | 0 | } |
325 | 0 | variant_column.children.emplace_back(FillParquetSchema(schemas, child_type.second, child_type.first, |
326 | 0 | allow_geometry, child_field_ids, shredding_type, |
327 | 0 | max_repeat, max_define + 1, is_optional)); |
328 | 0 | } |
329 | 0 | return variant_column; |
330 | 0 | } |
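// Editor's note (illustration, not in the source): for a shredding type of
// BIGINT, the block above emits, schematically:
//
//   optional group <name> (VARIANT) {
//     required binary metadata;
//     optional binary value;       // optional only because shredding is on
//     optional int64 typed_value;  // from TransformTypedValueRecursive
//   }
//
// Without shredding, `value` is required and `typed_value` is omitted.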
331 | | |
332 | 0 | if (type.id() == LogicalTypeId::STRUCT || type.id() == LogicalTypeId::UNION) { |
333 | 0 | auto &child_types = StructType::GetChildTypes(type); |
334 | | // set up the schema element for this struct |
335 | 0 | duckdb_parquet::SchemaElement schema_element; |
336 | 0 | schema_element.repetition_type = null_type; |
337 | 0 | schema_element.num_children = UnsafeNumericCast<int32_t>(child_types.size()); |
338 | 0 | schema_element.__isset.num_children = true; |
339 | 0 | schema_element.__isset.type = false; |
340 | 0 | schema_element.__isset.repetition_type = true; |
341 | 0 | schema_element.name = name; |
342 | 0 | if (field_id && field_id->set) { |
343 | 0 | schema_element.__isset.field_id = true; |
344 | 0 | schema_element.field_id = field_id->field_id; |
345 | 0 | } |
346 | 0 | schemas.push_back(std::move(schema_element)); |
347 | |
348 | 0 | ParquetColumnSchema struct_column(name, type, max_define, max_repeat, schema_idx, 0); |
349 | | // construct the child schemas recursively |
350 | 0 | struct_column.children.reserve(child_types.size()); |
351 | 0 | for (auto &child_type : child_types) { |
352 | 0 | struct_column.children.emplace_back(FillParquetSchema(schemas, child_type.second, child_type.first, |
353 | 0 | allow_geometry, child_field_ids, shredding_type, |
354 | 0 | max_repeat, max_define + 1, true)); |
355 | 0 | } |
356 | 0 | return struct_column; |
357 | 0 | } |
358 | 0 | if (type.id() == LogicalTypeId::LIST || type.id() == LogicalTypeId::ARRAY) { |
359 | 0 | auto is_list = type.id() == LogicalTypeId::LIST; |
360 | 0 | auto &child_type = is_list ? ListType::GetChildType(type) : ArrayType::GetChildType(type); |
361 | | // set up the two schema elements for the list |
362 | | // for some reason we only set the converted type in the OPTIONAL element |
363 | | // first an OPTIONAL element |
364 | 0 | duckdb_parquet::SchemaElement optional_element; |
365 | 0 | optional_element.repetition_type = null_type; |
366 | 0 | optional_element.num_children = 1; |
367 | 0 | optional_element.converted_type = ConvertedType::LIST; |
368 | 0 | optional_element.__isset.num_children = true; |
369 | 0 | optional_element.__isset.type = false; |
370 | 0 | optional_element.__isset.repetition_type = true; |
371 | 0 | optional_element.__isset.converted_type = true; |
372 | 0 | optional_element.name = name; |
373 | 0 | if (field_id && field_id->set) { |
374 | 0 | optional_element.__isset.field_id = true; |
375 | 0 | optional_element.field_id = field_id->field_id; |
376 | 0 | } |
377 | 0 | schemas.push_back(std::move(optional_element)); |
378 | | |
379 | | // then a REPEATED element |
380 | 0 | duckdb_parquet::SchemaElement repeated_element; |
381 | 0 | repeated_element.repetition_type = FieldRepetitionType::REPEATED; |
382 | 0 | repeated_element.num_children = 1; |
383 | 0 | repeated_element.__isset.num_children = true; |
384 | 0 | repeated_element.__isset.type = false; |
385 | 0 | repeated_element.__isset.repetition_type = true; |
386 | 0 | repeated_element.name = "list"; |
387 | 0 | schemas.push_back(std::move(repeated_element)); |
388 | |
389 | 0 | ParquetColumnSchema list_column(name, type, max_define, max_repeat, schema_idx, 0); |
390 | 0 | list_column.children.push_back(FillParquetSchema(schemas, child_type, "element", allow_geometry, |
391 | 0 | child_field_ids, shredding_type, max_repeat + 1, |
392 | 0 | max_define + 2, true)); |
393 | 0 | return list_column; |
394 | 0 | } |
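// Editor's note (illustration, not in the source): this is the standard
// Parquet three-level list encoding; for LIST(INTEGER) named "l" it emits:
//
//   optional group l (LIST) {
//     repeated group list {
//       optional int32 element;
//     }
//   }
//
// Hence the child schema is built with max_repeat + 1 (one extra repeated
// ancestor) and max_define + 2 (the optional group and the repeated group
// each contribute one definition level).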
395 | 0 | if (type.id() == LogicalTypeId::MAP) { |
396 | | // map type |
397 | | // maps are stored as follows: |
398 | | // <map-repetition> group <name> (MAP) { |
399 | | // repeated group key_value { |
400 | | // required <key-type> key; |
401 | | // <value-repetition> <value-type> value; |
402 | | // } |
403 | | // } |
404 | | // top map element |
405 | 0 | duckdb_parquet::SchemaElement top_element; |
406 | 0 | top_element.repetition_type = null_type; |
407 | 0 | top_element.num_children = 1; |
408 | 0 | top_element.converted_type = ConvertedType::MAP; |
409 | 0 | top_element.__isset.repetition_type = true; |
410 | 0 | top_element.__isset.num_children = true; |
411 | 0 | top_element.__isset.converted_type = true; |
412 | 0 | top_element.__isset.type = false; |
413 | 0 | top_element.name = name; |
414 | 0 | if (field_id && field_id->set) { |
415 | 0 | top_element.__isset.field_id = true; |
416 | 0 | top_element.field_id = field_id->field_id; |
417 | 0 | } |
418 | 0 | schemas.push_back(std::move(top_element)); |
419 | | |
420 | | // key_value element |
421 | 0 | duckdb_parquet::SchemaElement kv_element; |
422 | 0 | kv_element.repetition_type = FieldRepetitionType::REPEATED; |
423 | 0 | kv_element.num_children = 2; |
424 | 0 | kv_element.__isset.repetition_type = true; |
425 | 0 | kv_element.__isset.num_children = true; |
426 | 0 | kv_element.__isset.type = false; |
427 | 0 | kv_element.name = "key_value"; |
428 | 0 | schemas.push_back(std::move(kv_element)); |
429 | | |
430 | | // construct the child types recursively |
431 | 0 | vector<LogicalType> kv_types {MapType::KeyType(type), MapType::ValueType(type)}; |
432 | 0 | vector<string> kv_names {"key", "value"}; |
433 | |
434 | 0 | ParquetColumnSchema map_column(name, type, max_define, max_repeat, schema_idx, 0); |
435 | 0 | map_column.children.reserve(2); |
436 | 0 | for (idx_t i = 0; i < 2; i++) { |
437 | | // key needs to be marked as REQUIRED |
438 | 0 | bool is_key = i == 0; |
439 | 0 | auto child_schema = FillParquetSchema(schemas, kv_types[i], kv_names[i], allow_geometry, child_field_ids, |
440 | 0 | shredding_type, max_repeat + 1, max_define + 2, !is_key); |
441 | |
442 | 0 | map_column.children.push_back(std::move(child_schema)); |
443 | 0 | } |
444 | 0 | return map_column; |
445 | 0 | } |
446 | | |
447 | 0 | duckdb_parquet::SchemaElement schema_element; |
448 | 0 | schema_element.type = ParquetWriter::DuckDBTypeToParquetType(type); |
449 | 0 | schema_element.repetition_type = null_type; |
450 | 0 | schema_element.__isset.num_children = false; |
451 | 0 | schema_element.__isset.type = true; |
452 | 0 | schema_element.__isset.repetition_type = true; |
453 | 0 | schema_element.name = name; |
454 | 0 | if (field_id && field_id->set) { |
455 | 0 | schema_element.__isset.field_id = true; |
456 | 0 | schema_element.field_id = field_id->field_id; |
457 | 0 | } |
458 | 0 | ParquetWriter::SetSchemaProperties(type, schema_element, allow_geometry); |
459 | 0 | schemas.push_back(std::move(schema_element)); |
460 | 0 | return ParquetColumnSchema(name, type, max_define, max_repeat, schema_idx, 0); |
461 | 0 | } |
462 | | |
463 | | unique_ptr<ColumnWriter> |
464 | | ColumnWriter::CreateWriterRecursive(ClientContext &context, ParquetWriter &writer, |
465 | | const vector<duckdb_parquet::SchemaElement> &parquet_schemas, |
466 | 0 | const ParquetColumnSchema &schema, vector<string> path_in_schema) { |
467 | 0 | auto &type = schema.type; |
468 | 0 | auto can_have_nulls = parquet_schemas[schema.schema_index].repetition_type == FieldRepetitionType::OPTIONAL; |
469 | 0 | path_in_schema.push_back(schema.name); |
470 | |
471 | 0 | if (type.id() == LogicalTypeId::STRUCT && type.GetAlias() == "PARQUET_VARIANT") { |
472 | 0 | vector<unique_ptr<ColumnWriter>> child_writers; |
473 | 0 | child_writers.reserve(schema.children.size()); |
474 | 0 | for (idx_t i = 0; i < schema.children.size(); i++) { |
475 | 0 | child_writers.push_back( |
476 | 0 | CreateWriterRecursive(context, writer, parquet_schemas, schema.children[i], path_in_schema)); |
477 | 0 | } |
478 | 0 | return make_uniq<VariantColumnWriter>(writer, schema, path_in_schema, std::move(child_writers), can_have_nulls); |
479 | 0 | } |
480 | | |
481 | 0 | if (type.id() == LogicalTypeId::STRUCT || type.id() == LogicalTypeId::UNION) { |
482 | | // construct the child writers recursively |
483 | 0 | vector<unique_ptr<ColumnWriter>> child_writers; |
484 | 0 | child_writers.reserve(schema.children.size()); |
485 | 0 | for (auto &child_column : schema.children) { |
486 | 0 | child_writers.push_back( |
487 | 0 | CreateWriterRecursive(context, writer, parquet_schemas, child_column, path_in_schema)); |
488 | 0 | } |
489 | 0 | return make_uniq<StructColumnWriter>(writer, schema, std::move(path_in_schema), std::move(child_writers), |
490 | 0 | can_have_nulls); |
491 | 0 | } |
492 | 0 | if (type.id() == LogicalTypeId::LIST || type.id() == LogicalTypeId::ARRAY) { |
493 | 0 | auto is_list = type.id() == LogicalTypeId::LIST; |
494 | 0 | path_in_schema.push_back("list"); |
495 | 0 | auto child_writer = CreateWriterRecursive(context, writer, parquet_schemas, schema.children[0], path_in_schema); |
496 | 0 | if (is_list) { |
497 | 0 | return make_uniq<ListColumnWriter>(writer, schema, std::move(path_in_schema), std::move(child_writer), |
498 | 0 | can_have_nulls); |
499 | 0 | } else { |
500 | 0 | return make_uniq<ArrayColumnWriter>(writer, schema, std::move(path_in_schema), std::move(child_writer), |
501 | 0 | can_have_nulls); |
502 | 0 | } |
503 | 0 | } |
504 | 0 | if (type.id() == LogicalTypeId::MAP) { |
505 | 0 | path_in_schema.push_back("key_value"); |
506 | | // construct the child types recursively |
507 | 0 | vector<unique_ptr<ColumnWriter>> child_writers; |
508 | 0 | child_writers.reserve(2); |
509 | 0 | for (idx_t i = 0; i < 2; i++) { |
510 | | // key needs to be marked as REQUIRED |
511 | 0 | auto child_writer = |
512 | 0 | CreateWriterRecursive(context, writer, parquet_schemas, schema.children[i], path_in_schema); |
513 | 0 | child_writers.push_back(std::move(child_writer)); |
514 | 0 | } |
515 | 0 | auto struct_writer = |
516 | 0 | make_uniq<StructColumnWriter>(writer, schema, path_in_schema, std::move(child_writers), can_have_nulls); |
517 | 0 | return make_uniq<ListColumnWriter>(writer, schema, path_in_schema, std::move(struct_writer), can_have_nulls); |
518 | 0 | } |
519 | | |
520 | 0 | switch (type.id()) { |
521 | 0 | case LogicalTypeId::BOOLEAN: |
522 | 0 | return make_uniq<BooleanColumnWriter>(writer, schema, std::move(path_in_schema), can_have_nulls); |
523 | 0 | case LogicalTypeId::TINYINT: |
524 | 0 | return make_uniq<StandardColumnWriter<int8_t, int32_t>>(writer, schema, std::move(path_in_schema), |
525 | 0 | can_have_nulls); |
526 | 0 | case LogicalTypeId::SMALLINT: |
527 | 0 | return make_uniq<StandardColumnWriter<int16_t, int32_t>>(writer, schema, std::move(path_in_schema), |
528 | 0 | can_have_nulls); |
529 | 0 | case LogicalTypeId::INTEGER: |
530 | 0 | case LogicalTypeId::DATE: |
531 | 0 | return make_uniq<StandardColumnWriter<int32_t, int32_t>>(writer, schema, std::move(path_in_schema), |
532 | 0 | can_have_nulls); |
533 | 0 | case LogicalTypeId::BIGINT: |
534 | 0 | case LogicalTypeId::TIME: |
535 | 0 | case LogicalTypeId::TIMESTAMP: |
536 | 0 | case LogicalTypeId::TIMESTAMP_TZ: |
537 | 0 | case LogicalTypeId::TIMESTAMP_MS: |
538 | 0 | return make_uniq<StandardColumnWriter<int64_t, int64_t>>(writer, schema, std::move(path_in_schema), |
539 | 0 | can_have_nulls); |
540 | 0 | case LogicalTypeId::TIME_TZ: |
541 | 0 | return make_uniq<StandardColumnWriter<dtime_tz_t, int64_t, ParquetTimeTZOperator>>( |
542 | 0 | writer, schema, std::move(path_in_schema), can_have_nulls); |
543 | 0 | case LogicalTypeId::HUGEINT: |
544 | 0 | return make_uniq<StandardColumnWriter<hugeint_t, double, ParquetHugeintOperator>>( |
545 | 0 | writer, schema, std::move(path_in_schema), can_have_nulls); |
546 | 0 | case LogicalTypeId::UHUGEINT: |
547 | 0 | return make_uniq<StandardColumnWriter<uhugeint_t, double, ParquetUhugeintOperator>>( |
548 | 0 | writer, schema, std::move(path_in_schema), can_have_nulls); |
549 | 0 | case LogicalTypeId::TIMESTAMP_NS: |
550 | 0 | return make_uniq<StandardColumnWriter<int64_t, int64_t, ParquetTimestampNSOperator>>( |
551 | 0 | writer, schema, std::move(path_in_schema), can_have_nulls); |
552 | 0 | case LogicalTypeId::TIMESTAMP_SEC: |
553 | 0 | return make_uniq<StandardColumnWriter<int64_t, int64_t, ParquetTimestampSOperator>>( |
554 | 0 | writer, schema, std::move(path_in_schema), can_have_nulls); |
555 | 0 | case LogicalTypeId::UTINYINT: |
556 | 0 | return make_uniq<StandardColumnWriter<uint8_t, int32_t>>(writer, schema, std::move(path_in_schema), |
557 | 0 | can_have_nulls); |
558 | 0 | case LogicalTypeId::USMALLINT: |
559 | 0 | return make_uniq<StandardColumnWriter<uint16_t, int32_t>>(writer, schema, std::move(path_in_schema), |
560 | 0 | can_have_nulls); |
561 | 0 | case LogicalTypeId::UINTEGER: |
562 | 0 | return make_uniq<StandardColumnWriter<uint32_t, uint32_t>>(writer, schema, std::move(path_in_schema), |
563 | 0 | can_have_nulls); |
564 | 0 | case LogicalTypeId::UBIGINT: |
565 | 0 | return make_uniq<StandardColumnWriter<uint64_t, uint64_t>>(writer, schema, std::move(path_in_schema), |
566 | 0 | can_have_nulls); |
567 | 0 | case LogicalTypeId::FLOAT: |
568 | 0 | return make_uniq<StandardColumnWriter<float_na_equal, float, FloatingPointOperator>>( |
569 | 0 | writer, schema, std::move(path_in_schema), can_have_nulls); |
570 | 0 | case LogicalTypeId::DOUBLE: |
571 | 0 | return make_uniq<StandardColumnWriter<double_na_equal, double, FloatingPointOperator>>( |
572 | 0 | writer, schema, std::move(path_in_schema), can_have_nulls); |
573 | 0 | case LogicalTypeId::DECIMAL: |
574 | 0 | switch (type.InternalType()) { |
575 | 0 | case PhysicalType::INT16: |
576 | 0 | return make_uniq<StandardColumnWriter<int16_t, int32_t>>(writer, schema, std::move(path_in_schema), |
577 | 0 | can_have_nulls); |
578 | 0 | case PhysicalType::INT32: |
579 | 0 | return make_uniq<StandardColumnWriter<int32_t, int32_t>>(writer, schema, std::move(path_in_schema), |
580 | 0 | can_have_nulls); |
581 | 0 | case PhysicalType::INT64: |
582 | 0 | return make_uniq<StandardColumnWriter<int64_t, int64_t>>(writer, schema, std::move(path_in_schema), |
583 | 0 | can_have_nulls); |
584 | 0 | default: |
585 | 0 | return make_uniq<FixedDecimalColumnWriter>(writer, schema, std::move(path_in_schema), can_have_nulls); |
586 | 0 | } |
587 | 0 | case LogicalTypeId::BLOB: |
588 | 0 | return make_uniq<StandardColumnWriter<string_t, string_t, ParquetBlobOperator>>( |
589 | 0 | writer, schema, std::move(path_in_schema), can_have_nulls); |
590 | 0 | case LogicalTypeId::GEOMETRY: |
591 | 0 | return make_uniq<StandardColumnWriter<string_t, string_t, ParquetGeometryOperator>>( |
592 | 0 | writer, schema, std::move(path_in_schema), can_have_nulls); |
593 | 0 | case LogicalTypeId::VARCHAR: |
594 | 0 | return make_uniq<StandardColumnWriter<string_t, string_t, ParquetStringOperator>>( |
595 | 0 | writer, schema, std::move(path_in_schema), can_have_nulls); |
596 | 0 | case LogicalTypeId::UUID: |
597 | 0 | return make_uniq<StandardColumnWriter<hugeint_t, ParquetUUIDTargetType, ParquetUUIDOperator>>( |
598 | 0 | writer, schema, std::move(path_in_schema), can_have_nulls); |
599 | 0 | case LogicalTypeId::INTERVAL: |
600 | 0 | return make_uniq<StandardColumnWriter<interval_t, ParquetIntervalTargetType, ParquetIntervalOperator>>( |
601 | 0 | writer, schema, std::move(path_in_schema), can_have_nulls); |
602 | 0 | case LogicalTypeId::ENUM: |
603 | 0 | return make_uniq<EnumColumnWriter>(writer, schema, std::move(path_in_schema), can_have_nulls); |
604 | 0 | default: |
605 | 0 | throw InternalException("Unsupported type \"%s\" in Parquet writer", type.ToString()); |
606 | 0 | } |
607 | 0 | } |
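//===--------------------------------------------------------------------===//
// Editor's sketch (not part of the source): schema construction and writer
// construction are two passes over the same column tree. Assuming a live
// ClientContext and ParquetWriter obtained elsewhere, and assuming the root
// of a nullable column starts at max_repeat = 0 / max_define = 1, wiring up
// a single INTEGER column might look like the fragment below. NB: in real
// use the ParquetColumnSchema must outlive the returned writer, since the
// writer may hold a reference to it.
//===--------------------------------------------------------------------===//
static unique_ptr<ColumnWriter> BuildSingleColumnSketch(ClientContext &context, ParquetWriter &writer) {
	vector<duckdb_parquet::SchemaElement> schemas;
	// pass 1: append Thrift schema elements and collect per-column metadata
	auto column_schema =
	    ColumnWriter::FillParquetSchema(schemas, LogicalType::INTEGER, "i", /* allow_geometry = */ false,
	                                    /* field_ids = */ nullptr, /* shredding_types = */ nullptr,
	                                    /* max_repeat = */ 0, /* max_define = */ 1, /* can_have_nulls = */ true);
	// pass 2: build the matching writer tree from the recorded schema indices
	return ColumnWriter::CreateWriterRecursive(context, writer, schemas, column_schema, {});
}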
608 | | |
609 | | template <> |
610 | | struct NumericLimits<float_na_equal> { |
611 | 0 | static constexpr float Minimum() { |
612 | 0 | return std::numeric_limits<float>::lowest(); |
613 | 0 | }; |
614 | 0 | static constexpr float Maximum() { |
615 | 0 | return std::numeric_limits<float>::max(); |
616 | 0 | }; |
617 | 0 | static constexpr bool IsSigned() { |
618 | 0 | return std::is_signed<float>::value; |
619 | 0 | } |
620 | 0 | static constexpr bool IsIntegral() { |
621 | 0 | return std::is_integral<float>::value; |
622 | 0 | } |
623 | | }; |
624 | | |
625 | | template <> |
626 | | struct NumericLimits<double_na_equal> { |
627 | 0 | static constexpr double Minimum() { |
628 | 0 | return std::numeric_limits<double>::lowest(); |
629 | 0 | }; |
630 | 0 | static constexpr double Maximum() { |
631 | 0 | return std::numeric_limits<double>::max(); |
632 | 0 | }; |
633 | 0 | static constexpr bool IsSigned() { |
634 | 0 | return std::is_signed<double>::value; |
635 | 0 | } |
636 | 0 | static constexpr bool IsIntegral() { |
637 | 0 | return std::is_integral<double>::value; |
638 | 0 | } |
639 | | }; |
640 | | |
641 | | template <> |
642 | 0 | hash_t Hash(ParquetIntervalTargetType val) { |
643 | 0 | return Hash(const_char_ptr_cast(val.bytes), ParquetIntervalTargetType::PARQUET_INTERVAL_SIZE); |
644 | 0 | } |
645 | | |
646 | | template <> |
647 | 0 | hash_t Hash(ParquetUUIDTargetType val) { |
648 | 0 | return Hash(const_char_ptr_cast(val.bytes), ParquetUUIDTargetType::PARQUET_UUID_SIZE); |
649 | 0 | } |
650 | | |
651 | | template <> |
652 | 0 | hash_t Hash(float_na_equal val) { |
653 | 0 | if (std::isnan(val.val)) { |
654 | 0 | return Hash<float>(std::numeric_limits<float>::quiet_NaN()); |
655 | 0 | } |
656 | 0 | return Hash<float>(val.val); |
657 | 0 | } |
658 | | |
659 | | template <> |
660 | 0 | hash_t Hash(double_na_equal val) { |
661 | 0 | if (std::isnan(val.val)) { |
662 | 0 | return Hash<double>(std::numeric_limits<double>::quiet_NaN()); |
663 | 0 | } |
664 | 0 | return Hash<double>(val.val); |
665 | 0 | } |
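//===--------------------------------------------------------------------===//
// Editor's note (sketch, not part of the source): NaN compares unequal to
// itself and has many bit patterns, so the two specializations above
// canonicalize every NaN to a single quiet NaN before hashing -- otherwise
// dictionary encoding would treat identical-looking NaNs as distinct keys.
// Standalone illustration of the rule:
//===--------------------------------------------------------------------===//
static hash_t HashDoubleCanonicalNaN(double d) {
	if (std::isnan(d)) {
		d = std::numeric_limits<double>::quiet_NaN(); // collapse all NaNs to one
	}
	return Hash<double>(d);
}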
666 | | |
667 | | } // namespace duckdb |