Coverage Report

Created: 2025-06-24 07:53

/src/duckdb/extension/parquet/include/parquet_writer.hpp
In this report, hit counts appear only on the inline accessor bodies of ParquetWriter (GetContext() through GetFileName()), and every one of them is 0. The line and count columns are omitted from the listing below; the source is reproduced as-is.

//===----------------------------------------------------------------------===//
//                         DuckDB
//
// parquet_writer.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb.hpp"
#include "duckdb/common/common.hpp"
#include "duckdb/common/encryption_state.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/mutex.hpp"
#include "duckdb/common/atomic.hpp"
#include "duckdb/common/serializer/buffered_file_writer.hpp"
#include "duckdb/common/types/column/column_data_collection.hpp"
#include "duckdb/function/copy_function.hpp"

#include "parquet_statistics.hpp"
#include "column_writer.hpp"
#include "parquet_types.h"
#include "geo_parquet.hpp"
#include "writer/parquet_write_stats.hpp"
#include "thrift/protocol/TCompactProtocol.h"

namespace duckdb {
class FileSystem;
class FileOpener;
class ParquetEncryptionConfig;
class ParquetStatsAccumulator;

class Serializer;
class Deserializer;

class ColumnWriterStatistics;
struct CopyFunctionFileStatistics;

struct PreparedRowGroup {
  duckdb_parquet::RowGroup row_group;
  vector<unique_ptr<ColumnWriterState>> states;
};

struct FieldID;
struct ChildFieldIDs {
  ChildFieldIDs();
  ChildFieldIDs Copy() const;
  unique_ptr<case_insensitive_map_t<FieldID>> ids;

  void Serialize(Serializer &serializer) const;
  static ChildFieldIDs Deserialize(Deserializer &source);
};

struct FieldID {
  static constexpr const auto DUCKDB_FIELD_ID = "__duckdb_field_id";
  FieldID();
  explicit FieldID(int32_t field_id);
  FieldID Copy() const;
  bool set;
  int32_t field_id;
  ChildFieldIDs child_field_ids;

  void Serialize(Serializer &serializer) const;
  static FieldID Deserialize(Deserializer &source);
};

struct ParquetBloomFilterEntry {
  unique_ptr<ParquetBloomFilter> bloom_filter;
  idx_t row_group_idx;
  idx_t column_idx;
};

enum class ParquetVersion : uint8_t {
  V1 = 1, //! Excludes DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, BYTE_STREAM_SPLIT
  V2 = 2, //! Includes the encodings above
};

class ParquetWriter {
public:
  ParquetWriter(ClientContext &context, FileSystem &fs, string file_name, vector<LogicalType> types,
                vector<string> names, duckdb_parquet::CompressionCodec::type codec, ChildFieldIDs field_ids,
                const vector<pair<string, string>> &kv_metadata,
                shared_ptr<ParquetEncryptionConfig> encryption_config, idx_t dictionary_size_limit,
                idx_t string_dictionary_page_size_limit, double bloom_filter_false_positive_ratio,
                int64_t compression_level, bool debug_use_openssl, ParquetVersion parquet_version);
  ~ParquetWriter();

public:
  void PrepareRowGroup(ColumnDataCollection &buffer, PreparedRowGroup &result);
  void FlushRowGroup(PreparedRowGroup &row_group);
  void Flush(ColumnDataCollection &buffer);
  void Finalize();

  static duckdb_parquet::Type::type DuckDBTypeToParquetType(const LogicalType &duckdb_type);
  static void SetSchemaProperties(const LogicalType &duckdb_type, duckdb_parquet::SchemaElement &schema_ele);

  ClientContext &GetContext() {
    return context;
  }
  duckdb_apache::thrift::protocol::TProtocol *GetProtocol() {
    return protocol.get();
  }
  duckdb_parquet::CompressionCodec::type GetCodec() {
    return codec;
  }
  duckdb_parquet::Type::type GetType(idx_t schema_idx) {
    return file_meta_data.schema[schema_idx].type;
  }
  LogicalType GetSQLType(idx_t schema_idx) const {
    return sql_types[schema_idx];
  }
  BufferedFileWriter &GetWriter() {
    return *writer;
  }
  idx_t FileSize() {
    return total_written;
  }
  idx_t DictionarySizeLimit() const {
    return dictionary_size_limit;
  }
  idx_t StringDictionaryPageSizeLimit() const {
    return string_dictionary_page_size_limit;
  }
  double BloomFilterFalsePositiveRatio() const {
    return bloom_filter_false_positive_ratio;
  }
  int64_t CompressionLevel() const {
    return compression_level;
  }
  idx_t NumberOfRowGroups() {
    return num_row_groups;
  }
  ParquetVersion GetParquetVersion() const {
    return parquet_version;
  }
  const string &GetFileName() const {
    return file_name;
  }

  uint32_t Write(const duckdb_apache::thrift::TBase &object);
  uint32_t WriteData(const const_data_ptr_t buffer, const uint32_t buffer_size);

  GeoParquetFileMetadata &GetGeoParquetData();

  static bool TryGetParquetType(const LogicalType &duckdb_type,
                                optional_ptr<duckdb_parquet::Type::type> type = nullptr);

  void BufferBloomFilter(idx_t col_idx, unique_ptr<ParquetBloomFilter> bloom_filter);
  void SetWrittenStatistics(CopyFunctionFileStatistics &written_stats);
  void FlushColumnStats(idx_t col_idx, duckdb_parquet::ColumnChunk &chunk,
                        optional_ptr<ColumnWriterStatistics> writer_stats);

private:
  void GatherWrittenStatistics();

private:
  ClientContext &context;
  string file_name;
  vector<LogicalType> sql_types;
  vector<string> column_names;
  duckdb_parquet::CompressionCodec::type codec;
  ChildFieldIDs field_ids;
  shared_ptr<ParquetEncryptionConfig> encryption_config;
  idx_t dictionary_size_limit;
  idx_t string_dictionary_page_size_limit;
  double bloom_filter_false_positive_ratio;
  int64_t compression_level;
  bool debug_use_openssl;
  shared_ptr<EncryptionUtil> encryption_util;
  ParquetVersion parquet_version;
  vector<ParquetColumnSchema> column_schemas;

  unique_ptr<BufferedFileWriter> writer;
  //! Atomics to reduce contention when rotating writes to multiple Parquet files
  atomic<idx_t> total_written;
  atomic<idx_t> num_row_groups;
  std::shared_ptr<duckdb_apache::thrift::protocol::TProtocol> protocol;
  duckdb_parquet::FileMetaData file_meta_data;
  std::mutex lock;

  vector<unique_ptr<ColumnWriter>> column_writers;

  unique_ptr<GeoParquetFileMetadata> geoparquet_data;
  vector<ParquetBloomFilterEntry> bloom_filters;

  optional_ptr<CopyFunctionFileStatistics> written_stats;
  unique_ptr<ParquetStatsAccumulator> stats_accumulator;
};

} // namespace duckdb
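
For orientation, the sketch below shows how the lifecycle declared above (construct, Flush, Finalize) might be driven from a standalone program inside the DuckDB source tree. It is a minimal, unverified sketch, not the extension's own call path: the codec choice, the size limits, the compression level, and the way the ClientContext and FileSystem are obtained are illustrative assumptions, not values taken from this header.

// Minimal usage sketch for the ParquetWriter API declared above.
// Assumes a build that exposes these internal headers; all literal limits,
// the SNAPPY codec, and the context/filesystem setup are assumptions.
#include "duckdb.hpp"
#include "parquet_writer.hpp"

using namespace duckdb;

int main() {
  DuckDB db(nullptr);
  Connection con(db);
  auto &context = *con.context;                   // assumed way to obtain a ClientContext
  auto &fs = FileSystem::GetFileSystem(context);

  vector<LogicalType> types {LogicalType::INTEGER};
  vector<string> names {"i"};

  ParquetWriter writer(context, fs, "example.parquet", types, names,
                       duckdb_parquet::CompressionCodec::SNAPPY, ChildFieldIDs(),
                       {},       // kv_metadata: none
                       nullptr,  // no encryption config
                       /*dictionary_size_limit=*/100000,
                       /*string_dictionary_page_size_limit=*/1048576,
                       /*bloom_filter_false_positive_ratio=*/0.01,
                       /*compression_level=*/3, /*debug_use_openssl=*/true,
                       ParquetVersion::V2);

  // Buffer one small chunk and write it out as a row group.
  ColumnDataCollection buffer(context, types);
  DataChunk chunk;
  chunk.Initialize(context, types);
  chunk.SetValue(0, 0, Value::INTEGER(42));
  chunk.SetCardinality(1);
  buffer.Append(chunk);

  writer.Flush(buffer);   // write the buffered rows as a row group
  writer.Finalize();      // finish the file (footer / file metadata)
  return 0;
}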