/src/duckdb/extension/parquet/include/parquet_writer.hpp
//===----------------------------------------------------------------------===//
// DuckDB
//
// parquet_writer.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb.hpp"
#include "duckdb/common/common.hpp"
#include "duckdb/common/encryption_state.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/mutex.hpp"
#include "duckdb/common/atomic.hpp"
#include "duckdb/common/serializer/buffered_file_writer.hpp"
#include "duckdb/common/types/column/column_data_collection.hpp"
#include "duckdb/function/copy_function.hpp"

#include "parquet_statistics.hpp"
#include "column_writer.hpp"
#include "parquet_types.h"
#include "geo_parquet.hpp"
#include "writer/parquet_write_stats.hpp"
#include "thrift/protocol/TCompactProtocol.h"

namespace duckdb {
class FileSystem;
class FileOpener;
class ParquetEncryptionConfig;
class ParquetStatsAccumulator;

class Serializer;
class Deserializer;

class ColumnWriterStatistics;
struct CopyFunctionFileStatistics;
struct PreparedRowGroup {
	duckdb_parquet::RowGroup row_group;
	vector<unique_ptr<ColumnWriterState>> states;
};

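//! Explicit Parquet field IDs for (possibly nested) columns; when set, the given
//! field_id is written into the corresponding SchemaElement of the Parquet schema.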
struct FieldID;
struct ChildFieldIDs {
	ChildFieldIDs();
	ChildFieldIDs Copy() const;
	unique_ptr<case_insensitive_map_t<FieldID>> ids;

	void Serialize(Serializer &serializer) const;
	static ChildFieldIDs Deserialize(Deserializer &source);
};

struct FieldID {
	static constexpr const auto DUCKDB_FIELD_ID = "__duckdb_field_id";
	FieldID();
	explicit FieldID(int32_t field_id);
	FieldID Copy() const;
	bool set;
	int32_t field_id;
	ChildFieldIDs child_field_ids;

	void Serialize(Serializer &serializer) const;
	static FieldID Deserialize(Deserializer &source);
};

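//! A bloom filter for a specific column chunk, buffered until the bloom filters
//! are written to the file.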
struct ParquetBloomFilterEntry {
	unique_ptr<ParquetBloomFilter> bloom_filter;
	idx_t row_group_idx;
	idx_t column_idx;
};

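//! The Parquet format version to target: V2 enables newer encodings that not all
//! readers support.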
enum class ParquetVersion : uint8_t {
	V1 = 1, //! Excludes DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, BYTE_STREAM_SPLIT
	V2 = 2, //! Includes the encodings above
};

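//! Writes a single Parquet file: row groups are prepared (encoded) via
//! PrepareRowGroup, potentially by several threads, and then appended to the
//! file with FlushRowGroup.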
class ParquetWriter {
public:
	ParquetWriter(ClientContext &context, FileSystem &fs, string file_name, vector<LogicalType> types,
	              vector<string> names, duckdb_parquet::CompressionCodec::type codec, ChildFieldIDs field_ids,
	              const vector<pair<string, string>> &kv_metadata,
	              shared_ptr<ParquetEncryptionConfig> encryption_config, idx_t dictionary_size_limit,
	              idx_t string_dictionary_page_size_limit, double bloom_filter_false_positive_ratio,
	              int64_t compression_level, bool debug_use_openssl, ParquetVersion parquet_version);
	~ParquetWriter();

public:
	//! Encodes the contents of "buffer" into an in-memory prepared row group
	void PrepareRowGroup(ColumnDataCollection &buffer, PreparedRowGroup &result);
	//! Writes a previously prepared row group to the file
	void FlushRowGroup(PreparedRowGroup &row_group);
	//! Prepares and writes "buffer" as a single row group
	void Flush(ColumnDataCollection &buffer);
	//! Writes the footer (FileMetaData) and finishes the file
	void Finalize();

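	//! Helpers for mapping DuckDB logical types onto the Parquet schema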
	static duckdb_parquet::Type::type DuckDBTypeToParquetType(const LogicalType &duckdb_type);
	static void SetSchemaProperties(const LogicalType &duckdb_type, duckdb_parquet::SchemaElement &schema_ele);

	ClientContext &GetContext() {
		return context;
	}
	duckdb_apache::thrift::protocol::TProtocol *GetProtocol() {
		return protocol.get();
	}
	duckdb_parquet::CompressionCodec::type GetCodec() {
		return codec;
	}
	duckdb_parquet::Type::type GetType(idx_t schema_idx) {
		return file_meta_data.schema[schema_idx].type;
	}
	LogicalType GetSQLType(idx_t schema_idx) const {
		return sql_types[schema_idx];
	}
	BufferedFileWriter &GetWriter() {
		return *writer;
	}
	idx_t FileSize() {
		return total_written;
	}
	idx_t DictionarySizeLimit() const {
		return dictionary_size_limit;
	}
	idx_t StringDictionaryPageSizeLimit() const {
		return string_dictionary_page_size_limit;
	}
	double BloomFilterFalsePositiveRatio() const {
		return bloom_filter_false_positive_ratio;
	}
	int64_t CompressionLevel() const {
		return compression_level;
	}
	idx_t NumberOfRowGroups() {
		return num_row_groups;
	}
	ParquetVersion GetParquetVersion() const {
		return parquet_version;
	}
	const string &GetFileName() const {
		return file_name;
	}

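	//! Serializes a Thrift object (or raw bytes) into the file, returning the
	//! number of bytes written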
	uint32_t Write(const duckdb_apache::thrift::TBase &object);
	uint32_t WriteData(const const_data_ptr_t buffer, const uint32_t buffer_size);

	GeoParquetFileMetadata &GetGeoParquetData();

	static bool TryGetParquetType(const LogicalType &duckdb_type,
	                              optional_ptr<duckdb_parquet::Type::type> type = nullptr);

	void BufferBloomFilter(idx_t col_idx, unique_ptr<ParquetBloomFilter> bloom_filter);
	void SetWrittenStatistics(CopyFunctionFileStatistics &written_stats);
	void FlushColumnStats(idx_t col_idx, duckdb_parquet::ColumnChunk &chunk,
	                      optional_ptr<ColumnWriterStatistics> writer_stats);

private:
	void GatherWrittenStatistics();

private:
	ClientContext &context;
	string file_name;
	vector<LogicalType> sql_types;
	vector<string> column_names;
	duckdb_parquet::CompressionCodec::type codec;
	ChildFieldIDs field_ids;
	shared_ptr<ParquetEncryptionConfig> encryption_config;
	idx_t dictionary_size_limit;
	idx_t string_dictionary_page_size_limit;
	double bloom_filter_false_positive_ratio;
	int64_t compression_level;
	bool debug_use_openssl;
	shared_ptr<EncryptionUtil> encryption_util;
	ParquetVersion parquet_version;
	vector<ParquetColumnSchema> column_schemas;

	unique_ptr<BufferedFileWriter> writer;
	//! Atomics to reduce contention when rotating writes to multiple Parquet files
	atomic<idx_t> total_written;
	atomic<idx_t> num_row_groups;
	std::shared_ptr<duckdb_apache::thrift::protocol::TProtocol> protocol;
	duckdb_parquet::FileMetaData file_meta_data;
	std::mutex lock;

	vector<unique_ptr<ColumnWriter>> column_writers;

	unique_ptr<GeoParquetFileMetadata> geoparquet_data;
	vector<ParquetBloomFilterEntry> bloom_filters;

	optional_ptr<CopyFunctionFileStatistics> written_stats;
	unique_ptr<ParquetStatsAccumulator> stats_accumulator;
};

} // namespace duckdb
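
// Illustrative write flow (a minimal sketch, not part of this header): how a caller
// might drive ParquetWriter, assuming a valid ClientContext "context", a FileSystem
// "fs", matching "types"/"names" vectors, and a populated ColumnDataCollection
// "buffer". The tuning values below are placeholders for illustration, not
// recommended defaults.
//
//   ParquetWriter writer(context, fs, "out.parquet", types, names,
//                        duckdb_parquet::CompressionCodec::SNAPPY, ChildFieldIDs(),
//                        {} /* kv_metadata */, nullptr /* encryption_config */,
//                        25000 /* dictionary_size_limit */,
//                        1048576 /* string_dictionary_page_size_limit */,
//                        0.01 /* bloom_filter_false_positive_ratio */,
//                        3 /* compression_level */, true /* debug_use_openssl */,
//                        ParquetVersion::V1);
//   writer.Flush(buffer);  // encode "buffer" as a row group and write it out
//   writer.Finalize();     // write the footer; the file is now complete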