Coverage Report

Created: 2025-11-15 07:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/duckdb/extension/parquet/writer/variant/convert_variant.cpp
Line
Count
Source
1
#include "writer/variant_column_writer.hpp"
2
#include "duckdb/common/types/variant.hpp"
3
#include "duckdb/planner/expression/bound_function_expression.hpp"
4
#include "duckdb/function/scalar/variant_utils.hpp"
5
#include "reader/variant/variant_binary_decoder.hpp"
6
#include "parquet_shredding.hpp"
7
#include "duckdb/common/types/decimal.hpp"
8
#include "duckdb/common/types/uuid.hpp"
9
10
namespace duckdb {
11
12
0
//! Return the minimum number of bytes (1..sizeof(idx_t)) needed to store 'value' as an unsigned integer.
//! A value of 0 still occupies a single byte.
static idx_t CalculateByteLength(idx_t value) {
  idx_t byte_count = 1;
  //! Every non-zero byte above the lowest one needs one more byte of storage
  for (idx_t remaining = value >> 8; remaining != 0; remaining >>= 8) {
    byte_count++;
  }
  return byte_count;
}
24
25
0
//! Build the header byte of a Parquet VARIANT 'metadata' blob.
//! Bit layout (LSB first): bits 0-3 'version' = 1, bit 4 'sorted_strings' = 1,
//! bits 6-7 'offset_size_minus_one' = byte_length - 1.
//! NOTE(review): 'sorted_strings' is always set; confirm the key dictionary emitted by the caller is sorted and unique.
static uint8_t EncodeMetadataHeader(idx_t byte_length) {
  D_ASSERT(byte_length <= 4);

  constexpr uint8_t VERSION_BITS = 1;
  constexpr uint8_t SORTED_STRINGS_BIT = static_cast<uint8_t>(1) << 4;
  const auto offset_size_bits = static_cast<uint8_t>((static_cast<uint8_t>(byte_length) - 1) << 6);
  const auto header_byte = static_cast<uint8_t>(VERSION_BITS | SORTED_STRINGS_BIT | offset_size_bits);

#ifdef DEBUG
  auto decoded_header = VariantMetadataHeader::FromHeaderByte(header_byte);
  D_ASSERT(decoded_header.offset_size == byte_length);
#endif

  return header_byte;
}
43
44
0
//! For each of the first 'count' rows of 'variant', serialize the row's key dictionary into a Parquet VARIANT
//! 'metadata' blob, stored as a string_t in the flat vector 'metadata'.
//! Blob layout: [header][dictionary_size][offset_0 .. offset_n][key bytes].
//! 'dictionary_size' (the number of keys) and every offset share the same width ('offset_size'), little-endian.
//! Invalid (NULL) rows get an empty dictionary.
//! Throws InvalidInputException when the dictionary does not fit 4-byte offsets.
static void CreateMetadata(UnifiedVariantVectorData &variant, Vector &metadata, idx_t count) {
  auto &keys = variant.keys;
  auto keys_data = variant.keys_data;

  //! NOTE: the parquet variant is limited to a max dictionary size of NumericLimits<uint32_t>::Maximum()
  //! Whereas we can have NumericLimits<uint32_t>::Maximum() *per* string in DuckDB
  auto metadata_data = FlatVector::GetData<string_t>(metadata);
  for (idx_t row = 0; row < count; row++) {
    uint64_t dictionary_count = 0;
    if (variant.RowIsValid(row)) {
      auto list_entry = keys_data[keys.sel->get_index(row)];
      dictionary_count = list_entry.length;
    }
    idx_t dictionary_size = 0;
    for (idx_t i = 0; i < dictionary_count; i++) {
      auto &key = variant.GetKey(row, i);
      dictionary_size += key.GetSize();
    }
    if (dictionary_size >= NumericLimits<uint32_t>::Maximum()) {
      throw InvalidInputException("The total length of the dictionary exceeds a 4 byte value (uint32_t), failed "
                                  "to export VARIANT to Parquet");
    }
    if (dictionary_count > NumericLimits<uint32_t>::Maximum()) {
      throw InvalidInputException("The number of dictionary entries exceeds a 4 byte value (uint32_t), failed "
                                  "to export VARIANT to Parquet");
    }

    //! 'offset_size' is used both for the 'dictionary_size' field (the number of keys) and for every offset
    //! (the largest offset being the total key length). The key count can need more bytes than the total key
    //! length (e.g. with empty or very short keys), so the width must cover both.
    auto byte_length = CalculateByteLength(MaxValue<idx_t>(dictionary_size, dictionary_count));
    auto total_length = 1 + (byte_length * (dictionary_count + 2)) + dictionary_size;

    metadata_data[row] = StringVector::EmptyString(metadata, total_length);
    auto &metadata_blob = metadata_data[row];
    auto metadata_blob_data = metadata_blob.GetDataWriteable();

    metadata_blob_data[0] = EncodeMetadataHeader(byte_length);
    memcpy(metadata_blob_data + 1, reinterpret_cast<data_ptr_t>(&dictionary_count), byte_length);

    auto offset_ptr = metadata_blob_data + 1 + byte_length;
    auto string_ptr = metadata_blob_data + 1 + byte_length + ((dictionary_count + 1) * byte_length);
    idx_t total_offset = 0;
    for (idx_t i = 0; i < dictionary_count; i++) {
      //! Offset of key 'i' relative to the start of the string section
      memcpy(offset_ptr + (i * byte_length), reinterpret_cast<data_ptr_t>(&total_offset), byte_length);
      auto &key = variant.GetKey(row, i);

      memcpy(string_ptr + total_offset, key.GetData(), key.GetSize());
      total_offset += key.GetSize();
    }
    //! Trailing offset: the end of the last key
    memcpy(offset_ptr + (dictionary_count * byte_length), reinterpret_cast<data_ptr_t>(&total_offset), byte_length);
    D_ASSERT(offset_ptr + ((dictionary_count + 1) * byte_length) == string_ptr);
    D_ASSERT(string_ptr + total_offset == metadata_blob_data + total_length);
    metadata_blob.SetSizeAndFinalize(total_length, total_length);

#ifdef DEBUG
    auto decoded_metadata = VariantMetadata(metadata_blob);
    D_ASSERT(decoded_metadata.strings.size() == dictionary_count);
    for (idx_t i = 0; i < dictionary_count; i++) {
      D_ASSERT(decoded_metadata.strings[i] == variant.GetKey(row, i).GetString());
    }
#endif
  }
}
101
102
namespace {
103
104
0
//! Map the logical type a VARIANT column is shredded on to the set of VARIANT type ids that can be stored in it.
//! ANY yields an empty set; a type without a VARIANT counterpart raises a BinderException.
static unordered_set<VariantLogicalType> GetVariantType(const LogicalType &type) {
  switch (type.id()) {
  case LogicalTypeId::ANY:
    return {};
  //! Nested types
  case LogicalTypeId::STRUCT:
    return {VariantLogicalType::OBJECT};
  case LogicalTypeId::LIST:
    return {VariantLogicalType::ARRAY};
  //! Both VARIANT boolean encodings map onto BOOLEAN
  case LogicalTypeId::BOOLEAN:
    return {VariantLogicalType::BOOL_TRUE, VariantLogicalType::BOOL_FALSE};
  //! Numeric types
  case LogicalTypeId::TINYINT:
    return {VariantLogicalType::INT8};
  case LogicalTypeId::SMALLINT:
    return {VariantLogicalType::INT16};
  case LogicalTypeId::INTEGER:
    return {VariantLogicalType::INT32};
  case LogicalTypeId::BIGINT:
    return {VariantLogicalType::INT64};
  case LogicalTypeId::FLOAT:
    return {VariantLogicalType::FLOAT};
  case LogicalTypeId::DOUBLE:
    return {VariantLogicalType::DOUBLE};
  case LogicalTypeId::DECIMAL:
    return {VariantLogicalType::DECIMAL};
  //! Temporal types
  case LogicalTypeId::DATE:
    return {VariantLogicalType::DATE};
  case LogicalTypeId::TIME:
    return {VariantLogicalType::TIME_MICROS};
  case LogicalTypeId::TIMESTAMP_TZ:
    return {VariantLogicalType::TIMESTAMP_MICROS_TZ};
  case LogicalTypeId::TIMESTAMP:
    return {VariantLogicalType::TIMESTAMP_MICROS};
  case LogicalTypeId::TIMESTAMP_NS:
    return {VariantLogicalType::TIMESTAMP_NANOS};
  //! Binary / text types
  case LogicalTypeId::BLOB:
    return {VariantLogicalType::BLOB};
  case LogicalTypeId::VARCHAR:
    return {VariantLogicalType::VARCHAR};
  case LogicalTypeId::UUID:
    return {VariantLogicalType::UUID};
  default:
    break;
  }
  throw BinderException("Type '%s' can't be translated to a VARIANT type", type.ToString());
}
149
150
struct ShreddingState {
151
public:
152
  explicit ShreddingState(const LogicalType &type, idx_t total_count)
153
0
      : type(type), shredded_sel(total_count), values_index_sel(total_count), result_sel(total_count) {
154
0
    variant_types = GetVariantType(type);
155
0
  }
156
157
public:
158
0
  bool ValueIsShredded(UnifiedVariantVectorData &variant, idx_t row, idx_t values_index) {
159
0
    auto type_id = variant.GetTypeId(row, values_index);
160
0
    if (!variant_types.count(type_id)) {
161
0
      return false;
162
0
    }
163
0
    if (type_id == VariantLogicalType::DECIMAL) {
164
0
      auto physical_type = type.InternalType();
165
0
      auto decimal_data = VariantUtils::DecodeDecimalData(variant, row, values_index);
166
0
      auto decimal_physical_type = decimal_data.GetPhysicalType();
167
0
      return physical_type == decimal_physical_type;
168
0
    }
169
0
    return true;
170
0
  }
171
0
  void SetShredded(idx_t row, idx_t values_index, idx_t result_idx) {
172
0
    shredded_sel[count] = row;
173
0
    values_index_sel[count] = values_index;
174
0
    result_sel[count] = result_idx;
175
0
    count++;
176
0
  }
177
0
  case_insensitive_string_set_t ObjectFields() {
178
0
    D_ASSERT(type.id() == LogicalTypeId::STRUCT);
179
0
    case_insensitive_string_set_t res;
180
0
    auto &child_types = StructType::GetChildTypes(type);
181
0
    for (auto &entry : child_types) {
182
0
      auto &type = entry.first;
183
0
      res.emplace(string_t(type.c_str(), type.size()));
184
0
    }
185
0
    return res;
186
0
  }
187
188
public:
189
  //! The type the field is shredded on
190
  const LogicalType &type;
191
  unordered_set<VariantLogicalType> variant_types;
192
  //! row that is shredded
193
  SelectionVector shredded_sel;
194
  //! 'values_index' of the shredded value
195
  SelectionVector values_index_sel;
196
  //! result row of the shredded value
197
  SelectionVector result_sel;
198
  //! The amount of rows that are shredded on
199
  idx_t count = 0;
200
};
201
202
} // namespace
203
204
//! Return the positions (0 .. child_count-1) of the children of the nested value described by 'nested_data' that
//! belong in the residual 'value'.
//! Without a STRUCT 'shredding_state', every child is kept. Otherwise, object fields whose key matches one of the
//! shredded STRUCT's field names (compared case-insensitively) are left out.
//! File-local like the other helpers here: it takes a type from this file's anonymous namespace, so it cannot be
//! called from any other translation unit.
static vector<idx_t> GetChildIndices(const UnifiedVariantVectorData &variant, idx_t row,
                                     const VariantNestedData &nested_data,
                                     optional_ptr<ShreddingState> shredding_state) {
  vector<idx_t> child_indices;
  child_indices.reserve(nested_data.child_count);
  if (!shredding_state || shredding_state->type.id() != LogicalTypeId::STRUCT) {
    for (idx_t i = 0; i < nested_data.child_count; i++) {
      child_indices.push_back(i);
    }
    return child_indices;
  }
  //! FIXME: The variant spec says that field names should be case-sensitive, not insensitive
  case_insensitive_string_set_t shredded_fields = shredding_state->ObjectFields();

  for (idx_t i = 0; i < nested_data.child_count; i++) {
    auto keys_index = variant.GetKeysIndex(row, i + nested_data.children_idx);
    auto &key = variant.GetKey(row, keys_index);

    if (shredded_fields.count(key)) {
      //! This field is shredded on, omit it from the value
      continue;
    }
    child_indices.push_back(i);
  }
  return child_indices;
}
228
229
static idx_t AnalyzeValueData(const UnifiedVariantVectorData &variant, idx_t row, uint32_t values_index,
230
0
                              vector<uint32_t> &offsets, optional_ptr<ShreddingState> shredding_state) {
231
0
  idx_t total_size = 0;
232
  //! Every value has at least a value header
233
0
  total_size++;
234
235
0
  idx_t offset_size = offsets.size();
236
0
  VariantLogicalType type_id = VariantLogicalType::VARIANT_NULL;
237
0
  if (variant.RowIsValid(row)) {
238
0
    type_id = variant.GetTypeId(row, values_index);
239
0
  }
240
0
  switch (type_id) {
241
0
  case VariantLogicalType::OBJECT: {
242
0
    auto nested_data = VariantUtils::DecodeNestedData(variant, row, values_index);
243
244
    //! Calculate value and key offsets for all children
245
0
    idx_t total_offset = 0;
246
0
    uint32_t highest_keys_index = 0;
247
248
0
    auto child_indices = GetChildIndices(variant, row, nested_data, shredding_state);
249
0
    if (nested_data.child_count && child_indices.empty()) {
250
      //! All fields of the object are shredded, omit the object entirely
251
0
      return 0;
252
0
    }
253
254
0
    auto num_elements = child_indices.size();
255
0
    offsets.resize(offset_size + num_elements + 1);
256
257
0
    for (idx_t entry = 0; entry < child_indices.size(); entry++) {
258
0
      auto i = child_indices[entry];
259
0
      auto keys_index = variant.GetKeysIndex(row, i + nested_data.children_idx);
260
0
      auto values_index = variant.GetValuesIndex(row, i + nested_data.children_idx);
261
0
      offsets[offset_size + entry] = total_offset;
262
263
0
      total_offset += AnalyzeValueData(variant, row, values_index, offsets, nullptr);
264
0
      highest_keys_index = MaxValue(highest_keys_index, keys_index);
265
0
    }
266
0
    offsets[offset_size + num_elements] = total_offset;
267
268
    //! Calculate the sizes for the objects value data
269
0
    auto field_id_size = CalculateByteLength(highest_keys_index);
270
0
    auto field_offset_size = CalculateByteLength(total_offset);
271
0
    const bool is_large = num_elements > NumericLimits<uint8_t>::Maximum();
272
273
    //! Now add the sizes for the objects value data
274
0
    if (is_large) {
275
0
      total_size += sizeof(uint32_t);
276
0
    } else {
277
0
      total_size += sizeof(uint8_t);
278
0
    }
279
0
    total_size += num_elements * field_id_size;
280
0
    total_size += (num_elements + 1) * field_offset_size;
281
0
    total_size += total_offset;
282
0
    break;
283
0
  }
284
0
  case VariantLogicalType::ARRAY: {
285
0
    auto nested_data = VariantUtils::DecodeNestedData(variant, row, values_index);
286
287
0
    idx_t total_offset = 0;
288
0
    offsets.resize(offset_size + nested_data.child_count + 1);
289
0
    for (idx_t i = 0; i < nested_data.child_count; i++) {
290
0
      auto values_index = variant.GetValuesIndex(row, i + nested_data.children_idx);
291
0
      offsets[offset_size + i] = total_offset;
292
293
0
      total_offset += AnalyzeValueData(variant, row, values_index, offsets, nullptr);
294
0
    }
295
0
    offsets[offset_size + nested_data.child_count] = total_offset;
296
297
0
    auto field_offset_size = CalculateByteLength(total_offset);
298
0
    auto num_elements = nested_data.child_count;
299
0
    const bool is_large = num_elements > NumericLimits<uint8_t>::Maximum();
300
301
0
    if (is_large) {
302
0
      total_size += sizeof(uint32_t);
303
0
    } else {
304
0
      total_size += sizeof(uint8_t);
305
0
    }
306
0
    total_size += (num_elements + 1) * field_offset_size;
307
0
    total_size += total_offset;
308
0
    break;
309
0
  }
310
0
  case VariantLogicalType::BLOB:
311
0
  case VariantLogicalType::VARCHAR: {
312
0
    auto string_value = VariantUtils::DecodeStringData(variant, row, values_index);
313
0
    total_size += string_value.GetSize();
314
0
    if (type_id == VariantLogicalType::BLOB || string_value.GetSize() > 64) {
315
      //! Save as regular string value
316
0
      total_size += sizeof(uint32_t);
317
0
    }
318
0
    break;
319
0
  }
320
0
  case VariantLogicalType::VARIANT_NULL:
321
0
  case VariantLogicalType::BOOL_TRUE:
322
0
  case VariantLogicalType::BOOL_FALSE:
323
0
    break;
324
0
  case VariantLogicalType::INT8:
325
0
    total_size += sizeof(uint8_t);
326
0
    break;
327
0
  case VariantLogicalType::INT16:
328
0
    total_size += sizeof(uint16_t);
329
0
    break;
330
0
  case VariantLogicalType::INT32:
331
0
    total_size += sizeof(uint32_t);
332
0
    break;
333
0
  case VariantLogicalType::INT64:
334
0
    total_size += sizeof(uint64_t);
335
0
    break;
336
0
  case VariantLogicalType::FLOAT:
337
0
    total_size += sizeof(float);
338
0
    break;
339
0
  case VariantLogicalType::DOUBLE:
340
0
    total_size += sizeof(double);
341
0
    break;
342
0
  case VariantLogicalType::DECIMAL: {
343
0
    auto decimal_data = VariantUtils::DecodeDecimalData(variant, row, values_index);
344
0
    total_size += 1;
345
0
    if (decimal_data.width <= 9) {
346
0
      total_size += sizeof(int32_t);
347
0
    } else if (decimal_data.width <= 18) {
348
0
      total_size += sizeof(int64_t);
349
0
    } else if (decimal_data.width <= 38) {
350
0
      total_size += sizeof(uhugeint_t);
351
0
    } else {
352
0
      throw InvalidInputException("Can't convert VARIANT DECIMAL(%d, %d) to Parquet VARIANT", decimal_data.width,
353
0
                                  decimal_data.scale);
354
0
    }
355
0
    break;
356
0
  }
357
0
  case VariantLogicalType::UUID:
358
0
    total_size += sizeof(uhugeint_t);
359
0
    break;
360
0
  case VariantLogicalType::DATE:
361
0
    total_size += sizeof(uint32_t);
362
0
    break;
363
0
  case VariantLogicalType::TIME_MICROS:
364
0
  case VariantLogicalType::TIMESTAMP_MICROS:
365
0
  case VariantLogicalType::TIMESTAMP_NANOS:
366
0
  case VariantLogicalType::TIMESTAMP_MICROS_TZ:
367
0
    total_size += sizeof(uint64_t);
368
0
    break;
369
0
  case VariantLogicalType::INTERVAL:
370
0
  case VariantLogicalType::BIGNUM:
371
0
  case VariantLogicalType::BITSTRING:
372
0
  case VariantLogicalType::TIMESTAMP_MILIS:
373
0
  case VariantLogicalType::TIMESTAMP_SEC:
374
0
  case VariantLogicalType::TIME_MICROS_TZ:
375
0
  case VariantLogicalType::TIME_NANOS:
376
0
  case VariantLogicalType::UINT8:
377
0
  case VariantLogicalType::UINT16:
378
0
  case VariantLogicalType::UINT32:
379
0
  case VariantLogicalType::UINT64:
380
0
  case VariantLogicalType::UINT128:
381
0
  case VariantLogicalType::INT128:
382
0
  default:
383
0
    throw InvalidInputException("Can't convert VARIANT of type '%s' to Parquet VARIANT",
384
0
                                EnumUtil::ToString(type_id));
385
0
  }
386
387
0
  return total_size;
388
0
}
389
390
template <VariantPrimitiveType TYPE_ID>
391
0
void WritePrimitiveTypeHeader(data_ptr_t &value_data) {
392
0
  uint8_t value_header = 0;
393
0
  value_header |= static_cast<uint8_t>(VariantBasicType::PRIMITIVE);
394
0
  value_header |= static_cast<uint8_t>(TYPE_ID) << 2;
395
396
0
  *value_data = value_header;
397
0
  value_data++;
398
0
}
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)15>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)16>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)0>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)1>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)2>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)3>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)4>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)5>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)6>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)14>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)7>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)20>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)11>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)17>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)13>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)19>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)12>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)8>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)9>(unsigned char*&)
Unexecuted instantiation: void duckdb::WritePrimitiveTypeHeader<(duckdb::VariantPrimitiveType)10>(unsigned char*&)
399
400
template <class T>
401
void CopySimplePrimitiveData(const UnifiedVariantVectorData &variant, data_ptr_t &value_data, idx_t row,
402
0
                             uint32_t values_index) {
403
0
  auto byte_offset = variant.GetByteOffset(row, values_index);
404
0
  auto data = const_data_ptr_cast(variant.GetData(row).GetData());
405
0
  auto ptr = data + byte_offset;
406
0
  memcpy(value_data, ptr, sizeof(T));
407
0
  value_data += sizeof(T);
408
0
}
Unexecuted instantiation: void duckdb::CopySimplePrimitiveData<signed char>(duckdb::UnifiedVariantVectorData const&, unsigned char*&, unsigned long, unsigned int)
Unexecuted instantiation: void duckdb::CopySimplePrimitiveData<short>(duckdb::UnifiedVariantVectorData const&, unsigned char*&, unsigned long, unsigned int)
Unexecuted instantiation: void duckdb::CopySimplePrimitiveData<int>(duckdb::UnifiedVariantVectorData const&, unsigned char*&, unsigned long, unsigned int)
Unexecuted instantiation: void duckdb::CopySimplePrimitiveData<long>(duckdb::UnifiedVariantVectorData const&, unsigned char*&, unsigned long, unsigned int)
Unexecuted instantiation: void duckdb::CopySimplePrimitiveData<float>(duckdb::UnifiedVariantVectorData const&, unsigned char*&, unsigned long, unsigned int)
Unexecuted instantiation: void duckdb::CopySimplePrimitiveData<double>(duckdb::UnifiedVariantVectorData const&, unsigned char*&, unsigned long, unsigned int)
409
410
0
void CopyUUIDData(const UnifiedVariantVectorData &variant, data_ptr_t &value_data, idx_t row, uint32_t values_index) {
411
0
  auto byte_offset = variant.GetByteOffset(row, values_index);
412
0
  auto data = const_data_ptr_cast(variant.GetData(row).GetData());
413
0
  auto ptr = data + byte_offset;
414
415
0
  auto uuid = Load<uhugeint_t>(ptr);
416
0
  BaseUUID::ToBlob(uuid, value_data);
417
0
  value_data += sizeof(uhugeint_t);
418
0
}
419
420
static void WritePrimitiveValueData(const UnifiedVariantVectorData &variant, idx_t row, uint32_t values_index,
421
0
                                    data_ptr_t &value_data, const vector<uint32_t> &offsets, idx_t &offset_index) {
422
0
  VariantLogicalType type_id = VariantLogicalType::VARIANT_NULL;
423
0
  if (variant.RowIsValid(row)) {
424
0
    type_id = variant.GetTypeId(row, values_index);
425
0
  }
426
427
0
  D_ASSERT(type_id != VariantLogicalType::OBJECT && type_id != VariantLogicalType::ARRAY);
428
0
  switch (type_id) {
429
0
  case VariantLogicalType::BLOB:
430
0
  case VariantLogicalType::VARCHAR: {
431
0
    auto string_value = VariantUtils::DecodeStringData(variant, row, values_index);
432
0
    auto string_size = string_value.GetSize();
433
0
    if (type_id == VariantLogicalType::BLOB || string_size > 64) {
434
0
      if (type_id == VariantLogicalType::BLOB) {
435
0
        WritePrimitiveTypeHeader<VariantPrimitiveType::BINARY>(value_data);
436
0
      } else {
437
0
        WritePrimitiveTypeHeader<VariantPrimitiveType::STRING>(value_data);
438
0
      }
439
0
      Store<uint32_t>(string_size, value_data);
440
0
      value_data += sizeof(uint32_t);
441
0
    } else {
442
0
      uint8_t value_header = 0;
443
0
      value_header |= static_cast<uint8_t>(VariantBasicType::SHORT_STRING);
444
0
      value_header |= static_cast<uint8_t>(string_size) << 2;
445
446
0
      *value_data = value_header;
447
0
      value_data++;
448
0
    }
449
0
    memcpy(value_data, reinterpret_cast<const char *>(string_value.GetData()), string_size);
450
0
    value_data += string_size;
451
0
    break;
452
0
  }
453
0
  case VariantLogicalType::VARIANT_NULL:
454
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::NULL_TYPE>(value_data);
455
0
    break;
456
0
  case VariantLogicalType::BOOL_TRUE:
457
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::BOOLEAN_TRUE>(value_data);
458
0
    break;
459
0
  case VariantLogicalType::BOOL_FALSE:
460
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::BOOLEAN_FALSE>(value_data);
461
0
    break;
462
0
  case VariantLogicalType::INT8:
463
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::INT8>(value_data);
464
0
    CopySimplePrimitiveData<int8_t>(variant, value_data, row, values_index);
465
0
    break;
466
0
  case VariantLogicalType::INT16:
467
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::INT16>(value_data);
468
0
    CopySimplePrimitiveData<int16_t>(variant, value_data, row, values_index);
469
0
    break;
470
0
  case VariantLogicalType::INT32:
471
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::INT32>(value_data);
472
0
    CopySimplePrimitiveData<int32_t>(variant, value_data, row, values_index);
473
0
    break;
474
0
  case VariantLogicalType::INT64:
475
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::INT64>(value_data);
476
0
    CopySimplePrimitiveData<int64_t>(variant, value_data, row, values_index);
477
0
    break;
478
0
  case VariantLogicalType::FLOAT:
479
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::FLOAT>(value_data);
480
0
    CopySimplePrimitiveData<float>(variant, value_data, row, values_index);
481
0
    break;
482
0
  case VariantLogicalType::DOUBLE:
483
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::DOUBLE>(value_data);
484
0
    CopySimplePrimitiveData<double>(variant, value_data, row, values_index);
485
0
    break;
486
0
  case VariantLogicalType::UUID:
487
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::UUID>(value_data);
488
0
    CopyUUIDData(variant, value_data, row, values_index);
489
0
    break;
490
0
  case VariantLogicalType::DATE:
491
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::DATE>(value_data);
492
0
    CopySimplePrimitiveData<int32_t>(variant, value_data, row, values_index);
493
0
    break;
494
0
  case VariantLogicalType::TIME_MICROS:
495
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::TIME_NTZ_MICROS>(value_data);
496
0
    CopySimplePrimitiveData<int64_t>(variant, value_data, row, values_index);
497
0
    break;
498
0
  case VariantLogicalType::TIMESTAMP_MICROS:
499
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::TIMESTAMP_NTZ_MICROS>(value_data);
500
0
    CopySimplePrimitiveData<int64_t>(variant, value_data, row, values_index);
501
0
    break;
502
0
  case VariantLogicalType::TIMESTAMP_NANOS:
503
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::TIMESTAMP_NTZ_NANOS>(value_data);
504
0
    CopySimplePrimitiveData<int64_t>(variant, value_data, row, values_index);
505
0
    break;
506
0
  case VariantLogicalType::TIMESTAMP_MICROS_TZ:
507
0
    WritePrimitiveTypeHeader<VariantPrimitiveType::TIMESTAMP_MICROS>(value_data);
508
0
    CopySimplePrimitiveData<int64_t>(variant, value_data, row, values_index);
509
0
    break;
510
0
  case VariantLogicalType::DECIMAL: {
511
0
    auto decimal_data = VariantUtils::DecodeDecimalData(variant, row, values_index);
512
513
0
    if (decimal_data.width <= 4 || decimal_data.width > 38) {
514
0
      throw InvalidInputException("Can't convert VARIANT DECIMAL(%d, %d) to Parquet VARIANT", decimal_data.width,
515
0
                                  decimal_data.scale);
516
0
    } else if (decimal_data.width <= 9) {
517
0
      WritePrimitiveTypeHeader<VariantPrimitiveType::DECIMAL4>(value_data);
518
0
      Store<int8_t>(decimal_data.scale, value_data);
519
0
      value_data++;
520
0
      memcpy(value_data, decimal_data.value_ptr, sizeof(int32_t));
521
0
      value_data += sizeof(int32_t);
522
0
    } else if (decimal_data.width <= 18) {
523
0
      WritePrimitiveTypeHeader<VariantPrimitiveType::DECIMAL8>(value_data);
524
0
      Store<int8_t>(decimal_data.scale, value_data);
525
0
      value_data++;
526
0
      memcpy(value_data, decimal_data.value_ptr, sizeof(int64_t));
527
0
      value_data += sizeof(int64_t);
528
0
    } else if (decimal_data.width <= 38) {
529
0
      WritePrimitiveTypeHeader<VariantPrimitiveType::DECIMAL16>(value_data);
530
0
      Store<int8_t>(decimal_data.scale, value_data);
531
0
      value_data++;
532
0
      memcpy(value_data, decimal_data.value_ptr, sizeof(hugeint_t));
533
0
      value_data += sizeof(hugeint_t);
534
0
    } else {
535
0
      throw InternalException(
536
0
          "Uncovered VARIANT(DECIMAL) -> Parquet VARIANT conversion for type 'DECIMAL(%d, %d)'",
537
0
          decimal_data.width, decimal_data.scale);
538
0
    }
539
0
    break;
540
0
  }
541
0
  case VariantLogicalType::INTERVAL:
542
0
  case VariantLogicalType::BIGNUM:
543
0
  case VariantLogicalType::BITSTRING:
544
0
  case VariantLogicalType::TIMESTAMP_MILIS:
545
0
  case VariantLogicalType::TIMESTAMP_SEC:
546
0
  case VariantLogicalType::TIME_MICROS_TZ:
547
0
  case VariantLogicalType::TIME_NANOS:
548
0
  case VariantLogicalType::UINT8:
549
0
  case VariantLogicalType::UINT16:
550
0
  case VariantLogicalType::UINT32:
551
0
  case VariantLogicalType::UINT64:
552
0
  case VariantLogicalType::UINT128:
553
0
  case VariantLogicalType::INT128:
554
0
  default:
555
0
    throw InvalidInputException("Can't convert VARIANT of type '%s' to Parquet VARIANT",
556
0
                                EnumUtil::ToString(type_id));
557
0
  }
558
0
}
559
560
static void WriteValueData(const UnifiedVariantVectorData &variant, idx_t row, uint32_t values_index,
561
                           data_ptr_t &value_data, const vector<uint32_t> &offsets, idx_t &offset_index,
562
0
                           optional_ptr<ShreddingState> shredding_state) {
563
0
  VariantLogicalType type_id = VariantLogicalType::VARIANT_NULL;
564
0
  if (variant.RowIsValid(row)) {
565
0
    type_id = variant.GetTypeId(row, values_index);
566
0
  }
567
0
  if (type_id == VariantLogicalType::OBJECT) {
568
0
    auto nested_data = VariantUtils::DecodeNestedData(variant, row, values_index);
569
570
    //! -- Object value header --
571
572
0
    auto child_indices = GetChildIndices(variant, row, nested_data, shredding_state);
573
0
    if (nested_data.child_count && child_indices.empty()) {
574
0
      throw InternalException(
575
0
          "The entire should be omitted, should have been handled by the Analyze step already");
576
0
    }
577
0
    auto num_elements = child_indices.size();
578
579
    //! Determine the 'field_id_size'
580
0
    uint32_t highest_keys_index = 0;
581
0
    for (auto &i : child_indices) {
582
0
      auto keys_index = variant.GetKeysIndex(row, i + nested_data.children_idx);
583
0
      highest_keys_index = MaxValue(highest_keys_index, keys_index);
584
0
    }
585
0
    auto field_id_size = CalculateByteLength(highest_keys_index);
586
587
0
    uint32_t last_offset = 0;
588
0
    if (num_elements) {
589
0
      last_offset = offsets[offset_index + num_elements];
590
0
    }
591
0
    offset_index += num_elements + 1;
592
0
    auto field_offset_size = CalculateByteLength(last_offset);
593
594
0
    const bool is_large = num_elements > NumericLimits<uint8_t>::Maximum();
595
596
0
    uint8_t value_header = 0;
597
0
    value_header |= static_cast<uint8_t>(VariantBasicType::OBJECT);
598
0
    value_header |= static_cast<uint8_t>(is_large) << 6;
599
0
    value_header |= (static_cast<uint8_t>(field_id_size) - 1) << 4;
600
0
    value_header |= (static_cast<uint8_t>(field_offset_size) - 1) << 2;
601
602
#ifdef DEBUG
603
    auto object_value_header = VariantValueMetadata::FromHeaderByte(value_header);
604
    D_ASSERT(object_value_header.basic_type == VariantBasicType::OBJECT);
605
    D_ASSERT(object_value_header.is_large == is_large);
606
    D_ASSERT(object_value_header.field_offset_size == field_offset_size);
607
    D_ASSERT(object_value_header.field_id_size == field_id_size);
608
#endif
609
610
0
    *value_data = value_header;
611
0
    value_data++;
612
613
    //! Write the 'num_elements'
614
0
    if (is_large) {
615
0
      Store<uint32_t>(static_cast<uint32_t>(num_elements), value_data);
616
0
      value_data += sizeof(uint32_t);
617
0
    } else {
618
0
      Store<uint8_t>(static_cast<uint8_t>(num_elements), value_data);
619
0
      value_data += sizeof(uint8_t);
620
0
    }
621
622
    //! Write the 'field_id' entries
623
0
    for (auto &i : child_indices) {
624
0
      auto keys_index = variant.GetKeysIndex(row, i + nested_data.children_idx);
625
0
      memcpy(value_data, reinterpret_cast<data_ptr_t>(&keys_index), field_id_size);
626
0
      value_data += field_id_size;
627
0
    }
628
629
    //! Write the 'field_offset' entries and the child 'value's
630
0
    auto children_ptr = value_data + ((num_elements + 1) * field_offset_size);
631
0
    idx_t total_offset = 0;
632
0
    for (auto &i : child_indices) {
633
0
      auto values_index = variant.GetValuesIndex(row, i + nested_data.children_idx);
634
635
0
      memcpy(value_data, reinterpret_cast<data_ptr_t>(&total_offset), field_offset_size);
636
0
      value_data += field_offset_size;
637
0
      auto start_ptr = children_ptr;
638
0
      WriteValueData(variant, row, values_index, children_ptr, offsets, offset_index, nullptr);
639
0
      total_offset += (children_ptr - start_ptr);
640
0
    }
641
0
    memcpy(value_data, reinterpret_cast<data_ptr_t>(&total_offset), field_offset_size);
642
0
    value_data += field_offset_size;
643
0
    D_ASSERT(children_ptr - total_offset == value_data);
644
0
    value_data = children_ptr;
645
0
  } else if (type_id == VariantLogicalType::ARRAY) {
646
0
    auto nested_data = VariantUtils::DecodeNestedData(variant, row, values_index);
647
648
    //! -- Array value header --
649
650
0
    uint32_t last_offset = 0;
651
0
    if (nested_data.child_count) {
652
0
      last_offset = offsets[offset_index + nested_data.child_count];
653
0
    }
654
0
    offset_index += nested_data.child_count + 1;
655
0
    auto field_offset_size = CalculateByteLength(last_offset);
656
657
0
    auto num_elements = nested_data.child_count;
658
0
    const bool is_large = num_elements > NumericLimits<uint8_t>::Maximum();
659
660
0
    uint8_t value_header = 0;
661
0
    value_header |= static_cast<uint8_t>(VariantBasicType::ARRAY);
662
0
    value_header |= static_cast<uint8_t>(is_large) << 4;
663
0
    value_header |= (static_cast<uint8_t>(field_offset_size) - 1) << 2;
664
665
#ifdef DEBUG
666
    auto array_value_header = VariantValueMetadata::FromHeaderByte(value_header);
667
    D_ASSERT(array_value_header.basic_type == VariantBasicType::ARRAY);
668
    D_ASSERT(array_value_header.is_large == is_large);
669
    D_ASSERT(array_value_header.field_offset_size == field_offset_size);
670
#endif
671
672
0
    *value_data = value_header;
673
0
    value_data++;
674
675
    //! Write the 'num_elements'
676
0
    if (is_large) {
677
0
      Store<uint32_t>(static_cast<uint32_t>(num_elements), value_data);
678
0
      value_data += sizeof(uint32_t);
679
0
    } else {
680
0
      Store<uint8_t>(static_cast<uint8_t>(num_elements), value_data);
681
0
      value_data += sizeof(uint8_t);
682
0
    }
683
684
    //! Write the 'field_offset' entries and the child 'value's
685
0
    auto children_ptr = value_data + ((num_elements + 1) * field_offset_size);
686
0
    idx_t total_offset = 0;
687
0
    for (idx_t i = 0; i < nested_data.child_count; i++) {
688
0
      auto values_index = variant.GetValuesIndex(row, i + nested_data.children_idx);
689
690
0
      memcpy(value_data, reinterpret_cast<data_ptr_t>(&total_offset), field_offset_size);
691
0
      value_data += field_offset_size;
692
0
      auto start_ptr = children_ptr;
693
0
      WriteValueData(variant, row, values_index, children_ptr, offsets, offset_index, nullptr);
694
0
      total_offset += (children_ptr - start_ptr);
695
0
    }
696
0
    memcpy(value_data, reinterpret_cast<data_ptr_t>(&total_offset), field_offset_size);
697
0
    value_data += field_offset_size;
698
0
    D_ASSERT(children_ptr - total_offset == value_data);
699
0
    value_data = children_ptr;
700
0
  } else {
701
0
    WritePrimitiveValueData(variant, row, values_index, value_data, offsets, offset_index);
702
0
  }
703
0
}
704
705
static void CreateValues(UnifiedVariantVectorData &variant, Vector &value, optional_ptr<const SelectionVector> sel,
706
                         optional_ptr<const SelectionVector> value_index_sel,
707
                         optional_ptr<const SelectionVector> result_sel, optional_ptr<ShreddingState> shredding_state,
708
0
                         idx_t count) {
709
0
  auto &validity = FlatVector::Validity(value);
710
0
  auto value_data = FlatVector::GetData<string_t>(value);
711
712
0
  for (idx_t i = 0; i < count; i++) {
713
0
    idx_t value_index = 0;
714
0
    if (value_index_sel) {
715
0
      value_index = value_index_sel->get_index(i);
716
0
    }
717
718
0
    idx_t row = i;
719
0
    if (sel) {
720
0
      row = sel->get_index(i);
721
0
    }
722
723
0
    idx_t result_index = i;
724
0
    if (result_sel) {
725
0
      result_index = result_sel->get_index(i);
726
0
    }
727
728
0
    bool is_shredded = false;
729
0
    if (variant.RowIsValid(row) && shredding_state && shredding_state->ValueIsShredded(variant, row, value_index)) {
730
0
      shredding_state->SetShredded(row, value_index, result_index);
731
0
      is_shredded = true;
732
0
      if (shredding_state->type.id() != LogicalTypeId::STRUCT) {
733
        //! Value is shredded, directly write a NULL to the 'value' if the type is not an OBJECT
734
        //! When the type is OBJECT, all excess fields would still need to be written to the 'value'
735
0
        validity.SetInvalid(result_index);
736
0
        continue;
737
0
      }
738
0
    }
739
740
    //! The (relative) offsets for each value, in the case of nesting
741
0
    vector<uint32_t> offsets;
742
    //! Determine the size of this 'value' blob
743
0
    idx_t blob_length = AnalyzeValueData(variant, row, value_index, offsets, shredding_state);
744
0
    if (!blob_length) {
745
      //! This is only allowed to happen for a shredded OBJECT, where there are no excess fields to write for the
746
      //! OBJECT
747
0
      (void)is_shredded;
748
0
      D_ASSERT(is_shredded);
749
0
      validity.SetInvalid(result_index);
750
0
      continue;
751
0
    }
752
0
    value_data[result_index] = StringVector::EmptyString(value, blob_length);
753
0
    auto &value_blob = value_data[result_index];
754
0
    auto value_blob_data = reinterpret_cast<data_ptr_t>(value_blob.GetDataWriteable());
755
756
0
    idx_t offset_index = 0;
757
0
    WriteValueData(variant, row, value_index, value_blob_data, offsets, offset_index, shredding_state);
758
0
    D_ASSERT(data_ptr_cast(value_blob.GetDataWriteable() + blob_length) == value_blob_data);
759
0
    value_blob.SetSizeAndFinalize(blob_length, blob_length);
760
0
  }
761
0
}
762
763
//! fwd-declare static method
764
static void WriteVariantValues(UnifiedVariantVectorData &variant, Vector &result,
765
                               optional_ptr<const SelectionVector> sel,
766
                               optional_ptr<const SelectionVector> value_index_sel,
767
                               optional_ptr<const SelectionVector> result_sel, idx_t count);
