Coverage Report

Created: 2026-06-30 06:38

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/duckdb/extension/parquet/writer/primitive_column_writer.cpp
Line
Count
Source
1
#include "writer/primitive_column_writer.hpp"
2
3
#include <functional>
4
#include <utility>
5
#include <vector>
6
7
#include "parquet_rle_bp_decoder.hpp"
8
#include "parquet_rle_bp_encoder.hpp"
9
#include "parquet_writer.hpp"
10
#include "duckdb/common/assert.hpp"
11
#include "duckdb/common/constants.hpp"
12
#include "duckdb/common/exception.hpp"
13
#include "duckdb/common/helper.hpp"
14
#include "duckdb/common/limits.hpp"
15
#include "duckdb/common/numeric_utils.hpp"
16
#include "duckdb/common/optional_ptr.hpp"
17
#include "duckdb/common/primitive_dictionary.hpp"
18
#include "duckdb/common/serializer/write_stream.hpp"
19
#include "duckdb/common/types.hpp"
20
#include "duckdb/common/types/validity_mask.hpp"
21
#include "duckdb/common/types/vector.hpp"
22
#include "duckdb/common/vector/flat_vector.hpp"
23
#include "parquet_column_schema.hpp"
24
#include "parquet_geometry.hpp"
25
26
namespace duckdb {
27
using duckdb_parquet::Encoding;
28
using duckdb_parquet::PageType;
29
30
// Out-of-class definitions of the class's static constexpr page size limits
// (before C++17 such a definition is needed whenever the constant is odr-used)
constexpr const idx_t PrimitiveColumnWriter::MAX_UNCOMPRESSED_PAGE_SIZE;
31
constexpr const idx_t PrimitiveColumnWriter::MAX_UNCOMPRESSED_DICT_PAGE_SIZE;
32
33
class ParquetPagePayloadBuffer : public AsyncWriteBuffer {
34
public:
35
  ParquetPagePayloadBuffer(idx_t size_p, unique_ptr<MemoryStream> temp_writer_p, AllocatedData compressed_buf_p)
36
0
      : size(size_p), temp_writer(std::move(temp_writer_p)), compressed_buf(std::move(compressed_buf_p)) {
37
0
    D_ASSERT(temp_writer || compressed_buf.IsSet());
38
0
  }
39
40
0
  data_ptr_t Ptr() override {
41
0
    if (compressed_buf.IsSet()) {
42
0
      return compressed_buf.get();
43
0
    }
44
0
    D_ASSERT(temp_writer);
45
0
    return temp_writer->GetData();
46
0
  }
47
48
0
  idx_t Size() const override {
49
0
    return size;
50
0
  }
51
52
private:
53
  idx_t size;
54
  unique_ptr<MemoryStream> temp_writer;
55
  AllocatedData compressed_buf;
56
};
57
58
0
// Creates the write information for a dictionary page: a DICTIONARY_PAGE header with PLAIN encoding,
// the given uncompressed size and number of dictionary entries.
// The compressed size and the page payload are filled in by the caller.
// Both sizes are stored as int32 in the header; the checked casts raise instead of silently truncating
// a value that does not fit.
static PageWriteInformation CreateDictionaryPageWriteInformation(idx_t uncompressed_size, idx_t row_count) {
  PageWriteInformation write_info;
  auto &hdr = write_info.page_header;
  hdr.uncompressed_page_size = NumericCast<int32_t>(uncompressed_size);
  hdr.type = PageType::DICTIONARY_PAGE;
  hdr.__isset.dictionary_page_header = true;

  hdr.dictionary_page_header.encoding = Encoding::PLAIN;
  hdr.dictionary_page_header.is_sorted = false;
  hdr.dictionary_page_header.num_values = NumericCast<int32_t>(row_count);

  // a dictionary page receives no row writes through Write()
  write_info.write_count = 0;
  write_info.max_write_count = 0;
  return write_info;
}
73
74
// Constructs the writer; every argument is handed on to the ColumnWriter base class.
PrimitiveColumnWriter::PrimitiveColumnWriter(ParquetWriter &writer, ParquetColumnSchema &&column_schema,
                                             vector<Identifier> schema_path)
    : ColumnWriter(writer, std::move(column_schema), std::move(schema_path)) {
}
78
79
0
// Creates the per-row-group write state for this column and registers the column's chunk in the row group.
unique_ptr<ColumnWriterState> PrimitiveColumnWriter::InitializeWriteState(duckdb_parquet::RowGroup &row_group) {
  // the column index is read before RegisterToRowGroup appends this column's chunk
  idx_t column_index = row_group.columns.size();
  auto write_state = make_uniq<PrimitiveColumnWriterState>(writer, row_group, column_index);
  RegisterToRowGroup(row_group);
  return std::move(write_state);
}
84
85
0
// Appends a column chunk for this column to the row group, with its metadata initialised
// (codec, schema path, physical type) and a value count of zero.
void PrimitiveColumnWriter::RegisterToRowGroup(duckdb_parquet::RowGroup &row_group) {
  duckdb_parquet::ColumnChunk chunk;
  chunk.__isset.meta_data = true;

  auto &meta = chunk.meta_data;
  meta.codec = writer.GetCodec();
  meta.path_in_schema = IdentifiersToStrings(schema_path);
  meta.num_values = 0;
  meta.type = writer.GetType(SchemaIndex());

  row_group.columns.push_back(std::move(chunk));
}
94
95
// This implementation keeps no per-page state, so it returns a null pointer.
unique_ptr<ColumnWriterPageState> PrimitiveColumnWriter::InitializePageState(PrimitiveColumnWriterState &state,
                                                                             idx_t page_idx) {
  return nullptr;
}
99
100
0
// This implementation has no page state to flush: intentionally a no-op.
void PrimitiveColumnWriter::FlushPageState(WriteStream &temp_writer, ColumnWriterPageState *state) {
}
102
103
void PrimitiveColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterState *parent, Vector &vector, idx_t count,
104
0
                                    bool vector_can_span_multiple_pages) {
105
0
  auto &state = state_p.Cast<PrimitiveColumnWriterState>();
106
0
  auto &col_chunk = state.row_group.columns[state.col_idx];
107
108
0
  idx_t vcount = parent ? parent->definition_levels.size() - state.definition_levels.size() : count;
109
0
  idx_t parent_index = state.definition_levels.size();
110
0
  auto &validity = FlatVector::ValidityMutable(vector);
111
0
  HandleRepeatLevels(state, parent, count);
112
0
  HandleDefineLevels(state, parent, validity, count, MaxDefine(), MaxDefine() - 1);
113
114
0
  idx_t vector_index = 0;
115
0
  reference<PageInformation> page_info_ref = state.page_info.back();
116
0
  col_chunk.meta_data.num_values += NumericCast<int64_t>(vcount);
117
118
0
  const bool check_parent_empty = parent && !parent->is_empty.empty();
119
0
  if (!check_parent_empty && validity.CannotHaveNull() && TypeIsConstantSize(vector.GetType().InternalType()) &&
120
0
      page_info_ref.get().estimated_page_size + GetRowSize(vector, vector_index, state) * vcount <
121
0
          MAX_UNCOMPRESSED_PAGE_SIZE) {
122
    // Fast path: fixed-size type, all valid, and it fits on the current page
123
0
    auto &page_info = page_info_ref.get();
124
0
    page_info.row_count += vcount;
125
0
    page_info.estimated_page_size += GetRowSize(vector, vector_index, state) * vcount;
126
0
  } else {
127
0
    for (idx_t i = 0; i < vcount; i++) {
128
0
      auto &page_info = page_info_ref.get();
129
0
      page_info.row_count++;
130
0
      if (check_parent_empty && parent->is_empty[parent_index + i]) {
131
0
        page_info.empty_count++;
132
0
        continue;
133
0
      }
134
0
      if (validity.RowIsValid(vector_index)) {
135
0
        page_info.estimated_page_size += GetRowSize(vector, vector_index, state);
136
0
        if (page_info.estimated_page_size >= MAX_UNCOMPRESSED_PAGE_SIZE) {
137
0
          if (!vector_can_span_multiple_pages && i != 0) {
138
            // Vector is not allowed to span multiple pages, and we already started writing it
139
0
            continue;
140
0
          }
141
0
          PageInformation new_info;
142
0
          new_info.offset = page_info.offset + page_info.row_count;
143
0
          state.page_info.push_back(new_info);
144
0
          page_info_ref = state.page_info.back();
145
0
        }
146
0
      } else {
147
0
        page_info.null_count++;
148
0
      }
149
0
      vector_index++;
150
0
    }
151
0
  }
152
0
}
153
154
0
// Encoding recorded in the data page headers; this implementation always reports PLAIN.
duckdb_parquet::Encoding::type PrimitiveColumnWriter::GetEncoding(PrimitiveColumnWriterState &state) {
  return Encoding::PLAIN;
}
157
158
0
// Creates the statistics state and one PageWriteInformation (header + buffer + page state) for every page
// that was laid out during Prepare, then starts writing the first page.
void PrimitiveColumnWriter::BeginWrite(ColumnWriterState &state_p) {
  auto &state = state_p.Cast<PrimitiveColumnWriterState>();

  state.stats_state = InitializeStatsState();

  idx_t page_idx = 0;
  while (page_idx < state.page_info.size()) {
    auto &page = state.page_info[page_idx];
    if (page.row_count == 0) {
      // only the trailing page is expected to be empty: drop it and stop
      D_ASSERT(page_idx + 1 == state.page_info.size());
      state.page_info.erase_at(page_idx);
      break;
    }

    PageWriteInformation page_write;

    // data page header; the sizes are only known once the page has been flushed
    auto &header = page_write.page_header;
    header.compressed_page_size = 0;
    header.uncompressed_page_size = 0;
    header.type = PageType::DATA_PAGE;
    header.__isset.data_page_header = true;

    auto &data_header = header.data_page_header;
    data_header.num_values = NumericCast<int32_t>(page.row_count);
    data_header.encoding = GetEncoding(state);
    data_header.definition_level_encoding = Encoding::RLE;
    data_header.repetition_level_encoding = Encoding::RLE;

    // buffer for the uncompressed page, pre-sized from the estimate gathered in Prepare
    const auto initial_capacity =
        MaxValue<idx_t>(NextPowerOfTwo(page.estimated_page_size), MemoryStream::DEFAULT_INITIAL_CAPACITY);
    page_write.temp_writer = make_uniq<MemoryStream>(BufferAllocator::Get(writer.GetContext()), initial_capacity);

    // empty entries are counted as already written
    page_write.write_count = page.empty_count;
    page_write.max_write_count = page.row_count;
    page_write.page_state = InitializePageState(state, page_idx);

    page_write.compressed_size = 0;
    page_write.compressed_data = nullptr;

    state.write_info.push_back(std::move(page_write));
    page_idx++;
  }

  // start writing the first page
  NextPage(state);
}
199
200
void PrimitiveColumnWriter::WriteLevels(Allocator &allocator, WriteStream &temp_writer,
201
                                        const unsafe_vector<uint16_t> &levels, idx_t max_value, idx_t offset,
202
0
                                        idx_t count, optional_idx null_count) {
203
  // For definition levels: the column is REQUIRED, nothing to encode.
204
  // For repetition levels: the column is not repeated, every value appears exactly once, nothing to encode.
205
0
  if (max_value == 0) {
206
0
    return;
207
0
  }
208
209
0
  if (levels.empty() || count == 0) {
210
0
    return;
211
0
  }
212
213
  // write the levels using the RLE-BP encoding
214
0
  const auto bit_width = RleBpDecoder::ComputeBitWidthFromMaxValue(max_value);
215
0
  RleBpEncoder rle_encoder(bit_width);
216
217
  // have to write to an intermediate stream first because we need to know the size
218
0
  MemoryStream intermediate_stream(allocator);
219
220
0
  rle_encoder.BeginWrite();
221
0
  if (null_count.IsValid() && null_count.GetIndex() == 0) {
222
    // Fast path: no nulls
223
0
    rle_encoder.WriteMany(intermediate_stream, levels[0], count);
224
0
  } else {
225
0
    for (idx_t i = offset; i < offset + count; i++) {
226
0
      rle_encoder.WriteValue(intermediate_stream, levels[i]);
227
0
    }
228
0
  }
229
0
  rle_encoder.FinishWrite(intermediate_stream);
230
231
  // start off by writing the byte count as a uint32_t
232
0
  temp_writer.Write(NumericCast<uint32_t>(intermediate_stream.GetPosition()));
233
  // copy over the written data
234
0
  temp_writer.WriteData(intermediate_stream.GetData(), intermediate_stream.GetPosition());
235
0
}
236
237
0
// Finishes the page that is currently being written (if any) and opens the next one by writing its
// repetition and definition levels. When no pages are left, current_page is moved past the end.
void PrimitiveColumnWriter::NextPage(PrimitiveColumnWriterState &state) {
  const bool has_open_page = state.current_page > 0;
  if (has_open_page) {
    // need to flush the current page
    FlushPage(state);
  }

  const auto page_count = state.write_info.size();
  if (state.current_page >= page_count) {
    // no more pages: park the counter one past the end, which FlushPage treats as "nothing to flush"
    state.current_page = page_count + 1;
    return;
  }

  const auto page_idx = state.current_page;
  auto &page = state.page_info[page_idx];
  auto &page_write = state.write_info[page_idx];
  state.current_page++;

  auto &page_stream = *page_write.temp_writer;
  auto &allocator = BufferAllocator::Get(writer.GetContext());

  // repetition levels first ...
  WriteLevels(allocator, page_stream, state.repetition_levels, MaxRepeat(), page.offset, page.row_count);

  // ... followed by the definition levels
  WriteLevels(allocator, page_stream, state.definition_levels, MaxDefine(), page.offset, page.row_count,
              state.null_count + state.parent_null_count);
}
260
261
0
// Completes the page that is currently being written: flushes the page state, records the uncompressed size,
// compresses the page and records the compressed size.
// Does nothing when current_page already points past the last page.
// Throws an InternalException if the uncompressed page does not fit into an int32.
void PrimitiveColumnWriter::FlushPage(PrimitiveColumnWriterState &state) {
  D_ASSERT(state.current_page > 0);
  if (state.current_page > state.write_info.size()) {
    return;
  }

  auto &page_write = state.write_info[state.current_page - 1];
  auto &page_stream = *page_write.temp_writer;
  auto &header = page_write.page_header;

  FlushPageState(page_stream, page_write.page_state.get());

  // all data of the page has been written, so the uncompressed size is known now
  const auto max_page_size = idx_t(NumericLimits<int32_t>::Maximum());
  if (page_stream.GetPosition() > max_page_size) {
    throw InternalException("Parquet writer: %d uncompressed page size out of range for type integer",
                            page_stream.GetPosition());
  }
  header.uncompressed_page_size = UnsafeNumericCast<int32_t>(page_stream.GetPosition());

  // compress the page contents
  CompressPage(page_stream, page_write.compressed_size, page_write.compressed_data, page_write.compressed_buf);
  header.compressed_page_size = UnsafeNumericCast<int32_t>(page_write.compressed_size);
  D_ASSERT(header.uncompressed_page_size > 0);
  D_ASSERT(header.compressed_page_size > 0);

  if (!page_write.compressed_buf.IsSet()) {
    // no separate compressed buffer: the stream is kept as the owner of the page bytes
    return;
  }
  // the page lives in the compressed buffer, the uncompressed stream can be released
  D_ASSERT(page_write.compressed_buf.get() == page_write.compressed_data);
  page_write.temp_writer.reset();
}
293
294
0
// Creates the statistics object for a column chunk; this implementation uses the plain ColumnWriterStatistics.
unique_ptr<ColumnWriterStatistics> PrimitiveColumnWriter::InitializeStatsState() {
  return make_uniq<ColumnWriterStatistics>();
}
297
298
// Size estimate of a single row, used by Prepare for page sizing.
// This implementation has no estimate to offer and always throws an InternalException.
idx_t PrimitiveColumnWriter::GetRowSize(const Vector &vector, const idx_t index,
                                        const PrimitiveColumnWriterState &state) const {
  throw InternalException("GetRowSize unsupported for struct/list column writers");
}
302
303
0
// Writes `count` rows of `vector` into the current page(s), moving on to the next page whenever the current
// one has received all rows assigned to it in Prepare.
// Throws an InternalException if the current page no longer has a buffer to write to.
void PrimitiveColumnWriter::Write(ColumnWriterState &state_p, Vector &vector, idx_t count) {
  auto &state = state_p.Cast<PrimitiveColumnWriterState>();

  for (idx_t offset = 0; offset < count;) {
    auto &page_write = state.write_info[state.current_page - 1];
    if (!page_write.temp_writer) {
      throw InternalException("Writes are not correctly aligned!?");
    }
    auto &page_stream = *page_write.temp_writer;

    // write as much as is left of the vector, limited by the free row slots of the page
    const idx_t rows_left = count - offset;
    const idx_t page_capacity = page_write.max_write_count - page_write.write_count;
    const idx_t batch = MinValue<idx_t>(rows_left, page_capacity);
    D_ASSERT(batch > 0);

    WriteVector(page_stream, state.stats_state.get(), page_write.page_state.get(), vector, offset, offset + batch);

    page_write.write_count += batch;
    if (page_write.write_count == page_write.max_write_count) {
      // page is complete
      NextPage(state);
    }
    offset += batch;
  }
}
328
329
void PrimitiveColumnWriter::SetParquetStatistics(PrimitiveColumnWriterState &state,
330
0
                                                 duckdb_parquet::ColumnChunk &column_chunk) {
331
0
  auto add_encoding = [&](duckdb_parquet::Encoding::type encoding) {
332
0
    for (const auto &existing_encoding : column_chunk.meta_data.encodings) {
333
0
      if (existing_encoding == encoding) {
334
0
        return;
335
0
      }
336
0
    }
337
0
    column_chunk.meta_data.encodings.push_back(encoding);
338
0
  };
339
340
0
  for (const auto &write_info : state.write_info) {
341
    // only care about data page encodings, data_page_header.encoding is meaningless for dict
342
0
    switch (write_info.page_header.type) {
343
0
    case PageType::DATA_PAGE:
344
0
      add_encoding(write_info.page_header.data_page_header.encoding);
345
0
      break;
346
0
    case PageType::DATA_PAGE_V2:
347
0
      add_encoding(write_info.page_header.data_page_header_v2.encoding);
348
0
      break;
349
0
    default:
350
0
      break;
351
0
    }
352
0
  }
353
354
0
  if (!state.stats_state) {
355
0
    return;
356
0
  }
357
0
  auto null_count = MaxRepeat() == 0 ? state.null_count : state.null_count + state.parent_null_count;
358
0
  column_chunk.meta_data.statistics.null_count = NumericCast<int64_t>(null_count);
359
0
  column_chunk.meta_data.statistics.__isset.null_count = true;
360
0
  column_chunk.meta_data.__isset.statistics = true;
361
362
  // if we have NaN values - don't write the min/max here
363
0
  if (!state.stats_state->HasNaN()) {
364
    // set min/max/min_value/max_value
365
    // this code is not going to win any beauty contests, but well
366
0
    auto min = state.stats_state->GetMin();
367
0
    if (!min.empty()) {
368
0
      column_chunk.meta_data.statistics.min = std::move(min);
369
0
      column_chunk.meta_data.statistics.__isset.min = true;
370
0
      column_chunk.meta_data.__isset.statistics = true;
371
0
    }
372
0
    auto max = state.stats_state->GetMax();
373
0
    if (!max.empty()) {
374
0
      column_chunk.meta_data.statistics.max = std::move(max);
375
0
      column_chunk.meta_data.statistics.__isset.max = true;
376
0
      column_chunk.meta_data.__isset.statistics = true;
377
0
    }
378
379
0
    if (state.stats_state->HasStats()) {
380
0
      column_chunk.meta_data.statistics.min_value = state.stats_state->GetMinValue();
381
0
      column_chunk.meta_data.statistics.__isset.min_value = true;
382
0
      column_chunk.meta_data.__isset.statistics = true;
383
0
      column_chunk.meta_data.statistics.is_min_value_exact = state.stats_state->MinIsExact();
384
0
      column_chunk.meta_data.statistics.__isset.is_min_value_exact = true;
385
386
0
      column_chunk.meta_data.statistics.max_value = state.stats_state->GetMaxValue();
387
0
      column_chunk.meta_data.statistics.__isset.max_value = true;
388
0
      column_chunk.meta_data.__isset.statistics = true;
389
0
      column_chunk.meta_data.statistics.is_max_value_exact = state.stats_state->MaxIsExact();
390
0
      column_chunk.meta_data.statistics.__isset.is_max_value_exact = true;
391
0
    }
392
0
  }
393
0
  if (HasDictionary(state)) {
394
0
    column_chunk.meta_data.statistics.distinct_count = UnsafeNumericCast<int64_t>(DictionarySize(state));
395
0
    column_chunk.meta_data.statistics.__isset.distinct_count = true;
396
0
    column_chunk.meta_data.__isset.statistics = true;
397
0
  }
398
399
0
  if (state.stats_state->HasGeoStats()) {
400
0
    auto gpq_version = writer.GetGeoParquetVersion();
401
402
0
    const auto has_real_stats = gpq_version == GeoParquetVersion::NONE || gpq_version == GeoParquetVersion::BOTH ||
403
0
                                gpq_version == GeoParquetVersion::V2;
404
0
    const auto has_json_stats = gpq_version == GeoParquetVersion::V1 || gpq_version == GeoParquetVersion::BOTH ||
405
0
                                gpq_version == GeoParquetVersion::V2;
406
407
0
    if (has_real_stats) {
408
      // Write the parquet native geospatial statistics
409
0
      column_chunk.meta_data.__isset.geospatial_statistics = true;
410
0
      state.stats_state->WriteGeoStats(column_chunk.meta_data.geospatial_statistics);
411
0
    }
412
0
    if (has_json_stats) {
413
      // Add the geospatial statistics to the extra GeoParquet metadata
414
0
      writer.GetGeoParquetData().AddGeoParquetStats(writer.GetContext(), column_schema.name, column_schema.type,
415
0
                                                    *state.stats_state->GetGeoStats(), gpq_version);
416
0
    }
417
0
  }
418
0
}
419
420
0
// Flushes the last page, materializes the dictionary page (if the column has one) and hands the header and
// the payload of every page to the writer's prepare functions, storing the prepared results on the page.
void PrimitiveColumnWriter::PrepareWrite(ColumnWriterState &state_p) {
  auto &state = state_p.Cast<PrimitiveColumnWriterState>();

  // the last page is still open at this point
  FlushPage(state);
  if (HasDictionary(state)) {
    FlushDictionary(state, state.stats_state.get());
  }

  for (auto &page_write : state.write_info) {
    D_ASSERT(page_write.page_header.uncompressed_page_size > 0);
    // every page is expected to be prepared exactly once
    D_ASSERT(!page_write.prepared_header);
    D_ASSERT(!page_write.prepared_payload);

    page_write.prepared_header = writer.PrepareWrite(page_write.page_header);

    // the payload buffer takes over both possible owners of the page bytes
    auto payload = make_uniq<ParquetPagePayloadBuffer>(page_write.compressed_size, std::move(page_write.temp_writer),
                                                       std::move(page_write.compressed_buf));
    page_write.prepared_payload = writer.PrepareWriteData(std::move(payload));
  }
}
440
441
0
// Writes all prepared pages of this column and completes the column chunk metadata:
// page offsets, statistics, compressed/uncompressed sizes, bloom filter and column stats.
void PrimitiveColumnWriter::FinalizeWrite(ColumnWriterState &state_p) {
  auto &state = state_p.Cast<PrimitiveColumnWriterState>();
  auto &column_chunk = state.row_group.columns[state.col_idx];
  auto &meta = column_chunk.meta_data;

  auto &column_writer = writer.GetWriter();
  const auto start_offset = column_writer.GetTotalWritten();
  if (HasDictionary(state)) {
    meta.statistics.distinct_count = UnsafeNumericCast<int64_t>(DictionarySize(state));
    meta.statistics.__isset.distinct_count = true;
    // the dictionary page is the first page of the chunk
    meta.dictionary_page_offset = UnsafeNumericCast<int64_t>(column_writer.GetTotalWritten());
    meta.__isset.dictionary_page_offset = true;
  }

  // 0 acts as "not set yet"; the real offset is recorded when the first data page is reached
  meta.data_page_offset = 0;
  SetParquetStatistics(state, column_chunk);

  const auto is_data_page = [](const PageWriteInformation &page_write) {
    const auto page_type = page_write.page_header.type;
    return page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2;
  };

  // write the individual pages
  idx_t total_uncompressed_size = 0;
  for (auto &page_write : state.write_info) {
    if (meta.data_page_offset == 0 && is_data_page(page_write)) {
      meta.data_page_offset = UnsafeNumericCast<int64_t>(column_writer.GetTotalWritten());
    }
    D_ASSERT(page_write.page_header.uncompressed_page_size > 0);
    D_ASSERT(page_write.prepared_header);
    D_ASSERT(page_write.prepared_payload);

    // total uncompressed size in the column chunk includes the header size (!)
    total_uncompressed_size += page_write.prepared_header->Size();
    total_uncompressed_size += page_write.page_header.uncompressed_page_size;

    writer.WriteData(std::move(page_write.prepared_header));
    writer.WriteData(std::move(page_write.prepared_payload));
  }

  const auto bytes_written = column_writer.GetTotalWritten() - start_offset;
  meta.total_compressed_size = UnsafeNumericCast<int64_t>(bytes_written);
  meta.total_uncompressed_size = UnsafeNumericCast<int64_t>(total_uncompressed_size);
  state.row_group.total_byte_size += meta.total_uncompressed_size;

  if (state.bloom_filter) {
    writer.BufferBloomFilter(state.col_idx, std::move(state.bloom_filter));
  }
  // finalize the stats
  writer.FlushColumnStats(state.col_idx, column_chunk, state.stats_state.get());
}
486
487
0
// This implementation has no dictionary: always throws an InternalException.
// Within this file the call is guarded by HasDictionary(state).
void PrimitiveColumnWriter::FlushDictionary(PrimitiveColumnWriterState &state, ColumnWriterStatistics *stats) {
  throw InternalException("This page does not have a dictionary");
}
490
491
0
// This implementation has no dictionary: always throws an InternalException.
// Within this file the call is guarded by HasDictionary(state).
idx_t PrimitiveColumnWriter::DictionarySize(PrimitiveColumnWriterState &state) {
  throw InternalException("This page does not have a dictionary");
}
494
495
void PrimitiveColumnWriter::WriteDictionary(PrimitiveColumnWriterState &state, unique_ptr<MemoryStream> temp_writer,
496
0
                                            idx_t row_count) {
497
0
  D_ASSERT(temp_writer);
498
0
  D_ASSERT(temp_writer->GetPosition() > 0);
499
500
0
  auto write_info = CreateDictionaryPageWriteInformation(temp_writer->GetPosition(), row_count);
501
0
  write_info.temp_writer = std::move(temp_writer);
502
503
  // compress the contents of the dictionary page
504
0
  CompressPage(*write_info.temp_writer, write_info.compressed_size, write_info.compressed_data,
505
0
               write_info.compressed_buf);
506
0
  write_info.page_header.compressed_page_size = UnsafeNumericCast<int32_t>(write_info.compressed_size);
507
508
0
  if (write_info.compressed_buf.IsSet()) {
509
    // if the data has been compressed, we no longer need the uncompressed data
510
0
    D_ASSERT(write_info.compressed_buf.get() == write_info.compressed_data);
511
0
    write_info.temp_writer.reset();
512
0
  }
513
514
  // insert the dictionary page as the first page to write for this column
515
0
  state.write_info.insert(state.write_info.begin(), std::move(write_info));
516
0
}
517
518
void PrimitiveColumnWriter::WriteDictionary(PrimitiveColumnWriterState &state,
519
0
                                            PrimitiveDictionaryTargetData target_data, idx_t row_count) {
520
0
  D_ASSERT(target_data.data.IsSet());
521
0
  D_ASSERT(target_data.size > 0);
522
523
0
  auto write_info = CreateDictionaryPageWriteInformation(target_data.size, row_count);
524
525
  // compress the contents of the dictionary page
526
0
  MemoryStream temp_writer(target_data.data.get(), target_data.data.GetSize());
527
0
  temp_writer.SetPosition(target_data.size);
528
0
  CompressPage(temp_writer, write_info.compressed_size, write_info.compressed_data, write_info.compressed_buf);
529
0
  write_info.page_header.compressed_page_size = UnsafeNumericCast<int32_t>(write_info.compressed_size);
530
531
0
  if (!write_info.compressed_buf.IsSet()) {
532
0
    D_ASSERT(write_info.compressed_data == target_data.data.get());
533
0
    write_info.compressed_buf = std::move(target_data.data);
534
0
  }
535
536
  // insert the dictionary page as the first page to write for this column
537
0
  state.write_info.insert(state.write_info.begin(), std::move(write_info));
538
0
}
539
540
0
// Appends the schema element of this column to `schemas` and stores the element's index in the column schema.
// Returns 1, the number of schema elements that were added.
idx_t PrimitiveColumnWriter::FinalizeSchema(vector<duckdb_parquet::SchemaElement> &schemas) {
  // the new element goes to the end of the list
  const idx_t schema_idx = schemas.size();
  column_schema.SetSchemaIndex(schema_idx);

  const auto &duck_type = column_schema.type;
  const auto geometry_allowed = column_schema.allow_geometry;

  duckdb_parquet::SchemaElement element;
  element.type = ParquetWriter::DuckDBTypeToParquetType(duck_type, writer.WriteTimestampAsInt96());
  element.repetition_type = column_schema.repetition_type;
  // no children are recorded for this element
  element.__isset.num_children = false;
  element.__isset.type = true;
  element.__isset.repetition_type = true;
  element.name = column_schema.name;

  const auto &field_id = column_schema.field_id;
  if (field_id.IsValid()) {
    element.__isset.field_id = true;
    element.field_id = NumericCast<int32_t>(field_id.GetIndex());
  }

  ParquetWriter::SetSchemaProperties(duck_type, element, geometry_allowed, writer.GetContext(),
                                     writer.WriteTimestampAsInt96(), writer.TimestampIsAdjustedToUTC());
  schemas.push_back(std::move(element));

  D_ASSERT(child_writers.empty());
  return 1;
}
570
571
} // namespace duckdb