Coverage Report

Created: 2025-09-05 08:05

/src/duckdb/extension/parquet/reader/variant_column_reader.cpp
Line
Count
Source (jump to first uncovered line)
1
#include "reader/variant_column_reader.hpp"
2
#include "reader/variant/variant_binary_decoder.hpp"
3
#include "reader/variant/variant_shredded_conversion.hpp"
4
5
namespace duckdb {
6
7
//===--------------------------------------------------------------------===//
8
// Variant Column Reader
9
//===--------------------------------------------------------------------===//
10
//! Constructs a Variant column reader on top of the given child readers.
//! The first two child readers must be named 'metadata' and 'value' (in either order);
//! their positions are stored in metadata_reader_idx and value_reader_idx.
//! Throws an InternalException if the first two children carry any other pair of names.
VariantColumnReader::VariantColumnReader(ClientContext &context, ParquetReader &reader,
                                         const ParquetColumnSchema &schema,
                                         vector<unique_ptr<ColumnReader>> child_readers_p)
    : ColumnReader(reader, schema), context(context), child_readers(std::move(child_readers_p)) {
  D_ASSERT(Type().InternalType() == PhysicalType::VARCHAR);

  // Names of the first two children; both are needed to decide on either ordering
  const auto &first_name = child_readers[0]->Schema().name;
  const auto &second_name = child_readers[1]->Schema().name;

  const bool metadata_first = first_name == "metadata" && second_name == "value";
  const bool value_first = second_name == "metadata" && first_name == "value";
  if (!metadata_first && !value_first) {
    throw InternalException("The Variant column must have 'metadata' and 'value' as the first two columns");
  }

  // Exactly one of the two orderings holds at this point
  metadata_reader_idx = metadata_first ? 0 : 1;
  value_reader_idx = metadata_first ? 1 : 0;
}
26
27
0
//! Returns the child reader stored at position child_idx.
//! Throws an InternalException when no reader is set at that position.
ColumnReader &VariantColumnReader::GetChildReader(idx_t child_idx) {
  auto &reader_slot = child_readers[child_idx];
  if (reader_slot) {
    return *reader_slot;
  }
  throw InternalException("VariantColumnReader::GetChildReader(%d) - but this child reader is not set",
                          child_idx);
}
34
35
void VariantColumnReader::InitializeRead(idx_t row_group_idx_p, const vector<ColumnChunk> &columns,
36
0
                                         TProtocol &protocol_p) {
37
0
  for (auto &child : child_readers) {
38
0
    if (!child) {
39
0
      continue;
40
0
    }
41
0
    child->InitializeRead(row_group_idx_p, columns, protocol_p);
42
0
  }
43
0
}
44
45
0
//! Builds the STRUCT type of the intermediate group vector:
//! a BLOB field named "value", followed by a "typed_value" field that takes
//! the type of the typed_value reader - only when such a reader is provided.
static LogicalType GetIntermediateGroupType(optional_ptr<ColumnReader> typed_value) {
  child_list_t<LogicalType> group_fields;
  group_fields.emplace_back("value", LogicalType::BLOB);
  if (typed_value) {
    group_fields.emplace_back("typed_value", typed_value->Type());
  }
  return LogicalType::STRUCT(std::move(group_fields));
}
53
54
0
//! Reads num_values rows of the Variant column and writes every row to "result" as a JSON string.
//!
//! The 'metadata' and 'value' children (and the third child, if there is one, as 'typed_value')
//! are read into intermediate vectors, converted through VariantShreddedConversion::Convert and
//! serialized with yyjson. Rows whose converted value is null are marked invalid in "result".
//!
//! Returns the number of values reported by the 'value' child reader.
//! Throws an InternalException when skips are pending, and an InvalidInputException when the
//! child readers report different value counts or when yyjson fails to serialize a value.
idx_t VariantColumnReader::Read(uint64_t num_values, data_ptr_t define_out, data_ptr_t repeat_out, Vector &result) {
  if (pending_skips > 0) {
    throw InternalException("VariantColumnReader cannot have pending skips");
  }

  // A third child reader, when present, is used as the 'typed_value' reader
  optional_ptr<ColumnReader> typed_value_reader;
  if (child_readers.size() == 3) {
    typed_value_reader = child_readers[2].get();
  }

  // The child readers may leave "define_out" untouched when all of their values are valid,
  // so mark every entry as valid up front
  std::fill_n(define_out, num_values, MaxDefine());

  // Intermediate storage: the metadata blobs, and a struct holding 'value' (and 'typed_value')
  Vector metadata_vec(LogicalType::BLOB, num_values);
  Vector group_vec(GetIntermediateGroupType(typed_value_reader), num_values);
  auto &group_children = StructVector::GetEntries(group_vec);
  auto &value_vec = *group_children[0];

  auto metadata_values = child_readers[metadata_reader_idx]->Read(num_values, define_out, repeat_out, metadata_vec);
  auto value_values = child_readers[value_reader_idx]->Read(num_values, define_out, repeat_out, value_vec);

  D_ASSERT(child_readers[metadata_reader_idx]->Schema().name == "metadata");
  D_ASSERT(child_readers[value_reader_idx]->Schema().name == "value");

  if (metadata_values != value_values) {
    throw InvalidInputException(
        "The Variant column did not contain the same amount of values for 'metadata' and 'value'");
  }

  auto result_data = FlatVector::GetData<string_t>(result);
  auto &result_validity = FlatVector::Validity(result);

  if (typed_value_reader) {
    auto &typed_value_vec = *group_children[1];
    auto typed_values = typed_value_reader->Read(num_values, define_out, repeat_out, typed_value_vec);
    if (typed_values != value_values) {
      throw InvalidInputException(
          "The shredded Variant column did not contain the same amount of values for 'typed_value' and 'value'");
    }
  }

  vector<VariantValue> conversion_result =
      VariantShreddedConversion::Convert(metadata_vec, group_vec, 0, num_values, num_values);

  const idx_t converted_count = conversion_result.size();
  for (idx_t row_idx = 0; row_idx < converted_count; row_idx++) {
    auto &variant = conversion_result[row_idx];
    if (variant.IsNull()) {
      result_validity.SetInvalid(row_idx);
      continue;
    }

    //! Serialize the variant to a JSON string
    // NOTE(review): presumably VariantDecodeResult releases 'doc' and 'data' when it goes out of scope - confirm
    VariantDecodeResult decode_result;
    decode_result.doc = yyjson_mut_doc_new(nullptr);
    auto json_val = variant.ToJSON(context, decode_result.doc);

    size_t json_length;
    decode_result.data =
        yyjson_mut_val_write_opts(json_val, YYJSON_WRITE_ALLOW_INF_AND_NAN, nullptr, &json_length, nullptr);
    if (!decode_result.data) {
      throw InvalidInputException("Could not serialize the JSON to string, yyjson failed");
    }
    result_data[row_idx] = StringVector::AddString(result, decode_result.data, static_cast<idx_t>(json_length));
  }

  optional_idx read_count(value_values);
  return read_count.GetIndex();
}
121
122
0
//! Forwards the skip of num_values to every child reader that is set.
void VariantColumnReader::Skip(idx_t num_values) {
  for (auto &reader : child_readers) {
    if (reader) {
      reader->Skip(num_values);
    }
  }
}
130
131
0
//! Forwards RegisterPrefetch to every child reader that is set.
void VariantColumnReader::RegisterPrefetch(ThriftFileTransport &transport, bool allow_merge) {
  for (auto &reader : child_readers) {
    if (reader) {
      reader->RegisterPrefetch(transport, allow_merge);
    }
  }
}
139
140
0
uint64_t VariantColumnReader::TotalCompressedSize() {
141
0
  uint64_t size = 0;
142
0
  for (auto &child : child_readers) {
143
0
    if (!child) {
144
0
      continue;
145
0
    }
146
0
    size += child->TotalCompressedSize();
147
0
  }
148
0
  return size;
149
0
}
150
151
0
//! Returns GroupRowsAvailable() of the first child reader that is set.
//! Throws an InternalException when none of the child readers is set.
idx_t VariantColumnReader::GroupRowsAvailable() {
  for (auto &reader : child_readers) {
    if (reader) {
      // Only the first reader that is set is consulted
      return reader->GroupRowsAvailable();
    }
  }
  // The message names this reader rather than "struct", in line with the other errors in this file
  throw InternalException("VariantColumnReader::GroupRowsAvailable - no child reader is set");
}
160
161
} // namespace duckdb