Coverage Report

Created: 2025-08-28 07:58

/src/duckdb/extension/parquet/writer/primitive_column_writer.cpp
Line
Count
Source (jump to first uncovered line)
1
#include "writer/primitive_column_writer.hpp"
2
#include "parquet_rle_bp_decoder.hpp"
3
#include "parquet_rle_bp_encoder.hpp"
4
#include "parquet_writer.hpp"
5
6
namespace duckdb {
7
using duckdb_parquet::Encoding;
8
using duckdb_parquet::PageType;
9
10
// Constructs a primitive (leaf) column writer; all arguments are forwarded to
// the ColumnWriter base. The schema path is moved in to avoid a copy.
PrimitiveColumnWriter::PrimitiveColumnWriter(ParquetWriter &writer, const ParquetColumnSchema &column_schema,
                                             vector<string> schema_path, bool can_have_nulls)
    : ColumnWriter(writer, column_schema, std::move(schema_path), can_have_nulls) {
}
14
15
0
// Creates the per-column write state and registers a fresh column chunk in the
// row group. The state remembers the chunk's index (current end of columns).
unique_ptr<ColumnWriterState> PrimitiveColumnWriter::InitializeWriteState(duckdb_parquet::RowGroup &row_group) {
	auto state = make_uniq<PrimitiveColumnWriterState>(writer, row_group, row_group.columns.size());
	RegisterToRowGroup(row_group);
	return std::move(state);
}
20
21
0
// Appends a new (still empty) ColumnChunk for this column to the row group
// metadata: codec, schema path and physical type are set up front; counters
// start at zero and are filled in as data is written.
void PrimitiveColumnWriter::RegisterToRowGroup(duckdb_parquet::RowGroup &row_group) {
	duckdb_parquet::ColumnChunk chunk;
	chunk.__isset.meta_data = true;
	chunk.meta_data.codec = writer.GetCodec();
	chunk.meta_data.path_in_schema = schema_path;
	chunk.meta_data.num_values = 0;
	chunk.meta_data.type = writer.GetType(SchemaIndex());
	row_group.columns.push_back(std::move(chunk));
}
30
31
// Primitive writers keep no per-page encoder state by default; subclasses
// (e.g. dictionary/DBP encoders) override this to allocate one.
unique_ptr<ColumnWriterPageState> PrimitiveColumnWriter::InitializePageState(PrimitiveColumnWriterState &state,
                                                                             idx_t page_idx) {
	return nullptr;
}
35
36
0
// Default page-state flush: nothing to emit, since the base writer keeps no
// per-page encoder state (see InitializePageState).
void PrimitiveColumnWriter::FlushPageState(WriteStream &temp_writer, ColumnWriterPageState *state) {
}
38
39
void PrimitiveColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterState *parent, Vector &vector, idx_t count,
40
0
                                    bool vector_can_span_multiple_pages) {
41
0
  auto &state = state_p.Cast<PrimitiveColumnWriterState>();
42
0
  auto &col_chunk = state.row_group.columns[state.col_idx];
43
44
0
  idx_t vcount = parent ? parent->definition_levels.size() - state.definition_levels.size() : count;
45
0
  idx_t parent_index = state.definition_levels.size();
46
0
  auto &validity = FlatVector::Validity(vector);
47
0
  HandleRepeatLevels(state, parent, count, MaxRepeat());
48
0
  HandleDefineLevels(state, parent, validity, count, MaxDefine(), MaxDefine() - 1);
49
50
0
  idx_t vector_index = 0;
51
0
  reference<PageInformation> page_info_ref = state.page_info.back();
52
0
  col_chunk.meta_data.num_values += NumericCast<int64_t>(vcount);
53
54
0
  const bool check_parent_empty = parent && !parent->is_empty.empty();
55
0
  if (!check_parent_empty && validity.AllValid() && TypeIsConstantSize(vector.GetType().InternalType()) &&
56
0
      page_info_ref.get().estimated_page_size + GetRowSize(vector, vector_index, state) * vcount <
57
0
          MAX_UNCOMPRESSED_PAGE_SIZE) {
58
    // Fast path: fixed-size type, all valid, and it fits on the current page
59
0
    auto &page_info = page_info_ref.get();
60
0
    page_info.row_count += vcount;
61
0
    page_info.estimated_page_size += GetRowSize(vector, vector_index, state) * vcount;
62
0
  } else {
63
0
    for (idx_t i = 0; i < vcount; i++) {
64
0
      auto &page_info = page_info_ref.get();
65
0
      page_info.row_count++;
66
0
      if (check_parent_empty && parent->is_empty[parent_index + i]) {
67
0
        page_info.empty_count++;
68
0
        continue;
69
0
      }
70
0
      if (validity.RowIsValid(vector_index)) {
71
0
        page_info.estimated_page_size += GetRowSize(vector, vector_index, state);
72
0
        if (page_info.estimated_page_size >= MAX_UNCOMPRESSED_PAGE_SIZE) {
73
0
          if (!vector_can_span_multiple_pages && i != 0) {
74
            // Vector is not allowed to span multiple pages, and we already started writing it
75
0
            continue;
76
0
          }
77
0
          PageInformation new_info;
78
0
          new_info.offset = page_info.offset + page_info.row_count;
79
0
          state.page_info.push_back(new_info);
80
0
          page_info_ref = state.page_info.back();
81
0
        }
82
0
      } else {
83
0
        page_info.null_count++;
84
0
      }
85
0
      vector_index++;
86
0
    }
87
0
  }
88
0
}
89
90
0
// Default data-page encoding for primitive writers; dictionary-capable
// subclasses override this with their own encoding.
duckdb_parquet::Encoding::type PrimitiveColumnWriter::GetEncoding(PrimitiveColumnWriterState &state) {
	return Encoding::PLAIN;
}
93
94
0
// Materializes one PageWriteInformation per page planned by Prepare (header,
// write buffer, counters), then positions the writer at the first page.
void PrimitiveColumnWriter::BeginWrite(ColumnWriterState &state_p) {
	auto &state = state_p.Cast<PrimitiveColumnWriterState>();

	state.stats_state = InitializeStatsState();
	for (idx_t page_idx = 0; page_idx < state.page_info.size(); page_idx++) {
		auto &page_info = state.page_info[page_idx];
		if (page_info.row_count == 0) {
			// only the trailing page can be empty - drop it and stop
			D_ASSERT(page_idx + 1 == state.page_info.size());
			state.page_info.erase_at(page_idx);
			break;
		}
		PageWriteInformation write_info;
		// fill in the data page header; the sizes are patched later in FlushPage
		auto &page_hdr = write_info.page_header;
		page_hdr.compressed_page_size = 0;
		page_hdr.uncompressed_page_size = 0;
		page_hdr.type = PageType::DATA_PAGE;
		page_hdr.__isset.data_page_header = true;

		page_hdr.data_page_header.num_values = UnsafeNumericCast<int32_t>(page_info.row_count);
		page_hdr.data_page_header.encoding = GetEncoding(state);
		page_hdr.data_page_header.definition_level_encoding = Encoding::RLE;
		page_hdr.data_page_header.repetition_level_encoding = Encoding::RLE;

		// size the page buffer from the estimate so it rarely has to grow
		write_info.temp_writer = make_uniq<MemoryStream>(
		    BufferAllocator::Get(writer.GetContext()),
		    MaxValue<idx_t>(NextPowerOfTwo(page_info.estimated_page_size), MemoryStream::DEFAULT_INITIAL_CAPACITY));
		// empty entries count as already "written": they carry no values
		write_info.write_count = page_info.empty_count;
		write_info.max_write_count = page_info.row_count;
		write_info.page_state = InitializePageState(state, page_idx);

		write_info.compressed_size = 0;
		write_info.compressed_data = nullptr;

		state.write_info.push_back(std::move(write_info));
	}

	// start writing the first page
	NextPage(state);
}
135
136
void PrimitiveColumnWriter::WriteLevels(Allocator &allocator, WriteStream &temp_writer,
137
                                        const unsafe_vector<uint16_t> &levels, idx_t max_value, idx_t offset,
138
0
                                        idx_t count, optional_idx null_count) {
139
0
  if (levels.empty() || count == 0) {
140
0
    return;
141
0
  }
142
143
  // write the levels using the RLE-BP encoding
144
0
  const auto bit_width = RleBpDecoder::ComputeBitWidth((max_value));
145
0
  RleBpEncoder rle_encoder(bit_width);
146
147
  // have to write to an intermediate stream first because we need to know the size
148
0
  MemoryStream intermediate_stream(allocator);
149
150
0
  rle_encoder.BeginWrite();
151
0
  if (null_count.IsValid() && null_count.GetIndex() == 0) {
152
    // Fast path: no nulls
153
0
    rle_encoder.WriteMany(intermediate_stream, levels[0], count);
154
0
  } else {
155
0
    for (idx_t i = offset; i < offset + count; i++) {
156
0
      rle_encoder.WriteValue(intermediate_stream, levels[i]);
157
0
    }
158
0
  }
159
0
  rle_encoder.FinishWrite(intermediate_stream);
160
161
  // start off by writing the byte count as a uint32_t
162
0
  temp_writer.Write(NumericCast<uint32_t>(intermediate_stream.GetPosition()));
163
  // copy over the written data
164
0
  temp_writer.WriteData(intermediate_stream.GetData(), intermediate_stream.GetPosition());
165
0
}
166
167
0
// Advances to the next page: flushes the page currently in progress (if any),
// then writes the level sections of the new page into its buffer.
void PrimitiveColumnWriter::NextPage(PrimitiveColumnWriterState &state) {
	if (state.current_page > 0) {
		// a page is in progress - finish and compress it first
		FlushPage(state);
	}
	if (state.current_page >= state.write_info.size()) {
		// no pages left; park current_page past the end so that a later
		// FlushPage call (e.g. from FinalizeWrite) becomes a no-op
		state.current_page = state.write_info.size() + 1;
		return;
	}
	auto &page_info = state.page_info[state.current_page];
	auto &write_info = state.write_info[state.current_page];
	state.current_page++;

	auto &temp_writer = *write_info.temp_writer;

	// each data page starts with the RLE-encoded repetition levels ...
	auto &allocator = BufferAllocator::Get(writer.GetContext());
	WriteLevels(allocator, temp_writer, state.repetition_levels, MaxRepeat(), page_info.offset, page_info.row_count);

	// ... followed by the definition levels (null count enables the fast path)
	WriteLevels(allocator, temp_writer, state.definition_levels, MaxDefine(), page_info.offset, page_info.row_count,
	            state.null_count + state.parent_null_count);
}
190
191
0
// Finalizes the page at current_page - 1: flushes any trailing encoder bytes,
// records the uncompressed size, compresses the buffer, and drops the
// uncompressed copy when compression produced a separate buffer.
void PrimitiveColumnWriter::FlushPage(PrimitiveColumnWriterState &state) {
	D_ASSERT(state.current_page > 0);
	if (state.current_page > state.write_info.size()) {
		// past the last page (sentinel set by NextPage) - nothing left to flush
		return;
	}

	auto &write_info = state.write_info[state.current_page - 1];
	auto &page_buffer = *write_info.temp_writer;
	auto &header = write_info.page_header;

	// give the (optional) page encoder a chance to emit its trailing bytes
	FlushPageState(page_buffer, write_info.page_state.get());

	// now that the data is complete we know the uncompressed size; the Thrift
	// header field is an int32, so guard against overflow
	if (page_buffer.GetPosition() > idx_t(NumericLimits<int32_t>::Maximum())) {
		throw InternalException("Parquet writer: %d uncompressed page size out of range for type integer",
		                        page_buffer.GetPosition());
	}
	header.uncompressed_page_size = UnsafeNumericCast<int32_t>(page_buffer.GetPosition());

	// compress the page contents
	CompressPage(page_buffer, write_info.compressed_size, write_info.compressed_data, write_info.compressed_buf);
	header.compressed_page_size = UnsafeNumericCast<int32_t>(write_info.compressed_size);
	D_ASSERT(header.uncompressed_page_size > 0);
	D_ASSERT(header.compressed_page_size > 0);

	if (write_info.compressed_buf) {
		// compression allocated its own buffer - the uncompressed data is no longer needed
		D_ASSERT(write_info.compressed_buf.get() == write_info.compressed_data);
		write_info.temp_writer.reset();
	}
}
223
224
0
// Default statistics collector: records nothing; typed subclasses override
// this with min/max-aware implementations.
unique_ptr<ColumnWriterStatistics> PrimitiveColumnWriter::InitializeStatsState() {
	return make_uniq<ColumnWriterStatistics>();
}
227
228
// Per-row size estimation must come from a concrete leaf writer; the base
// implementation is only ever reached in error.
idx_t PrimitiveColumnWriter::GetRowSize(const Vector &vector, const idx_t index,
                                        const PrimitiveColumnWriterState &state) const {
	throw InternalException("GetRowSize unsupported for struct/list column writers");
}
232
233
0
// Writes `count` values from `vector`, spreading them over the pages planned by
// Prepare; moves on to the next page whenever the current one reaches its
// planned capacity.
void PrimitiveColumnWriter::Write(ColumnWriterState &state_p, Vector &vector, idx_t count) {
	auto &state = state_p.Cast<PrimitiveColumnWriterState>();

	idx_t remaining = count;
	idx_t offset = 0;
	while (remaining > 0) {
		auto &write_info = state.write_info[state.current_page - 1];
		if (!write_info.temp_writer) {
			throw InternalException("Writes are not correctly aligned!?");
		}
		auto &temp_writer = *write_info.temp_writer;
		// write at most up to the remaining planned capacity of this page
		const idx_t to_write = MinValue<idx_t>(remaining, write_info.max_write_count - write_info.write_count);
		D_ASSERT(to_write > 0);

		WriteVector(temp_writer, state.stats_state.get(), write_info.page_state.get(), vector, offset,
		            offset + to_write);

		write_info.write_count += to_write;
		if (write_info.write_count == write_info.max_write_count) {
			// the page is complete - flush it and move to the next one
			NextPage(state);
		}
		offset += to_write;
		remaining -= to_write;
	}
}
258
259
void PrimitiveColumnWriter::SetParquetStatistics(PrimitiveColumnWriterState &state,
260
0
                                                 duckdb_parquet::ColumnChunk &column_chunk) {
261
0
  if (!state.stats_state) {
262
0
    return;
263
0
  }
264
0
  if (MaxRepeat() == 0) {
265
0
    column_chunk.meta_data.statistics.null_count = NumericCast<int64_t>(state.null_count);
266
0
    column_chunk.meta_data.statistics.__isset.null_count = true;
267
0
    column_chunk.meta_data.__isset.statistics = true;
268
0
  }
269
  // if we have NaN values - don't write the min/max here
270
0
  if (!state.stats_state->HasNaN()) {
271
    // set min/max/min_value/max_value
272
    // this code is not going to win any beauty contests, but well
273
0
    auto min = state.stats_state->GetMin();
274
0
    if (!min.empty()) {
275
0
      column_chunk.meta_data.statistics.min = std::move(min);
276
0
      column_chunk.meta_data.statistics.__isset.min = true;
277
0
      column_chunk.meta_data.__isset.statistics = true;
278
0
    }
279
0
    auto max = state.stats_state->GetMax();
280
0
    if (!max.empty()) {
281
0
      column_chunk.meta_data.statistics.max = std::move(max);
282
0
      column_chunk.meta_data.statistics.__isset.max = true;
283
0
      column_chunk.meta_data.__isset.statistics = true;
284
0
    }
285
286
0
    if (state.stats_state->HasStats()) {
287
0
      column_chunk.meta_data.statistics.min_value = state.stats_state->GetMinValue();
288
0
      column_chunk.meta_data.statistics.__isset.min_value = true;
289
0
      column_chunk.meta_data.__isset.statistics = true;
290
0
      column_chunk.meta_data.statistics.is_min_value_exact = state.stats_state->MinIsExact();
291
0
      column_chunk.meta_data.statistics.__isset.is_min_value_exact = true;
292
293
0
      column_chunk.meta_data.statistics.max_value = state.stats_state->GetMaxValue();
294
0
      column_chunk.meta_data.statistics.__isset.max_value = true;
295
0
      column_chunk.meta_data.__isset.statistics = true;
296
0
      column_chunk.meta_data.statistics.is_max_value_exact = state.stats_state->MaxIsExact();
297
0
      column_chunk.meta_data.statistics.__isset.is_max_value_exact = true;
298
0
    }
299
0
  }
300
0
  if (HasDictionary(state)) {
301
0
    column_chunk.meta_data.statistics.distinct_count = UnsafeNumericCast<int64_t>(DictionarySize(state));
302
0
    column_chunk.meta_data.statistics.__isset.distinct_count = true;
303
0
    column_chunk.meta_data.__isset.statistics = true;
304
0
  }
305
0
  for (const auto &write_info : state.write_info) {
306
    // only care about data page encodings, data_page_header.encoding is meaningless for dict
307
0
    if (write_info.page_header.type != PageType::DATA_PAGE &&
308
0
        write_info.page_header.type != PageType::DATA_PAGE_V2) {
309
0
      continue;
310
0
    }
311
0
    column_chunk.meta_data.encodings.push_back(write_info.page_header.data_page_header.encoding);
312
0
  }
313
0
}
314
315
0
// Final step for this column: flushes the last page, writes the dictionary page
// (if any), then all page headers plus compressed page data, and fills in the
// remaining column chunk metadata (offsets, sizes, statistics).
void PrimitiveColumnWriter::FinalizeWrite(ColumnWriterState &state_p) {
	auto &state = state_p.Cast<PrimitiveColumnWriterState>();
	auto &column_chunk = state.row_group.columns[state.col_idx];

	// flush the last page (if any remains)
	FlushPage(state);

	auto &write_stream = writer.GetWriter();
	auto chunk_start = write_stream.GetTotalWritten();
	// the dictionary page (when present) precedes all data pages
	if (HasDictionary(state)) {
		column_chunk.meta_data.statistics.distinct_count = UnsafeNumericCast<int64_t>(DictionarySize(state));
		column_chunk.meta_data.statistics.__isset.distinct_count = true;
		column_chunk.meta_data.dictionary_page_offset = UnsafeNumericCast<int64_t>(write_stream.GetTotalWritten());
		column_chunk.meta_data.__isset.dictionary_page_offset = true;
		FlushDictionary(state, state.stats_state.get());
	}

	// zero marks "not yet seen"; filled in when the first data page is written
	column_chunk.meta_data.data_page_offset = 0;
	SetParquetStatistics(state, column_chunk);

	// write the individual pages to disk
	idx_t uncompressed_total = 0;
	for (auto &write_info : state.write_info) {
		// record the file offset of the *first* data page
		if (column_chunk.meta_data.data_page_offset == 0 && (write_info.page_header.type == PageType::DATA_PAGE ||
		                                                     write_info.page_header.type == PageType::DATA_PAGE_V2)) {
			column_chunk.meta_data.data_page_offset = UnsafeNumericCast<int64_t>(write_stream.GetTotalWritten());
		}
		D_ASSERT(write_info.page_header.uncompressed_page_size > 0);
		auto page_header_start = write_stream.GetTotalWritten();
		writer.Write(write_info.page_header);
		// total uncompressed size in the column chunk includes the header size (!)
		uncompressed_total += write_stream.GetTotalWritten() - page_header_start;
		uncompressed_total += write_info.page_header.uncompressed_page_size;
		writer.WriteData(write_info.compressed_data, write_info.compressed_size);
	}
	column_chunk.meta_data.total_compressed_size =
	    UnsafeNumericCast<int64_t>(write_stream.GetTotalWritten() - chunk_start);
	column_chunk.meta_data.total_uncompressed_size = UnsafeNumericCast<int64_t>(uncompressed_total);
	state.row_group.total_byte_size += column_chunk.meta_data.total_uncompressed_size;

	if (state.bloom_filter) {
		// hand the bloom filter to the writer; it is emitted later in the file
		writer.BufferBloomFilter(state.col_idx, std::move(state.bloom_filter));
	}

	// finalize the stats
	writer.FlushColumnStats(state.col_idx, column_chunk, state.stats_state.get());
}
365
366
0
// Base implementation: only dictionary-capable subclasses can flush a
// dictionary; reaching this is a programming error.
void PrimitiveColumnWriter::FlushDictionary(PrimitiveColumnWriterState &state, ColumnWriterStatistics *stats) {
	throw InternalException("This page does not have a dictionary");
}
369
370
0
// Base implementation: only dictionary-capable subclasses report a dictionary
// size; reaching this is a programming error.
idx_t PrimitiveColumnWriter::DictionarySize(PrimitiveColumnWriterState &state) {
	throw InternalException("This page does not have a dictionary");
}
373
374
void PrimitiveColumnWriter::WriteDictionary(PrimitiveColumnWriterState &state, unique_ptr<MemoryStream> temp_writer,
375
0
                                            idx_t row_count) {
376
0
  D_ASSERT(temp_writer);
377
0
  D_ASSERT(temp_writer->GetPosition() > 0);
378
379
  // write the dictionary page header
380
0
  PageWriteInformation write_info;
381
  // set up the header
382
0
  auto &hdr = write_info.page_header;
383
0
  hdr.uncompressed_page_size = UnsafeNumericCast<int32_t>(temp_writer->GetPosition());
384
0
  hdr.type = PageType::DICTIONARY_PAGE;
385
0
  hdr.__isset.dictionary_page_header = true;
386
387
0
  hdr.dictionary_page_header.encoding = Encoding::PLAIN;
388
0
  hdr.dictionary_page_header.is_sorted = false;
389
0
  hdr.dictionary_page_header.num_values = UnsafeNumericCast<int32_t>(row_count);
390
391
0
  write_info.temp_writer = std::move(temp_writer);
392
0
  write_info.write_count = 0;
393
0
  write_info.max_write_count = 0;
394
395
  // compress the contents of the dictionary page
396
0
  CompressPage(*write_info.temp_writer, write_info.compressed_size, write_info.compressed_data,
397
0
               write_info.compressed_buf);
398
0
  hdr.compressed_page_size = UnsafeNumericCast<int32_t>(write_info.compressed_size);
399
400
0
  if (write_info.compressed_buf) {
401
    // if the data has been compressed, we no longer need the uncompressed data
402
0
    D_ASSERT(write_info.compressed_buf.get() == write_info.compressed_data);
403
0
    write_info.temp_writer.reset();
404
0
  }
405
406
  // insert the dictionary page as the first page to write for this column
407
0
  state.write_info.insert(state.write_info.begin(), std::move(write_info));
408
0
}
409
410
} // namespace duckdb