/src/duckdb/extension/parquet/include/parquet_dbp_decoder.hpp
Line | Count | Source |
1 | | //===----------------------------------------------------------------------===// |
2 | | // DuckDB |
3 | | // |
4 | | // parquet_dbp_decoder.hpp |
5 | | // |
6 | | // |
7 | | //===----------------------------------------------------------------------===// |
8 | | |
9 | | #pragma once |
10 | | |
11 | | #include "decode_utils.hpp" |
12 | | |
13 | | namespace duckdb { |
14 | | |
15 | | class DbpDecoder { |
16 | | public: |
17 | | DbpDecoder(const data_ptr_t buffer, const uint32_t buffer_len) |
18 | 0 | : buffer_(buffer, buffer_len), |
19 | | //<block size in values> <number of miniblocks in a block> <total value count> <first value> |
20 | 0 | block_size_in_values(ParquetDecodeUtils::VarintDecode<uint64_t>(buffer_)), |
21 | 0 | number_of_miniblocks_per_block(DecodeNumberOfMiniblocksPerBlock(buffer_)), |
22 | 0 | number_of_values_in_a_miniblock(block_size_in_values / number_of_miniblocks_per_block), |
23 | 0 | total_value_count(ParquetDecodeUtils::VarintDecode<uint64_t>(buffer_)), |
24 | 0 | previous_value(ParquetDecodeUtils::ZigzagToInt(ParquetDecodeUtils::VarintDecode<uint64_t>(buffer_))), |
25 | | // init state to something sane |
26 | 0 | is_first_value(true), read_values(0), min_delta(NumericLimits<int64_t>::Maximum()), |
27 | 0 | miniblock_index(number_of_miniblocks_per_block - 1), list_of_bitwidths_of_miniblocks(nullptr), |
28 | 0 | miniblock_offset(number_of_values_in_a_miniblock), |
29 | 0 | unpacked_data_offset(BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) { |
30 | 0 | if (!(block_size_in_values % number_of_miniblocks_per_block == 0 && |
31 | 0 | number_of_values_in_a_miniblock % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE == 0)) { |
32 | 0 | throw InvalidInputException("Parquet file has invalid block sizes for DELTA_BINARY_PACKED"); |
33 | 0 | } |
34 | 0 | } |
35 | | |
36 | 0 | ByteBuffer BufferPtr() const { |
37 | 0 | return buffer_; |
38 | 0 | } |
39 | | |
40 | 0 | uint64_t TotalValues() const { |
41 | 0 | return total_value_count; |
42 | 0 | } |
43 | | |
44 | | template <typename T> |
45 | 0 | void GetBatch(const data_ptr_t target_values_ptr, const idx_t batch_size) { |
46 | 0 | if (read_values + batch_size > total_value_count) { |
47 | 0 | throw std::runtime_error("DBP decode did not find enough values"); |
48 | 0 | } |
49 | 0 | read_values += batch_size; |
50 | 0 | GetBatchInternal<T>(target_values_ptr, batch_size); |
51 | 0 | } Unexecuted instantiation: void duckdb::DbpDecoder::GetBatch<int>(unsigned char*, unsigned long) Unexecuted instantiation: void duckdb::DbpDecoder::GetBatch<long>(unsigned char*, unsigned long) Unexecuted instantiation: void duckdb::DbpDecoder::GetBatch<unsigned int>(unsigned char*, unsigned long) |
52 | | |
53 | | template <class T> |
54 | 0 | void Skip(idx_t skip_count) { |
55 | 0 | if (read_values + skip_count > total_value_count) { |
56 | 0 | throw std::runtime_error("DBP decode did not find enough values"); |
57 | 0 | } |
58 | 0 | read_values += skip_count; |
59 | 0 | GetBatchInternal<T, true>(nullptr, skip_count); |
60 | 0 | } Unexecuted instantiation: void duckdb::DbpDecoder::Skip<int>(unsigned long) Unexecuted instantiation: void duckdb::DbpDecoder::Skip<long>(unsigned long) |
61 | | |
62 | 0 | void Finalize() { |
63 | 0 | if (miniblock_offset == number_of_values_in_a_miniblock) { |
64 | 0 | return; |
65 | 0 | } |
66 | 0 | auto data = make_unsafe_uniq_array<int64_t>(number_of_values_in_a_miniblock); |
67 | 0 | GetBatchInternal<int64_t>(data_ptr_cast(data.get()), number_of_values_in_a_miniblock - miniblock_offset); |
68 | 0 | } |
69 | | |
70 | | private: |
71 | 0 | static idx_t DecodeNumberOfMiniblocksPerBlock(ByteBuffer &buffer) { |
72 | 0 | auto res = ParquetDecodeUtils::VarintDecode<uint64_t>(buffer); |
73 | 0 | if (res == 0) { |
74 | 0 | throw InvalidInputException( |
75 | 0 | "Parquet file has invalid number of miniblocks per block for DELTA_BINARY_PACKED"); |
76 | 0 | } |
77 | 0 | return res; |
78 | 0 | } |
79 | | |
80 | | template <typename T, bool SKIP_READ = false> |
81 | 0 | void GetBatchInternal(const data_ptr_t target_values_ptr, const idx_t batch_size) { |
82 | 0 | if (batch_size == 0) { |
83 | 0 | return; |
84 | 0 | } |
85 | 0 | D_ASSERT(target_values_ptr || SKIP_READ); |
86 | |
|
87 | 0 | T *target_values = nullptr; |
88 | 0 | if (!SKIP_READ) { |
89 | 0 | target_values = reinterpret_cast<T *>(target_values_ptr); |
90 | 0 | } |
91 | 0 | idx_t target_values_offset = 0; |
92 | 0 | if (is_first_value) { |
93 | 0 | if (!SKIP_READ) { |
94 | 0 | target_values[0] = static_cast<T>(previous_value); |
95 | 0 | } |
96 | 0 | target_values_offset++; |
97 | 0 | is_first_value = false; |
98 | 0 | } |
99 | |
|
100 | 0 | while (target_values_offset < batch_size) { |
101 | | // Copy over any remaining data |
102 | 0 | const idx_t next = MinValue(batch_size - target_values_offset, |
103 | 0 | BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE - unpacked_data_offset); |
104 | 0 | if (next != 0) { |
105 | 0 | for (idx_t i = 0; i < next; i++) { |
106 | 0 | const auto &unpacked_value = unpacked_data[unpacked_data_offset + i]; |
107 | 0 | auto current_value = static_cast<T>(static_cast<uint64_t>(previous_value) + |
108 | 0 | static_cast<uint64_t>(min_delta) + unpacked_value); |
109 | 0 | if (!SKIP_READ) { |
110 | 0 | target_values[target_values_offset + i] = current_value; |
111 | 0 | } |
112 | 0 | previous_value = static_cast<int64_t>(current_value); |
113 | 0 | } |
114 | 0 | target_values_offset += next; |
115 | 0 | unpacked_data_offset += next; |
116 | 0 | continue; |
117 | 0 | } |
118 | | |
119 | | // Move to next miniblock / block |
120 | 0 | D_ASSERT(unpacked_data_offset == BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE); |
121 | 0 | D_ASSERT(miniblock_index < number_of_miniblocks_per_block); |
122 | 0 | D_ASSERT(miniblock_offset <= number_of_values_in_a_miniblock); |
123 | 0 | if (miniblock_offset == number_of_values_in_a_miniblock) { |
124 | 0 | miniblock_offset = 0; |
125 | 0 | if (++miniblock_index == number_of_miniblocks_per_block) { |
126 | | // <min delta> <list of bitwidths of miniblocks> <miniblocks> |
127 | 0 | min_delta = ParquetDecodeUtils::ZigzagToInt(ParquetDecodeUtils::VarintDecode<uint64_t>(buffer_)); |
128 | 0 | buffer_.available(number_of_miniblocks_per_block); |
129 | 0 | list_of_bitwidths_of_miniblocks = buffer_.ptr; |
130 | 0 | buffer_.unsafe_inc(number_of_miniblocks_per_block); |
131 | 0 | miniblock_index = 0; |
132 | 0 | } |
133 | 0 | } |
134 | | |
135 | | // Unpack from current miniblock |
136 | 0 | ParquetDecodeUtils::BitUnpackAligned(buffer_, unpacked_data, |
137 | 0 | BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE, |
138 | 0 | list_of_bitwidths_of_miniblocks[miniblock_index]); |
139 | 0 | unpacked_data_offset = 0; |
140 | 0 | miniblock_offset += BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE; |
141 | 0 | } |
142 | 0 | } Unexecuted instantiation: void duckdb::DbpDecoder::GetBatchInternal<long, false>(unsigned char*, unsigned long) Unexecuted instantiation: void duckdb::DbpDecoder::GetBatchInternal<int, false>(unsigned char*, unsigned long) Unexecuted instantiation: void duckdb::DbpDecoder::GetBatchInternal<int, true>(unsigned char*, unsigned long) Unexecuted instantiation: void duckdb::DbpDecoder::GetBatchInternal<long, true>(unsigned char*, unsigned long) Unexecuted instantiation: void duckdb::DbpDecoder::GetBatchInternal<unsigned int, false>(unsigned char*, unsigned long) |
143 | | |
144 | | private: |
145 | | ByteBuffer buffer_; |
146 | | const idx_t block_size_in_values; |
147 | | const idx_t number_of_miniblocks_per_block; |
148 | | const idx_t number_of_values_in_a_miniblock; |
149 | | const idx_t total_value_count; |
150 | | int64_t previous_value; |
151 | | |
152 | | bool is_first_value; |
153 | | idx_t read_values; |
154 | | |
155 | | //! Block stuff |
156 | | int64_t min_delta; |
157 | | idx_t miniblock_index; |
158 | | bitpacking_width_t *list_of_bitwidths_of_miniblocks; |
159 | | idx_t miniblock_offset; |
160 | | uint64_t unpacked_data[BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE]; |
161 | | idx_t unpacked_data_offset; |
162 | | }; |
163 | | } // namespace duckdb |