Coverage Report

Created: 2025-11-15 07:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/duckdb/extension/parquet/column_writer.cpp
Line
Count
Source
1
#include "column_writer.hpp"
2
3
#include "duckdb.hpp"
4
#include "parquet_geometry.hpp"
5
#include "parquet_rle_bp_decoder.hpp"
6
#include "parquet_bss_encoder.hpp"
7
#include "parquet_statistics.hpp"
8
#include "parquet_writer.hpp"
9
#include "writer/array_column_writer.hpp"
10
#include "writer/boolean_column_writer.hpp"
11
#include "writer/decimal_column_writer.hpp"
12
#include "writer/enum_column_writer.hpp"
13
#include "writer/list_column_writer.hpp"
14
#include "writer/primitive_column_writer.hpp"
15
#include "writer/struct_column_writer.hpp"
16
#include "writer/variant_column_writer.hpp"
17
#include "writer/templated_column_writer.hpp"
18
#include "duckdb/common/exception.hpp"
19
#include "duckdb/common/operator/comparison_operators.hpp"
20
#include "duckdb/common/serializer/buffered_file_writer.hpp"
21
#include "duckdb/common/serializer/memory_stream.hpp"
22
#include "duckdb/common/serializer/write_stream.hpp"
23
#include "duckdb/common/string_map_set.hpp"
24
#include "duckdb/common/types/hugeint.hpp"
25
#include "duckdb/common/types/time.hpp"
26
#include "duckdb/common/types/timestamp.hpp"
27
#include "duckdb/execution/expression_executor.hpp"
28
29
#include "brotli/encode.h"
30
#include "lz4.hpp"
31
#include "miniz_wrapper.hpp"
32
#include "snappy.h"
33
#include "zstd.h"
34
35
#include <cmath>
36
37
namespace duckdb {
38
39
using namespace duckdb_parquet; // NOLINT
40
using namespace duckdb_miniz;   // NOLINT
41
42
using duckdb_parquet::CompressionCodec;
43
using duckdb_parquet::ConvertedType;
44
using duckdb_parquet::Encoding;
45
using duckdb_parquet::FieldRepetitionType;
46
using duckdb_parquet::FileMetaData;
47
using duckdb_parquet::PageHeader;
48
using duckdb_parquet::PageType;
49
using ParquetRowGroup = duckdb_parquet::RowGroup;
50
using duckdb_parquet::Type;
51
52
constexpr uint16_t ColumnWriter::PARQUET_DEFINE_VALID;
53
54
//===--------------------------------------------------------------------===//
55
// ColumnWriterStatistics
56
//===--------------------------------------------------------------------===//
57
0
ColumnWriterStatistics::~ColumnWriterStatistics() {
58
0
}
59
60
0
bool ColumnWriterStatistics::HasStats() {
61
0
  return false;
62
0
}
63
64
0
string ColumnWriterStatistics::GetMin() {
65
0
  return string();
66
0
}
67
68
0
string ColumnWriterStatistics::GetMax() {
69
0
  return string();
70
0
}
71
72
0
string ColumnWriterStatistics::GetMinValue() {
73
0
  return string();
74
0
}
75
76
0
string ColumnWriterStatistics::GetMaxValue() {
77
0
  return string();
78
0
}
79
80
0
bool ColumnWriterStatistics::CanHaveNaN() {
81
0
  return false;
82
0
}
83
84
0
bool ColumnWriterStatistics::HasNaN() {
85
0
  return false;
86
0
}
87
88
0
bool ColumnWriterStatistics::MinIsExact() {
89
0
  return true;
90
0
}
91
92
0
bool ColumnWriterStatistics::MaxIsExact() {
93
0
  return true;
94
0
}
95
96
0
bool ColumnWriterStatistics::HasGeoStats() {
97
0
  return false;
98
0
}
99
100
0
optional_ptr<GeometryStatsData> ColumnWriterStatistics::GetGeoStats() {
101
0
  return nullptr;
102
0
}
103
104
0
void ColumnWriterStatistics::WriteGeoStats(duckdb_parquet::GeospatialStatistics &stats) {
105
0
  D_ASSERT(false); // this should never be called
106
0
}
107
108
//===--------------------------------------------------------------------===//
109
// ColumnWriter
110
//===--------------------------------------------------------------------===//
111
//! Constructs a column writer.
//! \param writer          the Parquet writer this column writer is bound to (stored by reference)
//! \param column_schema   the schema description of the column (stored as given)
//! \param schema_path_p   the path of the column within the schema; moved into the writer
//! \param can_have_nulls  whether the column is allowed to contain NULL values
ColumnWriter::ColumnWriter(ParquetWriter &writer, const ParquetColumnSchema &column_schema,
                           vector<string> schema_path_p, bool can_have_nulls)
    : writer(writer),
      column_schema(column_schema),
      schema_path(std::move(schema_path_p)),
      can_have_nulls(can_have_nulls) {
}
116
0
//! Out-of-line destructor; performs no work beyond destroying the members.
ColumnWriter::~ColumnWriter() = default;
118
119
0
//! Out-of-line destructor; performs no work beyond destroying the members.
ColumnWriterState::~ColumnWriterState() = default;
121
122
void ColumnWriter::CompressPage(MemoryStream &temp_writer, size_t &compressed_size, data_ptr_t &compressed_data,
123
0
                                AllocatedData &compressed_buf) {
124
0
  switch (writer.GetCodec()) {
125
0
  case CompressionCodec::UNCOMPRESSED:
126
0
    compressed_size = temp_writer.GetPosition();
127
0
    compressed_data = temp_writer.GetData();
128
0
    break;
129
130
0
  case CompressionCodec::SNAPPY: {
131
0
    compressed_size = duckdb_snappy::MaxCompressedLength(temp_writer.GetPosition());
132
0
    compressed_buf = BufferAllocator::Get(writer.GetContext()).Allocate(compressed_size);
133
0
    duckdb_snappy::RawCompress(const_char_ptr_cast(temp_writer.GetData()), temp_writer.GetPosition(),
134
0
                               char_ptr_cast(compressed_buf.get()), &compressed_size);
135
0
    compressed_data = compressed_buf.get();
136
0
    D_ASSERT(compressed_size <= duckdb_snappy::MaxCompressedLength(temp_writer.GetPosition()));
137
0
    break;
138
0
  }
139
0
  case CompressionCodec::LZ4_RAW: {
140
0
    compressed_size = duckdb_lz4::LZ4_compressBound(UnsafeNumericCast<int32_t>(temp_writer.GetPosition()));
141
0
    compressed_buf = BufferAllocator::Get(writer.GetContext()).Allocate(compressed_size);
142
0
    compressed_size = duckdb_lz4::LZ4_compress_default(
143
0
        const_char_ptr_cast(temp_writer.GetData()), char_ptr_cast(compressed_buf.get()),
144
0
        UnsafeNumericCast<int32_t>(temp_writer.GetPosition()), UnsafeNumericCast<int32_t>(compressed_size));
145
0
    compressed_data = compressed_buf.get();
146
0
    break;
147
0
  }
148
0
  case CompressionCodec::GZIP: {
149
0
    MiniZStream s;
150
0
    compressed_size = s.MaxCompressedLength(temp_writer.GetPosition());
151
0
    compressed_buf = BufferAllocator::Get(writer.GetContext()).Allocate(compressed_size);
152
0
    s.Compress(const_char_ptr_cast(temp_writer.GetData()), temp_writer.GetPosition(),
153
0
               char_ptr_cast(compressed_buf.get()), &compressed_size);
154
0
    compressed_data = compressed_buf.get();
155
0
    break;
156
0
  }
157
0
  case CompressionCodec::ZSTD: {
158
0
    compressed_size = duckdb_zstd::ZSTD_compressBound(temp_writer.GetPosition());
159
0
    compressed_buf = BufferAllocator::Get(writer.GetContext()).Allocate(compressed_size);
160
0
    compressed_size = duckdb_zstd::ZSTD_compress((void *)compressed_buf.get(), compressed_size,
161
0
                                                 (const void *)temp_writer.GetData(), temp_writer.GetPosition(),
162
0
                                                 UnsafeNumericCast<int32_t>(writer.CompressionLevel()));
163
0
    compressed_data = compressed_buf.get();
164
0
    break;
165
0
  }
166
0
  case CompressionCodec::BROTLI: {
167
0
    compressed_size = duckdb_brotli::BrotliEncoderMaxCompressedSize(temp_writer.GetPosition());
168
0
    compressed_buf = BufferAllocator::Get(writer.GetContext()).Allocate(compressed_size);
169
0
    duckdb_brotli::BrotliEncoderCompress(BROTLI_DEFAULT_QUALITY, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE,
170
0
                                         temp_writer.GetPosition(), temp_writer.GetData(), &compressed_size,
171
0
                                         compressed_buf.get());
172
0
    compressed_data = compressed_buf.get();
173
0
    break;
174
0
  }
175
0
  default:
176
0
    throw InternalException("Unsupported codec for Parquet Writer");
177
0
  }
178
179
0
  if (compressed_size > idx_t(NumericLimits<int32_t>::Maximum())) {
180
0
    throw InternalException("Parquet writer: %d compressed page size out of range for type integer",
181
0
                            temp_writer.GetPosition());
182
0
  }
183
0
}
184
185
0
//! Brings the repetition levels of `state` up to the length of the parent's repetition levels by copying
//! the parent's trailing entries. Does nothing when there is no parent or when `state` already holds at
//! least as many levels as the parent. `count` is not used by this implementation.
void ColumnWriter::HandleRepeatLevels(ColumnWriterState &state, ColumnWriterState *parent, idx_t count) const {
  // without a parent node there are no repeat levels to inherit
  if (parent == nullptr) {
    return;
  }
  auto &own_levels = state.repetition_levels;
  auto &parent_levels = parent->repetition_levels;
  const auto already_present = own_levels.size();
  if (already_present >= parent_levels.size()) {
    return;
  }
  // append only the part of the parent's levels that we have not copied yet
  own_levels.insert(own_levels.end(), parent_levels.begin() + already_present, parent_levels.end());
}
197
198
//! Appends definition levels for this column to `state.definition_levels`.
//!
//! With a parent: levels are appended until `state` has as many as the parent. Entries where the parent's
//! level is not PARQUET_DEFINE_VALID are copied from the parent (and counted in `parent_null_count`);
//! otherwise the level is `define_value` or `null_value` depending on `validity`.
//! Without a parent: `count` levels are appended based on `validity` alone.
//!
//! Throws IOException when a NULL is produced for a column that cannot have NULLs.
void ColumnWriter::HandleDefineLevels(ColumnWriterState &state, ColumnWriterState *parent, const ValidityMask &validity,
                                      const idx_t count, const uint16_t define_value, const uint16_t null_value) const {
  auto &levels = state.definition_levels;

  if (!parent) {
    // no parent: set definition levels only from this validity mask
    if (validity.AllValid()) {
      levels.insert(levels.end(), count, define_value);
    } else {
      for (idx_t row = 0; row < count; row++) {
        if (validity.RowIsValid(row)) {
          levels.emplace_back(define_value);
        } else {
          levels.emplace_back(null_value);
          state.null_count++;
        }
      }
    }
    if (state.null_count != 0 && !can_have_nulls) {
      throw IOException("Parquet writer: map key column is not allowed to contain NULL values");
    }
    return;
  }

  // parent node: inherit definition level from the parent
  auto &parent_levels = parent->definition_levels;
  auto &parent_is_empty = parent->is_empty;
  idx_t validity_row = 0;
  // every iteration appends exactly one level, so `position` always equals levels.size() at the loop head
  for (idx_t position = levels.size(); position < parent_levels.size(); position++) {
    const auto parent_level = parent_levels[position];
    if (parent_level != PARQUET_DEFINE_VALID) {
      //! Inherit nulls from parent
      levels.push_back(parent_level);
      state.parent_null_count++;
    } else if (!validity.RowIsValid(validity_row)) {
      //! Produce a null define
      if (!can_have_nulls) {
        throw IOException("Parquet writer: map key column is not allowed to contain NULL values");
      }
      state.null_count++;
      levels.push_back(null_value);
    } else {
      //! Produce a non-null define
      levels.push_back(define_value);
    }
    D_ASSERT(parent_is_empty.empty() || position < parent_is_empty.size());
    // only advance in the validity mask when the parent entry is not flagged as empty
    const bool parent_entry_is_empty = !parent_is_empty.empty() && parent_is_empty[position];
    if (!parent_entry_is_empty) {
      validity_row++;
    }
  }
}
242
243
//===--------------------------------------------------------------------===//
244
// Create Column Writer
245
//===--------------------------------------------------------------------===//
246
247
ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::SchemaElement> &schemas,
248
                                                    const LogicalType &type, const string &name, bool allow_geometry,
249
                                                    optional_ptr<const ChildFieldIDs> field_ids,
250
                                                    optional_ptr<const ShreddingType> shredding_types, idx_t max_repeat,
251
0
                                                    idx_t max_define, bool can_have_nulls) {
252
0
  auto null_type = can_have_nulls ? FieldRepetitionType::OPTIONAL : FieldRepetitionType::REQUIRED;
253
0
  if (!can_have_nulls) {
254
0
    max_define--;
255
0
  }
256
0
  idx_t schema_idx = schemas.size();
257
258
0
  optional_ptr<const FieldID> field_id;
259
0
  optional_ptr<const ChildFieldIDs> child_field_ids;
260
0
  if (field_ids) {
261
0
    auto field_id_it = field_ids->ids->find(name);
262
0
    if (field_id_it != field_ids->ids->end()) {
263
0
      field_id = &field_id_it->second;
264
0
      child_field_ids = &field_id->child_field_ids;
265
0
    }
266
0
  }
267
0
  optional_ptr<const ShreddingType> shredding_type;
268
0
  if (shredding_types) {
269
0
    shredding_type = shredding_types->GetChild(name);
270
0
  }
271
272
0
  if (type.id() == LogicalTypeId::STRUCT && type.GetAlias() == "PARQUET_VARIANT") {
273
    // variant type
274
    // variants are stored as follows:
275
    // group <name> VARIANT {
276
    //  metadata BYTE_ARRAY,
277
    //  value BYTE_ARRAY,
278
    //  [<typed_value>]
279
    // }
280
281
0
    const bool is_shredded = shredding_type != nullptr;
282
283
0
    child_list_t<LogicalType> child_types;
284
0
    child_types.emplace_back("metadata", LogicalType::BLOB);
285
0
    child_types.emplace_back("value", LogicalType::BLOB);
286
0
    if (is_shredded) {
287
0
      auto &typed_value_type = shredding_type->type;
288
0
      if (typed_value_type.id() != LogicalTypeId::ANY) {
289
0
        child_types.emplace_back("typed_value",
290
0
                                 VariantColumnWriter::TransformTypedValueRecursive(typed_value_type));
291
0
      }
292
0
    }
293
294
    // variant group
295
0
    duckdb_parquet::SchemaElement top_element;
296
0
    top_element.repetition_type = null_type;
297
0
    top_element.num_children = child_types.size();
298
0
    top_element.logicalType.__isset.VARIANT = true;
299
0
    top_element.logicalType.VARIANT.__isset.specification_version = true;
300
0
    top_element.logicalType.VARIANT.specification_version = 1;
301
0
    top_element.__isset.logicalType = true;
302
0
    top_element.__isset.num_children = true;
303
0
    top_element.__isset.repetition_type = true;
304
0
    top_element.name = name;
305
0
    schemas.push_back(std::move(top_element));
306
307
0
    ParquetColumnSchema variant_column(name, type, max_define, max_repeat, schema_idx, 0);
308
0
    variant_column.children.reserve(child_types.size());
309
0
    for (auto &child_type : child_types) {
310
0
      auto &child_name = child_type.first;
311
0
      bool is_optional;
312
0
      if (child_name == "metadata") {
313
0
        is_optional = false;
314
0
      } else if (child_name == "value") {
315
0
        if (is_shredded) {
316
          //! When shredding the variant, the 'value' becomes optional
317
0
          is_optional = true;
318
0
        } else {
319
0
          is_optional = false;
320
0
        }
321
0
      } else {
322
0
        D_ASSERT(child_name == "typed_value");
323
0
        is_optional = true;
324
0
      }
325
0
      variant_column.children.emplace_back(FillParquetSchema(schemas, child_type.second, child_type.first,
326
0
                                                             allow_geometry, child_field_ids, shredding_type,
327
0
                                                             max_repeat, max_define + 1, is_optional));
328
0
    }
329
0
    return variant_column;
330
0
  }
331
332
0
  if (type.id() == LogicalTypeId::STRUCT || type.id() == LogicalTypeId::UNION) {
333
0
    auto &child_types = StructType::GetChildTypes(type);
334
    // set up the schema element for this struct
335
0
    duckdb_parquet::SchemaElement schema_element;
336
0
    schema_element.repetition_type = null_type;
337
0
    schema_element.num_children = UnsafeNumericCast<int32_t>(child_types.size());
338
0
    schema_element.__isset.num_children = true;
339
0
    schema_element.__isset.type = false;
340
0
    schema_element.__isset.repetition_type = true;
341
0
    schema_element.name = name;
342
0
    if (field_id && field_id->set) {
343
0
      schema_element.__isset.field_id = true;
344
0
      schema_element.field_id = field_id->field_id;
345
0
    }
346
0
    schemas.push_back(std::move(schema_element));
347
348
0
    ParquetColumnSchema struct_column(name, type, max_define, max_repeat, schema_idx, 0);
349
    // construct the child schemas recursively
350
0
    struct_column.children.reserve(child_types.size());
351
0
    for (auto &child_type : child_types) {
352
0
      struct_column.children.emplace_back(FillParquetSchema(schemas, child_type.second, child_type.first,
353
0
                                                            allow_geometry, child_field_ids, shredding_type,
354
0
                                                            max_repeat, max_define + 1, true));
355
0
    }
356
0
    return struct_column;
357
0
  }
358
0
  if (type.id() == LogicalTypeId::LIST || type.id() == LogicalTypeId::ARRAY) {
359
0
    auto is_list = type.id() == LogicalTypeId::LIST;
360
0
    auto &child_type = is_list ? ListType::GetChildType(type) : ArrayType::GetChildType(type);
361
    // set up the two schema elements for the list
362
    // for some reason we only set the converted type in the OPTIONAL element
363
    // first an OPTIONAL element
364
0
    duckdb_parquet::SchemaElement optional_element;
365
0
    optional_element.repetition_type = null_type;
366
0
    optional_element.num_children = 1;
367
0
    optional_element.converted_type = ConvertedType::LIST;
368
0
    optional_element.__isset.num_children = true;
369
0
    optional_element.__isset.type = false;
370
0
    optional_element.__isset.repetition_type = true;
371
0
    optional_element.__isset.converted_type = true;
372
0
    optional_element.name = name;
373
0
    if (field_id && field_id->set) {
374
0
      optional_element.__isset.field_id = true;
375
0
      optional_element.field_id = field_id->field_id;
376
0
    }
377
0
    schemas.push_back(std::move(optional_element));
378
379
    // then a REPEATED element
380
0
    duckdb_parquet::SchemaElement repeated_element;
381
0
    repeated_element.repetition_type = FieldRepetitionType::REPEATED;
382
0
    repeated_element.num_children = 1;
383
0
    repeated_element.__isset.num_children = true;
384
0
    repeated_element.__isset.type = false;
385
0
    repeated_element.__isset.repetition_type = true;
386
0
    repeated_element.name = "list";
387
0
    schemas.push_back(std::move(repeated_element));
388
389
0
    ParquetColumnSchema list_column(name, type, max_define, max_repeat, schema_idx, 0);
390
0
    list_column.children.push_back(FillParquetSchema(schemas, child_type, "element", allow_geometry,
391
0
                                                     child_field_ids, shredding_type, max_repeat + 1,
392
0
                                                     max_define + 2, true));
393
0
    return list_column;
394
0
  }
395
0
  if (type.id() == LogicalTypeId::MAP) {
396
    // map type
397
    // maps are stored as follows:
398
    // <map-repetition> group <name> (MAP) {
399
    //  repeated group key_value {
400
    //    required <key-type> key;
401
    //    <value-repetition> <value-type> value;
402
    //  }
403
    // }
404
    // top map element
405
0
    duckdb_parquet::SchemaElement top_element;
406
0
    top_element.repetition_type = null_type;
407
0
    top_element.num_children = 1;
408
0
    top_element.converted_type = ConvertedType::MAP;
409
0
    top_element.__isset.repetition_type = true;
410
0
    top_element.__isset.num_children = true;
411
0
    top_element.__isset.converted_type = true;
412
0
    top_element.__isset.type = false;
413
0
    top_element.name = name;
414
0
    if (field_id && field_id->set) {
415
0
      top_element.__isset.field_id = true;
416
0
      top_element.field_id = field_id->field_id;
417
0
    }
418
0
    schemas.push_back(std::move(top_element));
419
420
    // key_value element
421
0
    duckdb_parquet::SchemaElement kv_element;
422
0
    kv_element.repetition_type = FieldRepetitionType::REPEATED;
423
0
    kv_element.num_children = 2;
424
0
    kv_element.__isset.repetition_type = true;
425
0
    kv_element.__isset.num_children = true;
426
0
    kv_element.__isset.type = false;
427
0
    kv_element.name = "key_value";
428
0
    schemas.push_back(std::move(kv_element));
429
430
    // construct the child types recursively
431
0
    vector<LogicalType> kv_types {MapType::KeyType(type), MapType::ValueType(type)};
432
0
    vector<string> kv_names {"key", "value"};
433
434
0
    ParquetColumnSchema map_column(name, type, max_define, max_repeat, schema_idx, 0);
435
0
    map_column.children.reserve(2);
436
0
    for (idx_t i = 0; i < 2; i++) {
437
      // key needs to be marked as REQUIRED
438
0
      bool is_key = i == 0;
439
0
      auto child_schema = FillParquetSchema(schemas, kv_types[i], kv_names[i], allow_geometry, child_field_ids,
440
0
                                            shredding_type, max_repeat + 1, max_define + 2, !is_key);
441
442
0
      map_column.children.push_back(std::move(child_schema));
443
0
    }
444
0
    return map_column;
445
0
  }
446
447
0
  duckdb_parquet::SchemaElement schema_element;
448
0
  schema_element.type = ParquetWriter::DuckDBTypeToParquetType(type);
449
0
  schema_element.repetition_type = null_type;
450
0
  schema_element.__isset.num_children = false;
451
0
  schema_element.__isset.type = true;
452
0
  schema_element.__isset.repetition_type = true;
453
0
  schema_element.name = name;
454
0
  if (field_id && field_id->set) {
455
0
    schema_element.__isset.field_id = true;
456
0
    schema_element.field_id = field_id->field_id;
457
0
  }
458
0
  ParquetWriter::SetSchemaProperties(type, schema_element, allow_geometry);
459
0
  schemas.push_back(std::move(schema_element));
460
0
  return ParquetColumnSchema(name, type, max_define, max_repeat, schema_idx, 0);
461
0
}
462
463
//! Builds the ColumnWriter (and, for nested types, the tree of child writers) for the given column schema.
//!
//! \param context          client context, passed through to the recursive calls
//! \param writer           the Parquet writer the created column writers are bound to
//! \param parquet_schemas  flat list of schema elements; used to determine whether the column is OPTIONAL
//! \param schema           schema of the column to create a writer for
//! \param path_in_schema   path of the parent within the schema; this column's name is appended to it
//!
//! Throws InternalException for logical types that have no writer.
unique_ptr<ColumnWriter>
ColumnWriter::CreateWriterRecursive(ClientContext &context, ParquetWriter &writer,
                                    const vector<duckdb_parquet::SchemaElement> &parquet_schemas,
                                    const ParquetColumnSchema &schema, vector<string> path_in_schema) {
  const auto &type = schema.type;
  const auto type_id = type.id();
  const bool can_have_nulls =
      parquet_schemas[schema.schema_index].repetition_type == FieldRepetitionType::OPTIONAL;
  path_in_schema.push_back(schema.name);

  // variants are STRUCTs with a dedicated alias: they have to be recognized before the generic struct case
  if (type_id == LogicalTypeId::STRUCT && type.GetAlias() == "PARQUET_VARIANT") {
    vector<unique_ptr<ColumnWriter>> variant_children;
    variant_children.reserve(schema.children.size());
    for (auto &child_schema : schema.children) {
      variant_children.push_back(
          CreateWriterRecursive(context, writer, parquet_schemas, child_schema, path_in_schema));
    }
    return make_uniq<VariantColumnWriter>(writer, schema, path_in_schema, std::move(variant_children),
                                          can_have_nulls);
  }

  if (type_id == LogicalTypeId::STRUCT || type_id == LogicalTypeId::UNION) {
    // one child writer per struct member
    vector<unique_ptr<ColumnWriter>> member_writers;
    member_writers.reserve(schema.children.size());
    for (idx_t child_idx = 0; child_idx < schema.children.size(); child_idx++) {
      member_writers.push_back(
          CreateWriterRecursive(context, writer, parquet_schemas, schema.children[child_idx], path_in_schema));
    }
    return make_uniq<StructColumnWriter>(writer, schema, std::move(path_in_schema), std::move(member_writers),
                                         can_have_nulls);
  }

  if (type_id == LogicalTypeId::LIST || type_id == LogicalTypeId::ARRAY) {
    // the element lives below the intermediate "list" group
    path_in_schema.push_back("list");
    auto element_writer =
        CreateWriterRecursive(context, writer, parquet_schemas, schema.children[0], path_in_schema);
    if (type_id != LogicalTypeId::LIST) {
      return make_uniq<ArrayColumnWriter>(writer, schema, std::move(path_in_schema), std::move(element_writer),
                                          can_have_nulls);
    }
    return make_uniq<ListColumnWriter>(writer, schema, std::move(path_in_schema), std::move(element_writer),
                                       can_have_nulls);
  }

  if (type_id == LogicalTypeId::MAP) {
    // key and value live below the intermediate "key_value" group
    path_in_schema.push_back("key_value");
    vector<unique_ptr<ColumnWriter>> key_value_writers;
    key_value_writers.reserve(2);
    for (idx_t kv_idx = 0; kv_idx < 2; kv_idx++) {
      // key needs to be marked as REQUIRED
      key_value_writers.push_back(
          CreateWriterRecursive(context, writer, parquet_schemas, schema.children[kv_idx], path_in_schema));
    }
    // a map is written as a list of key/value structs
    auto key_value_struct = make_uniq<StructColumnWriter>(writer, schema, path_in_schema,
                                                          std::move(key_value_writers), can_have_nulls);
    return make_uniq<ListColumnWriter>(writer, schema, path_in_schema, std::move(key_value_struct),
                                       can_have_nulls);
  }

  // leaf types
  switch (type_id) {
  case LogicalTypeId::BOOLEAN:
    return make_uniq<BooleanColumnWriter>(writer, schema, std::move(path_in_schema), can_have_nulls);

  // signed integers
  case LogicalTypeId::TINYINT:
    return make_uniq<StandardColumnWriter<int8_t, int32_t>>(writer, schema, std::move(path_in_schema),
                                                            can_have_nulls);
  case LogicalTypeId::SMALLINT:
    return make_uniq<StandardColumnWriter<int16_t, int32_t>>(writer, schema, std::move(path_in_schema),
                                                             can_have_nulls);
  case LogicalTypeId::INTEGER:
  case LogicalTypeId::DATE:
    return make_uniq<StandardColumnWriter<int32_t, int32_t>>(writer, schema, std::move(path_in_schema),
                                                             can_have_nulls);
  case LogicalTypeId::BIGINT:
  case LogicalTypeId::TIME:
  case LogicalTypeId::TIMESTAMP:
  case LogicalTypeId::TIMESTAMP_TZ:
  case LogicalTypeId::TIMESTAMP_MS:
    return make_uniq<StandardColumnWriter<int64_t, int64_t>>(writer, schema, std::move(path_in_schema),
                                                             can_have_nulls);

  // types that need a conversion operator
  case LogicalTypeId::TIME_TZ:
    return make_uniq<StandardColumnWriter<dtime_tz_t, int64_t, ParquetTimeTZOperator>>(
        writer, schema, std::move(path_in_schema), can_have_nulls);
  case LogicalTypeId::HUGEINT:
    return make_uniq<StandardColumnWriter<hugeint_t, double, ParquetHugeintOperator>>(
        writer, schema, std::move(path_in_schema), can_have_nulls);
  case LogicalTypeId::UHUGEINT:
    return make_uniq<StandardColumnWriter<uhugeint_t, double, ParquetUhugeintOperator>>(
        writer, schema, std::move(path_in_schema), can_have_nulls);
  case LogicalTypeId::TIMESTAMP_NS:
    return make_uniq<StandardColumnWriter<int64_t, int64_t, ParquetTimestampNSOperator>>(
        writer, schema, std::move(path_in_schema), can_have_nulls);
  case LogicalTypeId::TIMESTAMP_SEC:
    return make_uniq<StandardColumnWriter<int64_t, int64_t, ParquetTimestampSOperator>>(
        writer, schema, std::move(path_in_schema), can_have_nulls);

  // unsigned integers
  case LogicalTypeId::UTINYINT:
    return make_uniq<StandardColumnWriter<uint8_t, int32_t>>(writer, schema, std::move(path_in_schema),
                                                             can_have_nulls);
  case LogicalTypeId::USMALLINT:
    return make_uniq<StandardColumnWriter<uint16_t, int32_t>>(writer, schema, std::move(path_in_schema),
                                                              can_have_nulls);
  case LogicalTypeId::UINTEGER:
    return make_uniq<StandardColumnWriter<uint32_t, uint32_t>>(writer, schema, std::move(path_in_schema),
                                                               can_have_nulls);
  case LogicalTypeId::UBIGINT:
    return make_uniq<StandardColumnWriter<uint64_t, uint64_t>>(writer, schema, std::move(path_in_schema),
                                                               can_have_nulls);

  // floating point
  case LogicalTypeId::FLOAT:
    return make_uniq<StandardColumnWriter<float_na_equal, float, FloatingPointOperator>>(
        writer, schema, std::move(path_in_schema), can_have_nulls);
  case LogicalTypeId::DOUBLE:
    return make_uniq<StandardColumnWriter<double_na_equal, double, FloatingPointOperator>>(
        writer, schema, std::move(path_in_schema), can_have_nulls);

  // decimals: the writer depends on the physical storage type
  case LogicalTypeId::DECIMAL: {
    const auto physical_type = type.InternalType();
    if (physical_type == PhysicalType::INT16) {
      return make_uniq<StandardColumnWriter<int16_t, int32_t>>(writer, schema, std::move(path_in_schema),
                                                               can_have_nulls);
    }
    if (physical_type == PhysicalType::INT32) {
      return make_uniq<StandardColumnWriter<int32_t, int32_t>>(writer, schema, std::move(path_in_schema),
                                                               can_have_nulls);
    }
    if (physical_type == PhysicalType::INT64) {
      return make_uniq<StandardColumnWriter<int64_t, int64_t>>(writer, schema, std::move(path_in_schema),
                                                               can_have_nulls);
    }
    return make_uniq<FixedDecimalColumnWriter>(writer, schema, std::move(path_in_schema), can_have_nulls);
  }

  // variable-length and fixed-size binary types
  case LogicalTypeId::BLOB:
    return make_uniq<StandardColumnWriter<string_t, string_t, ParquetBlobOperator>>(
        writer, schema, std::move(path_in_schema), can_have_nulls);
  case LogicalTypeId::GEOMETRY:
    return make_uniq<StandardColumnWriter<string_t, string_t, ParquetGeometryOperator>>(
        writer, schema, std::move(path_in_schema), can_have_nulls);
  case LogicalTypeId::VARCHAR:
    return make_uniq<StandardColumnWriter<string_t, string_t, ParquetStringOperator>>(
        writer, schema, std::move(path_in_schema), can_have_nulls);
  case LogicalTypeId::UUID:
    return make_uniq<StandardColumnWriter<hugeint_t, ParquetUUIDTargetType, ParquetUUIDOperator>>(
        writer, schema, std::move(path_in_schema), can_have_nulls);
  case LogicalTypeId::INTERVAL:
    return make_uniq<StandardColumnWriter<interval_t, ParquetIntervalTargetType, ParquetIntervalOperator>>(
        writer, schema, std::move(path_in_schema), can_have_nulls);
  case LogicalTypeId::ENUM:
    return make_uniq<EnumColumnWriter>(writer, schema, std::move(path_in_schema), can_have_nulls);

  default:
    throw InternalException("Unsupported type \"%s\" in Parquet writer", type.ToString());
  }
}
608
609
template <>
610
struct NumericLimits<float_na_equal> {
611
0
  static constexpr float Minimum() {
612
0
    return std::numeric_limits<float>::lowest();
613
0
  };
614
0
  static constexpr float Maximum() {
615
0
    return std::numeric_limits<float>::max();
616
0
  };
617
0
  static constexpr bool IsSigned() {
618
0
    return std::is_signed<float>::value;
619
0
  }
620
0
  static constexpr bool IsIntegral() {
621
0
    return std::is_integral<float>::value;
622
0
  }
623
};
624
625
template <>
626
struct NumericLimits<double_na_equal> {
627
0
  static constexpr double Minimum() {
628
0
    return std::numeric_limits<double>::lowest();
629
0
  };
630
0
  static constexpr double Maximum() {
631
0
    return std::numeric_limits<double>::max();
632
0
  };
633
0
  static constexpr bool IsSigned() {
634
0
    return std::is_signed<double>::value;
635
0
  }
636
0
  static constexpr bool IsIntegral() {
637
0
    return std::is_integral<double>::value;
638
0
  }
639
};
640
641
template <>
642
0
hash_t Hash(ParquetIntervalTargetType val) {
643
0
  return Hash(const_char_ptr_cast(val.bytes), ParquetIntervalTargetType::PARQUET_INTERVAL_SIZE);
644
0
}
645
646
template <>
647
0
hash_t Hash(ParquetUUIDTargetType val) {
648
0
  return Hash(const_char_ptr_cast(val.bytes), ParquetUUIDTargetType::PARQUET_UUID_SIZE);
649
0
}
650
651
template <>
652
0
hash_t Hash(float_na_equal val) {
653
0
  if (std::isnan(val.val)) {
654
0
    return Hash<float>(std::numeric_limits<float>::quiet_NaN());
655
0
  }
656
0
  return Hash<float>(val.val);
657
0
}
658
659
template <>
660
0
hash_t Hash(double_na_equal val) {
661
0
  if (std::isnan(val.val)) {
662
0
    return Hash<double>(std::numeric_limits<double>::quiet_NaN());
663
0
  }
664
0
  return Hash<double>(val.val);
665
0
}
666
667
} // namespace duckdb