/src/duckdb/extension/parquet/include/parquet_dbp_encoder.hpp
Line | Count | Source |
1 | | //===----------------------------------------------------------------------===// |
2 | | // DuckDB |
3 | | // |
4 | | // parquet_dbp_encoder.hpp |
5 | | // |
6 | | // |
7 | | //===----------------------------------------------------------------------===// |
8 | | |
9 | | #pragma once |
10 | | |
11 | | #include "decode_utils.hpp" |
12 | | |
13 | | namespace duckdb { |
14 | | |
15 | | class DbpEncoder { |
16 | | private: |
17 | | static constexpr uint64_t BLOCK_SIZE_IN_VALUES = 2048; |
18 | | static constexpr uint64_t NUMBER_OF_MINIBLOCKS_IN_A_BLOCK = 8; |
19 | | static constexpr uint64_t NUMBER_OF_VALUES_IN_A_MINIBLOCK = BLOCK_SIZE_IN_VALUES / NUMBER_OF_MINIBLOCKS_IN_A_BLOCK; |
20 | | |
21 | | public: |
22 | 0 | explicit DbpEncoder(const idx_t total_value_count_p) : total_value_count(total_value_count_p), count(0) { |
23 | 0 | } |
24 | | |
25 | | public: |
26 | | template <class T> |
27 | 0 | void BeginWrite(WriteStream &writer, const T &first_value) { |
28 | 0 | throw InternalException("DbpEncoder should only be used with integers"); |
29 | 0 | } Unexecuted instantiation: void duckdb::DbpEncoder::BeginWrite<duckdb::Int96>(duckdb::WriteStream&, duckdb::Int96 const&) Unexecuted instantiation: void duckdb::DbpEncoder::BeginWrite<double>(duckdb::WriteStream&, double const&) Unexecuted instantiation: void duckdb::DbpEncoder::BeginWrite<float>(duckdb::WriteStream&, float const&) Unexecuted instantiation: void duckdb::DbpEncoder::BeginWrite<duckdb::string_t>(duckdb::WriteStream&, duckdb::string_t const&) Unexecuted instantiation: void duckdb::DbpEncoder::BeginWrite<duckdb::ParquetUUIDTargetType>(duckdb::WriteStream&, duckdb::ParquetUUIDTargetType const&) Unexecuted instantiation: void duckdb::DbpEncoder::BeginWrite<duckdb::ParquetIntervalTargetType>(duckdb::WriteStream&, duckdb::ParquetIntervalTargetType const&) |
30 | | |
31 | | template <class T> |
32 | 0 | void WriteValue(WriteStream &writer, const T &value) { |
33 | 0 | throw InternalException("DbpEncoder should only be used with integers"); |
34 | 0 | } Unexecuted instantiation: void duckdb::DbpEncoder::WriteValue<duckdb::Int96>(duckdb::WriteStream&, duckdb::Int96 const&) Unexecuted instantiation: void duckdb::DbpEncoder::WriteValue<double>(duckdb::WriteStream&, double const&) Unexecuted instantiation: void duckdb::DbpEncoder::WriteValue<float>(duckdb::WriteStream&, float const&) Unexecuted instantiation: void duckdb::DbpEncoder::WriteValue<duckdb::string_t>(duckdb::WriteStream&, duckdb::string_t const&) Unexecuted instantiation: void duckdb::DbpEncoder::WriteValue<duckdb::ParquetUUIDTargetType>(duckdb::WriteStream&, duckdb::ParquetUUIDTargetType const&) Unexecuted instantiation: void duckdb::DbpEncoder::WriteValue<duckdb::ParquetIntervalTargetType>(duckdb::WriteStream&, duckdb::ParquetIntervalTargetType const&) |
35 | | |
36 | 0 | void FinishWrite(WriteStream &writer) { |
37 | 0 | if (count + block_count != total_value_count) { |
38 | 0 | throw InternalException("value count mismatch when writing DELTA_BINARY_PACKED"); |
39 | 0 | } |
40 | 0 | if (block_count != 0) { |
41 | 0 | WriteBlock(writer); |
42 | 0 | } |
43 | 0 | } |
44 | | |
45 | | private: |
46 | 0 | void BeginWriteInternal(WriteStream &writer, const int64_t &first_value) { |
47 | | // <block size in values> <number of miniblocks in a block> <total value count> <first value> |
48 | | |
49 | | // the block size is a multiple of 128; it is stored as a ULEB128 int |
50 | 0 | ParquetDecodeUtils::VarintEncode(BLOCK_SIZE_IN_VALUES, writer); |
51 | | // the miniblock count per block is a divisor of the block size such that their quotient, |
52 | | // the number of values in a miniblock, is a multiple of 32 |
53 | 0 | static_assert(BLOCK_SIZE_IN_VALUES % NUMBER_OF_MINIBLOCKS_IN_A_BLOCK == 0 && |
54 | 0 | NUMBER_OF_VALUES_IN_A_MINIBLOCK % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE == 0, |
55 | 0 | "invalid block sizes for DELTA_BINARY_PACKED"); |
56 | | // it is stored as a ULEB128 int |
57 | 0 | ParquetDecodeUtils::VarintEncode(NUMBER_OF_MINIBLOCKS_IN_A_BLOCK, writer); |
58 | | // the total value count is stored as a ULEB128 int |
59 | 0 | ParquetDecodeUtils::VarintEncode(total_value_count, writer); |
60 | | // the first value is stored as a zigzag ULEB128 int |
61 | 0 | ParquetDecodeUtils::VarintEncode(ParquetDecodeUtils::IntToZigzag(first_value), writer); |
62 | | |
63 | | // initialize |
64 | 0 | if (total_value_count != 0) { |
65 | 0 | count++; |
66 | 0 | } |
67 | 0 | previous_value = first_value; |
68 | |
|
69 | 0 | min_delta = NumericLimits<int64_t>::Maximum(); |
70 | 0 | block_count = 0; |
71 | 0 | } |
72 | | |
73 | 0 | void WriteValueInternal(WriteStream &writer, const int64_t &value) { |
74 | | // 1. Compute the differences between consecutive elements. For the first element in the block, |
75 | | // use the last element in the previous block or, in the case of the first block, |
76 | | // use the first value of the whole sequence, stored in the header. |
77 | | |
78 | | // Subtractions in steps 1) and 2) may incur signed arithmetic overflow, |
79 | | // and so will the corresponding additions when decoding. |
80 | | // Overflow should be allowed and handled as wrapping around in 2’s complement notation |
81 | | // so that the original values are correctly restituted. |
82 | | // This may require explicit care in some programming languages |
83 | | // (for example by doing all arithmetic in the unsigned domain). |
84 | 0 | const auto delta = static_cast<int64_t>(static_cast<uint64_t>(value) - static_cast<uint64_t>(previous_value)); |
85 | 0 | previous_value = value; |
86 | | // Compute the frame of reference (the minimum of the deltas in the block). |
87 | 0 | min_delta = MinValue(min_delta, delta); |
88 | | // append. if block is full, write it out |
89 | 0 | data[block_count++] = delta; |
90 | 0 | if (block_count == BLOCK_SIZE_IN_VALUES) { |
91 | 0 | WriteBlock(writer); |
92 | 0 | } |
93 | 0 | } |
94 | | |
95 | 0 | void WriteBlock(WriteStream &writer) { |
96 | 0 | D_ASSERT(count + block_count == total_value_count || block_count == BLOCK_SIZE_IN_VALUES); |
97 | 0 | const auto number_of_miniblocks = |
98 | 0 | (block_count + NUMBER_OF_VALUES_IN_A_MINIBLOCK - 1) / NUMBER_OF_VALUES_IN_A_MINIBLOCK; |
99 | 0 | for (idx_t miniblock_idx = 0; miniblock_idx < number_of_miniblocks; miniblock_idx++) { |
100 | 0 | for (idx_t i = 0; i < NUMBER_OF_VALUES_IN_A_MINIBLOCK; i++) { |
101 | 0 | const idx_t index = miniblock_idx * NUMBER_OF_VALUES_IN_A_MINIBLOCK + i; |
102 | 0 | auto &value = data[index]; |
103 | 0 | if (index < block_count) { |
104 | | // 2. Compute the frame of reference (the minimum of the deltas in the block). |
105 | | // Subtract this min delta from all deltas in the block. |
106 | | // This guarantees that all values are non-negative. |
107 | 0 | D_ASSERT(min_delta <= value); |
108 | 0 | value = static_cast<int64_t>(static_cast<uint64_t>(value) - static_cast<uint64_t>(min_delta)); |
109 | 0 | } else { |
110 | | // If there are not enough values to fill the last miniblock, we pad the miniblock |
111 | | // so that its length is always the number of values in a full miniblock multiplied by the bit |
112 | | // width. The values of the padding bits should be zero, but readers must accept paddings consisting |
113 | | // of arbitrary bits as well. |
114 | 0 | value = 0; |
115 | 0 | } |
116 | 0 | } |
117 | 0 | } |
118 | |
|
119 | 0 | for (idx_t miniblock_idx = 0; miniblock_idx < NUMBER_OF_MINIBLOCKS_IN_A_BLOCK; miniblock_idx++) { |
120 | 0 | auto &width = list_of_bitwidths_of_miniblocks[miniblock_idx]; |
121 | 0 | if (miniblock_idx < number_of_miniblocks) { |
122 | 0 | const auto src = &data[miniblock_idx * NUMBER_OF_VALUES_IN_A_MINIBLOCK]; |
123 | 0 | width = BitpackingPrimitives::MinimumBitWidth(reinterpret_cast<uint64_t *>(src), |
124 | 0 | NUMBER_OF_VALUES_IN_A_MINIBLOCK); |
125 | 0 | D_ASSERT(width <= sizeof(int64_t) * 8); |
126 | 0 | } else { |
127 | | // If, in the last block, less than <number of miniblocks in a block> miniblocks are needed to store the |
128 | | // values, the bytes storing the bit widths of the unneeded miniblocks are still present, their value |
129 | | // should be zero, but readers must accept arbitrary values as well. There are no additional padding |
130 | | // bytes for the miniblock bodies though, as if their bit widths were 0 (regardless of the actual byte |
131 | | // values). The reader knows when to stop reading by keeping track of the number of values read. |
132 | 0 | width = 0; |
133 | 0 | } |
134 | 0 | } |
135 | | |
136 | | // 3. Encode the frame of reference (min delta) as a zigzag ULEB128 int |
137 | | // followed by the bit widths of the miniblocks |
138 | | // and the delta values (minus the min delta) bit-packed per miniblock. |
139 | | // <min delta> <list of bitwidths of miniblocks> <miniblocks> |
140 | | |
141 | | // the min delta is a zigzag ULEB128 int (we compute a minimum as we need positive integers for bit packing) |
142 | 0 | ParquetDecodeUtils::VarintEncode(ParquetDecodeUtils::IntToZigzag(min_delta), writer); |
143 | | // the bitwidth of each block is stored as a byte |
144 | 0 | writer.WriteData(list_of_bitwidths_of_miniblocks, NUMBER_OF_MINIBLOCKS_IN_A_BLOCK); |
145 | | // each miniblock is a list of bit packed ints according to the bit width stored at the beginning of the block |
146 | 0 | for (idx_t miniblock_idx = 0; miniblock_idx < number_of_miniblocks; miniblock_idx++) { |
147 | 0 | const auto src = &data[miniblock_idx * NUMBER_OF_VALUES_IN_A_MINIBLOCK]; |
148 | 0 | const auto &width = list_of_bitwidths_of_miniblocks[miniblock_idx]; |
149 | 0 | memset(data_packed, 0, sizeof(data_packed)); |
150 | 0 | ParquetDecodeUtils::BitPackAligned(reinterpret_cast<uint64_t *>(src), data_packed, |
151 | 0 | NUMBER_OF_VALUES_IN_A_MINIBLOCK, width); |
152 | 0 | const auto write_size = NUMBER_OF_VALUES_IN_A_MINIBLOCK * width / 8; |
153 | | #ifdef DEBUG |
154 | | // immediately verify that unpacking yields the input data |
155 | | int64_t verification_data[NUMBER_OF_VALUES_IN_A_MINIBLOCK]; |
156 | | ByteBuffer byte_buffer(data_ptr_cast(data_packed), write_size); |
157 | | bitpacking_width_t bitpack_pos = 0; |
158 | | ParquetDecodeUtils::BitUnpack(byte_buffer, bitpack_pos, reinterpret_cast<uint64_t *>(verification_data), |
159 | | NUMBER_OF_VALUES_IN_A_MINIBLOCK, width); |
160 | | for (idx_t i = 0; i < NUMBER_OF_VALUES_IN_A_MINIBLOCK; i++) { |
161 | | D_ASSERT(src[i] == verification_data[i]); |
162 | | } |
163 | | #endif |
164 | 0 | writer.WriteData(data_packed, write_size); |
165 | 0 | } |
166 | |
|
167 | 0 | count += block_count; |
168 | |
|
169 | 0 | min_delta = NumericLimits<int64_t>::Maximum(); |
170 | 0 | block_count = 0; |
171 | 0 | } |
172 | | |
173 | | private: |
174 | | //! Overall fields |
175 | | const idx_t total_value_count; |
176 | | idx_t count; |
177 | | int64_t previous_value; |
178 | | |
179 | | //! Block-specific fields |
180 | | int64_t min_delta; |
181 | | int64_t data[BLOCK_SIZE_IN_VALUES]; |
182 | | idx_t block_count; |
183 | | |
184 | | //! Bitpacking fields |
185 | | bitpacking_width_t list_of_bitwidths_of_miniblocks[NUMBER_OF_MINIBLOCKS_IN_A_BLOCK]; |
186 | | data_t data_packed[NUMBER_OF_VALUES_IN_A_MINIBLOCK * sizeof(int64_t)]; |
187 | | }; |
188 | | |
189 | | template <> |
190 | 0 | inline void DbpEncoder::BeginWrite(WriteStream &writer, const int32_t &first_value) { |
191 | 0 | BeginWriteInternal(writer, first_value); |
192 | 0 | } |
193 | | |
194 | | template <> |
195 | 0 | inline void DbpEncoder::BeginWrite(WriteStream &writer, const int64_t &first_value) { |
196 | 0 | BeginWriteInternal(writer, first_value); |
197 | 0 | } |
198 | | |
199 | | template <> |
200 | 0 | inline void DbpEncoder::BeginWrite(WriteStream &writer, const uint32_t &first_value) { |
201 | 0 | BeginWriteInternal(writer, first_value); |
202 | 0 | } |
203 | | |
204 | | template <> |
205 | 0 | inline void DbpEncoder::BeginWrite(WriteStream &writer, const uint64_t &first_value) { |
206 | 0 | BeginWriteInternal(writer, first_value); |
207 | 0 | } |
208 | | |
209 | | template <> |
210 | 0 | inline void DbpEncoder::WriteValue(WriteStream &writer, const int32_t &first_value) { |
211 | 0 | WriteValueInternal(writer, first_value); |
212 | 0 | } |
213 | | |
214 | | template <> |
215 | 0 | inline void DbpEncoder::WriteValue(WriteStream &writer, const int64_t &first_value) { |
216 | 0 | WriteValueInternal(writer, first_value); |
217 | 0 | } |
218 | | |
219 | | template <> |
220 | 0 | inline void DbpEncoder::WriteValue(WriteStream &writer, const uint32_t &first_value) { |
221 | 0 | WriteValueInternal(writer, first_value); |
222 | 0 | } |
223 | | |
224 | | template <> |
225 | 0 | inline void DbpEncoder::WriteValue(WriteStream &writer, const uint64_t &first_value) { |
226 | 0 | WriteValueInternal(writer, first_value); |
227 | 0 | } |
228 | | |
229 | | } // namespace duckdb |