768
769
static void WriteTypedObjectValues(UnifiedVariantVectorData &variant, Vector &result, const SelectionVector &sel,
770
                                   const SelectionVector &value_index_sel, const SelectionVector &result_sel,
771
0
                                   idx_t count) {
772
0
  auto &type = result.GetType();
773
0
  D_ASSERT(type.id() == LogicalTypeId::STRUCT);
774
775
0
  auto &validity = FlatVector::Validity(result);
776
0
  (void)validity;
777
778
  //! Collect the nested data for the objects
779
0
  auto nested_data = make_unsafe_uniq_array_uninitialized<VariantNestedData>(count);
780
0
  for (idx_t i = 0; i < count; i++) {
781
0
    auto row = sel[i];
782
    //! When we're shredding an object, the top-level struct of it should always be valid
783
0
    D_ASSERT(validity.RowIsValid(result_sel[i]));
784
0
    auto value_index = value_index_sel[i];
785
0
    D_ASSERT(variant.GetTypeId(row, value_index) == VariantLogicalType::OBJECT);
786
0
    nested_data[i] = VariantUtils::DecodeNestedData(variant, row, value_index);
787
0
  }
788
789
0
  auto &shredded_types = StructType::GetChildTypes(type);
790
0
  auto &shredded_fields = StructVector::GetEntries(result);
791
0
  D_ASSERT(shredded_types.size() == shredded_fields.size());
792
793
0
  SelectionVector child_values_indexes;
794
0
  SelectionVector child_row_sel;
795
0
  SelectionVector child_result_sel;
796
0
  child_values_indexes.Initialize(count);
797
0
  child_row_sel.Initialize(count);
798
0
  child_result_sel.Initialize(count);
799
800
0
  for (idx_t child_idx = 0; child_idx < shredded_types.size(); child_idx++) {
801
0
    auto &child_vec = *shredded_fields[child_idx];
802
0
    D_ASSERT(child_vec.GetType() == shredded_types[child_idx].second);
803
804
    //! Prepare the path component to perform the lookup for
805
0
    auto &key = shredded_types[child_idx].first;
806
0
    VariantPathComponent path_component;
807
0
    path_component.lookup_mode = VariantChildLookupMode::BY_KEY;
808
0
    path_component.key = key;
809
810
0
    ValidityMask lookup_validity(count);
811
0
    VariantUtils::FindChildValues(variant, path_component, sel, child_values_indexes, lookup_validity,
812
0
                                  nested_data.get(), count);
813
814
0
    if (!lookup_validity.AllValid()) {
815
0
      auto &child_variant_vectors = StructVector::GetEntries(child_vec);
816
817
      //! For some of the rows the field is missing, adjust the selection vector to exclude these rows.
818
0
      idx_t child_count = 0;
819
0
      for (idx_t i = 0; i < count; i++) {
820
0
        if (!lookup_validity.RowIsValid(i)) {
821
          //! The field is missing, set it to null
822
0
          FlatVector::SetNull(*child_variant_vectors[0], result_sel[i], true);
823
0
          if (child_variant_vectors.size() >= 2) {
824
0
            FlatVector::SetNull(*child_variant_vectors[1], result_sel[i], true);
825
0
          }
826
0
          continue;
827
0
        }
828
829
0
        child_row_sel[child_count] = sel[i];
830
0
        child_values_indexes[child_count] = child_values_indexes[i];
831
0
        child_result_sel[child_count] = result_sel[i];
832
0
        child_count++;
833
0
      }
834
835
0
      if (child_count) {
836
        //! If not all rows are missing this field, write the values for it
837
0
        WriteVariantValues(variant, child_vec, child_row_sel, child_values_indexes, child_result_sel,
838
0
                           child_count);
839
0
      }
840
0
    } else {
841
0
      WriteVariantValues(variant, child_vec, &sel, child_values_indexes, result_sel, count);
842
0
    }
843
0
  }
844
0
}
845
846
static void WriteTypedArrayValues(UnifiedVariantVectorData &variant, Vector &result, const SelectionVector &sel,
847
                                  const SelectionVector &value_index_sel, const SelectionVector &result_sel,
848
0
                                  idx_t count) {
849
0
  auto list_data = FlatVector::GetData<list_entry_t>(result);
850
851
0
  auto nested_data = make_unsafe_uniq_array_uninitialized<VariantNestedData>(count);
852
853
0
  idx_t total_offset = 0;
854
0
  for (idx_t i = 0; i < count; i++) {
855
0
    auto row = sel[i];
856
0
    auto value_index = value_index_sel[i];
857
0
    auto result_row = result_sel[i];
858
859
0
    D_ASSERT(variant.GetTypeId(row, value_index) == VariantLogicalType::ARRAY);
860
0
    nested_data[i] = VariantUtils::DecodeNestedData(variant, row, value_index);
861
862
0
    list_entry_t list_entry;
863
0
    list_entry.length = nested_data[i].child_count;
864
0
    list_entry.offset = total_offset;
865
0
    list_data[result_row] = list_entry;
866
867
0
    total_offset += nested_data[i].child_count;
868
0
  }
869
0
  ListVector::Reserve(result, total_offset);
870
0
  ListVector::SetListSize(result, total_offset);
871
872
0
  SelectionVector child_sel;
873
0
  child_sel.Initialize(total_offset);
874
875
0
  SelectionVector child_value_index_sel;
876
0
  child_value_index_sel.Initialize(total_offset);
877
878
0
  SelectionVector child_result_sel;
879
0
  child_result_sel.Initialize(total_offset);
880
881
0
  for (idx_t i = 0; i < count; i++) {
882
0
    auto row = sel[i];
883
0
    auto result_row = result_sel[i];
884
885
0
    auto &array_data = nested_data[i];
886
0
    auto &entry = list_data[result_row];
887
0
    for (idx_t j = 0; j < entry.length; j++) {
888
0
      auto offset = entry.offset + j;
889
0
      child_sel[offset] = row;
890
0
      child_value_index_sel[offset] = variant.GetValuesIndex(row, array_data.children_idx + j);
891
0
      child_result_sel[offset] = offset;
892
0
    }
893
0
  }
894
895
0
  auto &child_vector = ListVector::GetEntry(result);
896
0
  WriteVariantValues(variant, child_vector, child_sel, child_value_index_sel, child_result_sel, total_offset);
897
0
}
898
899
//! TODO: introduce a third selection vector, because we also need one to map to the result row to write
900
//! This becomes necessary when we introduce LISTs into the equation because lists are stored on the same VARIANT row,
901
//! but we're now going to write the flattened child vector
902
static void WriteShreddedPrimitive(UnifiedVariantVectorData &variant, Vector &result, const SelectionVector &sel,
903
                                   const SelectionVector &value_index_sel, const SelectionVector &result_sel,
904
0
                                   idx_t count, idx_t type_size) {
905
0
  auto result_data = FlatVector::GetData(result);
906
0
  for (idx_t i = 0; i < count; i++) {
907
0
    auto row = sel[i];
908
0
    auto result_row = result_sel[i];
909
0
    auto value_index = value_index_sel[i];
910
0
    D_ASSERT(variant.RowIsValid(row));
911
912
0
    auto byte_offset = variant.GetByteOffset(row, value_index);
913
0
    auto &data = variant.GetData(row);
914
0
    auto value_ptr = data.GetData();
915
0
    auto result_offset = type_size * result_row;
916
0
    memcpy(result_data + result_offset, value_ptr + byte_offset, type_size);
917
0
  }
918
0
}
919
920
template <class T>
921
static void WriteShreddedDecimal(UnifiedVariantVectorData &variant, Vector &result, const SelectionVector &sel,
922
                                 const SelectionVector &value_index_sel, const SelectionVector &result_sel,
923
0
                                 idx_t count) {
924
0
  auto result_data = FlatVector::GetData(result);
925
0
  for (idx_t i = 0; i < count; i++) {
926
0
    auto row = sel[i];
927
0
    auto result_row = result_sel[i];
928
0
    auto value_index = value_index_sel[i];
929
0
    D_ASSERT(variant.RowIsValid(row) && variant.GetTypeId(row, value_index) == VariantLogicalType::DECIMAL);
930
931
0
    auto decimal_data = VariantUtils::DecodeDecimalData(variant, row, value_index);
932
0
    D_ASSERT(decimal_data.width <= DecimalWidth<T>::max);
933
0
    auto result_offset = sizeof(T) * result_row;
934
0
    memcpy(result_data + result_offset, decimal_data.value_ptr, sizeof(T));
935
0
  }
936
0
}
Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::WriteShreddedDecimal<int>(duckdb::UnifiedVariantVectorData&, duckdb::Vector&, duckdb::SelectionVector const&, duckdb::SelectionVector const&, duckdb::SelectionVector const&, unsigned long)
Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::WriteShreddedDecimal<long>(duckdb::UnifiedVariantVectorData&, duckdb::Vector&, duckdb::SelectionVector const&, duckdb::SelectionVector const&, duckdb::SelectionVector const&, unsigned long)
Unexecuted instantiation: ub_duckdb_parquet_writer_variant.cpp:void duckdb::WriteShreddedDecimal<duckdb::hugeint_t>(duckdb::UnifiedVariantVectorData&, duckdb::Vector&, duckdb::SelectionVector const&, duckdb::SelectionVector const&, duckdb::SelectionVector const&, unsigned long)
937
938
static void WriteShreddedString(UnifiedVariantVectorData &variant, Vector &result, const SelectionVector &sel,
939
                                const SelectionVector &value_index_sel, const SelectionVector &result_sel,
940
0
                                idx_t count) {
941
0
  auto result_data = FlatVector::GetData<string_t>(result);
942
0
  for (idx_t i = 0; i < count; i++) {
943
0
    auto row = sel[i];
944
0
    auto result_row = result_sel[i];
945
0
    auto value_index = value_index_sel[i];
946
0
    D_ASSERT(variant.RowIsValid(row) && (variant.GetTypeId(row, value_index) == VariantLogicalType::VARCHAR ||
947
0
                                         variant.GetTypeId(row, value_index) == VariantLogicalType::BLOB));
948
949
0
    auto string_data = VariantUtils::DecodeStringData(variant, row, value_index);
950
0
    result_data[result_row] = StringVector::AddStringOrBlob(result, string_data);
951
0
  }
952
0
}
953
954
static void WriteShreddedBoolean(UnifiedVariantVectorData &variant, Vector &result, const SelectionVector &sel,
955
                                 const SelectionVector &value_index_sel, const SelectionVector &result_sel,
956
0
                                 idx_t count) {
957
0
  auto result_data = FlatVector::GetData<bool>(result);
958
0
  for (idx_t i = 0; i < count; i++) {
959
0
    auto row = sel[i];
960
0
    auto result_row = result_sel[i];
961
0
    auto value_index = value_index_sel[i];
962
0
    D_ASSERT(variant.RowIsValid(row));
963
0
    auto type_id = variant.GetTypeId(row, value_index);
964
0
    D_ASSERT(type_id == VariantLogicalType::BOOL_FALSE || type_id == VariantLogicalType::BOOL_TRUE);
965
966
0
    result_data[result_row] = type_id == VariantLogicalType::BOOL_TRUE;
967
0
  }
968
0
}
969
970
static void WriteTypedPrimitiveValues(UnifiedVariantVectorData &variant, Vector &result, const SelectionVector &sel,
971
                                      const SelectionVector &value_index_sel, const SelectionVector &result_sel,
972
0
                                      idx_t count) {
973
0
  auto &type = result.GetType();
974
0
  D_ASSERT(!type.IsNested());
975
0
  switch (type.id()) {
976
0
  case LogicalTypeId::TINYINT:
977
0
  case LogicalTypeId::SMALLINT:
978
0
  case LogicalTypeId::INTEGER:
979
0
  case LogicalTypeId::BIGINT:
980
0
  case LogicalTypeId::FLOAT:
981
0
  case LogicalTypeId::DOUBLE:
982
0
  case LogicalTypeId::DATE:
983
0
  case LogicalTypeId::TIME:
984
0
  case LogicalTypeId::TIMESTAMP_TZ:
985
0
  case LogicalTypeId::TIMESTAMP:
986
0
  case LogicalTypeId::TIMESTAMP_NS:
987
0
  case LogicalTypeId::UUID: {
988
0
    const auto physical_type = type.InternalType();
989
0
    WriteShreddedPrimitive(variant, result, sel, value_index_sel, result_sel, count, GetTypeIdSize(physical_type));
990
0
    break;
991
0
  }
992
0
  case LogicalTypeId::DECIMAL: {
993
0
    const auto physical_type = type.InternalType();
994
0
    switch (physical_type) {
995
    //! DECIMAL4
996
0
    case PhysicalType::INT32:
997
0
      WriteShreddedDecimal<int32_t>(variant, result, sel, value_index_sel, result_sel, count);
998
0
      break;
999
    //! DECIMAL8
1000
0
    case PhysicalType::INT64:
1001
0
      WriteShreddedDecimal<int64_t>(variant, result, sel, value_index_sel, result_sel, count);
1002
0
      break;
1003
    //! DECIMAL16
1004
0
    case PhysicalType::INT128:
1005
0
      WriteShreddedDecimal<hugeint_t>(variant, result, sel, value_index_sel, result_sel, count);
1006
0
      break;
1007
0
    default:
1008
0
      throw InvalidInputException("Can't shred on column of type '%s'", type.ToString());
1009
0
    }
1010
0
    break;
1011
0
  }
1012
0
  case LogicalTypeId::BLOB:
1013
0
  case LogicalTypeId::VARCHAR: {
1014
0
    WriteShreddedString(variant, result, sel, value_index_sel, result_sel, count);
1015
0
    break;
1016
0
  }
1017
0
  case LogicalTypeId::BOOLEAN:
1018
0
    WriteShreddedBoolean(variant, result, sel, value_index_sel, result_sel, count);
1019
0
    break;
1020
0
  default:
1021
0
    throw InvalidInputException("Can't shred on type: %s", type.ToString());
1022
0
  }
1023
0
}
1024
1025
//! Write the shredded 'typed_value' for the selected values, dispatching on the type of 'result':
//! STRUCT holds a shredded OBJECT, LIST a shredded ARRAY, anything else a shredded primitive.
static void WriteTypedValues(UnifiedVariantVectorData &variant, Vector &result, const SelectionVector &sel,
                             const SelectionVector &value_index_sel, const SelectionVector &result_sel, idx_t count) {
  switch (result.GetType().id()) {
  case LogicalTypeId::STRUCT:
    //! Shredded OBJECT
    WriteTypedObjectValues(variant, result, sel, value_index_sel, result_sel, count);
    break;
  case LogicalTypeId::LIST:
    //! Shredded ARRAY
    WriteTypedArrayValues(variant, result, sel, value_index_sel, result_sel, count);
    break;
  default:
    //! Primitive types
    WriteTypedPrimitiveValues(variant, result, sel, value_index_sel, result_sel, count);
    break;
  }
}
1040
1041
static void WriteVariantValues(UnifiedVariantVectorData &variant, Vector &result,
1042
                               optional_ptr<const SelectionVector> sel,
1043
                               optional_ptr<const SelectionVector> value_index_sel,
1044
0
                               optional_ptr<const SelectionVector> result_sel, idx_t count) {
1045
0
  optional_ptr<Vector> value;
1046
0
  optional_ptr<Vector> typed_value;
1047
1048
0
  auto &result_type = result.GetType();
1049
0
  D_ASSERT(result_type.id() == LogicalTypeId::STRUCT);
1050
0
  auto &child_types = StructType::GetChildTypes(result_type);
1051
0
  auto &child_vectors = StructVector::GetEntries(result);
1052
0
  D_ASSERT(child_types.size() == child_vectors.size());
1053
0
  for (idx_t i = 0; i < child_types.size(); i++) {
1054
0
    auto &name = child_types[i].first;
1055
0
    if (name == "value") {
1056
0
      value = child_vectors[i].get();
1057
0
    } else if (name == "typed_value") {
1058
0
      typed_value = child_vectors[i].get();
1059
0
    }
1060
0
  }
1061
1062
0
  if (typed_value) {
1063
0
    ShreddingState shredding_state(typed_value->GetType(), count);
1064
0
    CreateValues(variant, *value, sel, value_index_sel, result_sel, &shredding_state, count);
1065
1066
0
    SelectionVector null_values;
1067
0
    if (shredding_state.count) {
1068
0
      WriteTypedValues(variant, *typed_value, shredding_state.shredded_sel, shredding_state.values_index_sel,
1069
0
                       shredding_state.result_sel, shredding_state.count);
1070
      //! 'shredding_state.result_sel' will always be a subset of 'result_sel', set the rows not in the subset to
1071
      //! NULL
1072
0
      idx_t sel_idx = 0;
1073
0
      for (idx_t i = 0; i < count; i++) {
1074
0
        auto original_index = result_sel ? result_sel->get_index(i) : i;
1075
0
        if (sel_idx < shredding_state.count && shredding_state.result_sel[sel_idx] == original_index) {
1076
0
          sel_idx++;
1077
0
          continue;
1078
0
        }
1079
0
        FlatVector::SetNull(*typed_value, original_index, true);
1080
0
      }
1081
0
    } else {
1082
      //! Set all rows of the typed_value to NULL, nothing is shredded on
1083
0
      for (idx_t i = 0; i < count; i++) {
1084
0
        FlatVector::SetNull(*typed_value, result_sel ? result_sel->get_index(i) : i, true);
1085
0
      }
1086
0
    }
1087
0
  } else {
1088
0
    CreateValues(variant, *value, sel, value_index_sel, result_sel, nullptr, count);
1089
0
  }
1090
0
}
1091
1092
0
//! Scalar function body of 'variant_to_parquet_variant': convert a DuckDB VARIANT into a Parquet VARIANT.
//!
//! DuckDB Variant:
//! - keys = VARCHAR[]
//! - children = STRUCT(keys_index UINTEGER, values_index UINTEGER)[]
//! - values = STRUCT(type_id UTINYINT, byte_offset UINTEGER)[]
//! - data = BLOB
//!
//! Parquet VARIANT:
//! - metadata = BLOB
//! - value = BLOB
//! (plus 'typed_value' when a shredding type was bound)
static void ToParquetVariant(DataChunk &input, ExpressionState &state, Vector &result) {
  const auto row_count = input.size();

  RecursiveUnifiedVectorFormat source_format;
  Vector::RecursiveToUnifiedFormat(input.data[0], row_count, source_format);
  UnifiedVariantVectorData variant(source_format);

  //! The first field of the result STRUCT is 'metadata'; the remaining fields are filled by WriteVariantValues
  auto &metadata = *StructVector::GetEntries(result)[0];
  CreateMetadata(variant, metadata, row_count);
  WriteVariantValues(variant, result, nullptr, nullptr, nullptr, row_count);

  if (input.AllConstant()) {
    result.SetVectorType(VectorType::CONSTANT_VECTOR);
  }
}
1119
1120
0
//! Transform a user-provided shredding type into the type of the 'typed_value' column.
//! - STRUCT: every field becomes STRUCT(value BLOB, typed_value <transformed field type>)
//! - LIST: the element type becomes STRUCT(value BLOB, typed_value <transformed element type>)
//! A field or element of type VARIANT only gets the 'value' field; it is not shredded further.
//! Throws a BinderException for UNION, MAP, VARIANT and ARRAY; every other type is returned unchanged.
LogicalType VariantColumnWriter::TransformTypedValueRecursive(const LogicalType &type) {
  switch (type.id()) {
  case LogicalTypeId::STRUCT: {
    //! Wrap all fields of the struct in a struct with 'value' and 'typed_value' fields
    auto &child_types = StructType::GetChildTypes(type);
    child_list_t<LogicalType> replaced_types;
    for (auto &entry : child_types) {
      child_list_t<LogicalType> child_children;
      child_children.emplace_back("value", LogicalType::BLOB);
      if (entry.second.id() != LogicalTypeId::VARIANT) {
        child_children.emplace_back("typed_value", TransformTypedValueRecursive(entry.second));
      }
      replaced_types.emplace_back(entry.first, LogicalType::STRUCT(child_children));
    }
    return LogicalType::STRUCT(replaced_types);
  }
  case LogicalTypeId::LIST: {
    //! Wrap the element type in a struct with 'value' and 'typed_value' fields
    auto &child_type = ListType::GetChildType(type);
    child_list_t<LogicalType> replaced_types;
    replaced_types.emplace_back("value", LogicalType::BLOB);
    if (child_type.id() != LogicalTypeId::VARIANT) {
      replaced_types.emplace_back("typed_value", TransformTypedValueRecursive(child_type));
    }
    return LogicalType::LIST(LogicalType::STRUCT(replaced_types));
  }
  case LogicalTypeId::UNION:
  case LogicalTypeId::MAP:
  case LogicalTypeId::VARIANT:
  case LogicalTypeId::ARRAY:
    throw BinderException("'%s' can't appear inside a 'typed_value' shredded type!", type.ToString());
  default:
    return type;
  }
}
1154
1155
0
//! Build the Parquet VARIANT type: STRUCT(metadata BLOB, value BLOB), aliased as "PARQUET_VARIANT".
//! When a shredding type is given, a 'typed_value' field holding its transformed type is appended.
static LogicalType GetParquetVariantType(optional_ptr<LogicalType> shredding = nullptr) {
  child_list_t<LogicalType> fields;
  fields.emplace_back("metadata", LogicalType::BLOB);
  fields.emplace_back("value", LogicalType::BLOB);
  if (shredding) {
    auto typed_value_type = VariantColumnWriter::TransformTypedValueRecursive(*shredding);
    fields.emplace_back("typed_value", std::move(typed_value_type));
  }

  auto parquet_variant_type = LogicalType::STRUCT(std::move(fields));
  parquet_variant_type.SetAlias("PARQUET_VARIANT");
  return parquet_variant_type;
}
1166
1167
//! Bind callback of 'variant_to_parquet_variant'; sets the Parquet VARIANT return type.
//! An optional second argument provides the shredding type as a constant, non-NULL VARCHAR
//! (e.g. 'STRUCT(my_field BOOLEAN)'), which adds a 'typed_value' field to the return type.
//! Throws a BinderException when that argument is not a VARCHAR, not constant, or NULL.
static unique_ptr<FunctionData> BindTransform(ClientContext &context, ScalarFunction &bound_function,
                                              vector<unique_ptr<Expression>> &arguments) {
  if (arguments.empty()) {
    return nullptr;
  }

  if (arguments.size() != 2) {
    //! No shredding requested
    bound_function.SetReturnType(GetParquetVariantType());
    return nullptr;
  }

  auto &shredding = *arguments[1];
  auto expr_return_type = ExpressionBinder::GetExpressionReturnType(shredding);
  expr_return_type = LogicalType::NormalizeType(expr_return_type);
  if (expr_return_type.id() != LogicalTypeId::VARCHAR) {
    throw BinderException("Optional second argument 'shredding' has to be of type VARCHAR, i.e: "
                          "'STRUCT(my_field BOOLEAN)', found type: '%s' instead",
                          expr_return_type);
  }
  if (!shredding.IsFoldable()) {
    throw BinderException("Optional second argument 'shredding' has to be a constant expression");
  }
  Value type_str = ExpressionExecutor::EvaluateScalar(context, shredding);
  if (type_str.IsNull()) {
    throw BinderException("Optional second argument 'shredding' can not be NULL");
  }
  auto shredded_type = TransformStringToLogicalType(type_str.GetValue<string>());
  bound_function.SetReturnType(GetParquetVariantType(shredded_type));
  return nullptr;
}
1198
1199
9.40k
//! Create the 'variant_to_parquet_variant' scalar function, which converts a VARIANT into the Parquet VARIANT STRUCT.
//! The declared return type is ANY; the concrete type is set by BindTransform.
ScalarFunction VariantColumnWriter::GetTransformFunction() {
  ScalarFunction variant_to_parquet("variant_to_parquet_variant", {LogicalType::VARIANT()}, LogicalType::ANY,
                                    ToParquetVariant, BindTransform);
  //! SPECIAL_HANDLING: NULL inputs are handled by the function itself
  variant_to_parquet.SetNullHandling(FunctionNullHandling::SPECIAL_HANDLING);
  return variant_to_parquet;
}
1205
1206
} // namespace duckdb