/src/rocksdb/db/wide/wide_column_serialization.cc
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "db/wide/wide_column_serialization.h" |
7 | | |
8 | | #include <cassert> |
9 | | #include <cstring> |
10 | | |
11 | | #include "db/blob/blob_fetcher.h" |
12 | | #include "db/blob/blob_index.h" |
13 | | #include "db/blob/prefetch_buffer_collection.h" |
14 | | #include "db/wide/wide_columns_helper.h" |
15 | | #include "rocksdb/slice.h" |
16 | | #include "util/autovector.h" |
17 | | #include "util/coding.h" |
18 | | |
19 | | namespace ROCKSDB_NAMESPACE { |
20 | | |
21 | | Status WideColumnSerialization::BuildBlobIndexMap( |
22 | | size_t num_columns, |
23 | | const std::vector<std::pair<size_t, BlobIndex>>& blob_columns, |
24 | 0 | std::vector<const BlobIndex*>& blob_index_map) { |
25 | 0 | if (Status s = ValidateWideColumnLimit(num_columns, "Too many wide columns"); |
26 | 0 | !s.ok()) { |
27 | 0 | return s; |
28 | 0 | } |
29 | | |
30 | 0 | blob_index_map.assign(num_columns, nullptr); |
31 | 0 | for (const auto& blob_col : blob_columns) { |
32 | 0 | if (blob_col.first >= blob_index_map.size()) { |
33 | 0 | return Status::InvalidArgument("Blob column index out of range"); |
34 | 0 | } |
35 | 0 | blob_index_map[blob_col.first] = &blob_col.second; |
36 | 0 | } |
37 | | |
38 | 0 | return Status::OK(); |
39 | 0 | } |
40 | | |
41 | | bool WideColumnSerialization::ContainsBlobType(const char* type_bytes, |
42 | 0 | uint32_t num_columns) { |
43 | 0 | for (uint32_t i = 0; i < num_columns; ++i) { |
44 | 0 | if (static_cast<uint8_t>(type_bytes[i]) == kTypeBlobIndex) { |
45 | 0 | return true; |
46 | 0 | } |
47 | 0 | } |
48 | 0 | return false; |
49 | 0 | } |
50 | | |
51 | | Status WideColumnSerialization::Serialize(const WideColumns& columns, |
52 | 0 | std::string& output) { |
53 | 0 | const size_t num_columns = columns.size(); |
54 | |
|
55 | 0 | if (Status sv = ValidateWideColumnLimit(num_columns, "Too many wide columns"); |
56 | 0 | !sv.ok()) { |
57 | 0 | return sv; |
58 | 0 | } |
59 | | |
60 | 0 | PutVarint32(&output, kVersion1); |
61 | |
|
62 | 0 | PutVarint32(&output, static_cast<uint32_t>(num_columns)); |
63 | |
|
64 | 0 | const Slice* prev_name = nullptr; |
65 | |
|
66 | 0 | for (size_t i = 0; i < columns.size(); ++i) { |
67 | 0 | const WideColumn& column = columns[i]; |
68 | |
|
69 | 0 | const Slice& name = column.name(); |
70 | 0 | if (Status s_name = |
71 | 0 | ValidateWideColumnLimit(name.size(), "Wide column name too long"); |
72 | 0 | !s_name.ok()) { |
73 | 0 | return s_name; |
74 | 0 | } |
75 | | |
76 | 0 | if (prev_name) { |
77 | 0 | if (Status so = ValidateColumnOrder(*prev_name, name); !so.ok()) { |
78 | 0 | return so; |
79 | 0 | } |
80 | 0 | } |
81 | | |
82 | 0 | const Slice& value = column.value(); |
83 | 0 | if (Status s_val = |
84 | 0 | ValidateWideColumnLimit(value.size(), "Wide column value too long"); |
85 | 0 | !s_val.ok()) { |
86 | 0 | return s_val; |
87 | 0 | } |
88 | | |
89 | 0 | PutLengthPrefixedSlice(&output, name); |
90 | 0 | PutVarint32(&output, static_cast<uint32_t>(value.size())); |
91 | |
|
92 | 0 | prev_name = &name; |
93 | 0 | } |
94 | | |
95 | 0 | for (const auto& column : columns) { |
96 | 0 | const Slice& value = column.value(); |
97 | |
|
98 | 0 | output.append(value.data(), value.size()); |
99 | 0 | } |
100 | |
|
101 | 0 | return Status::OK(); |
102 | 0 | } |
103 | | |
104 | | template <typename GetName, typename GetValue> |
105 | | Status WideColumnSerialization::SerializeV2Impl( |
106 | | size_t num_columns, |
107 | | const std::vector<std::pair<size_t, BlobIndex>>& blob_columns, |
108 | 0 | std::string& output, GetName get_name, GetValue get_value) { |
109 | 0 | std::vector<const BlobIndex*> blob_index_map; |
110 | 0 | if (Status s = BuildBlobIndexMap(num_columns, blob_columns, blob_index_map); |
111 | 0 | !s.ok()) { |
112 | 0 | return s; |
113 | 0 | } |
114 | 0 | assert(blob_index_map.size() == num_columns); |
115 | | |
116 | | // First pass: validate column ordering, compute sizes, serialize blob |
117 | | // indices, and build column types. |
118 | 0 | std::vector<std::string> serialized_blob_indices(num_columns); |
119 | 0 | std::vector<uint32_t> name_sizes(num_columns); |
120 | 0 | std::vector<uint32_t> value_sizes(num_columns); |
121 | 0 | std::string column_types; |
122 | 0 | column_types.reserve(num_columns); |
123 | |
|
124 | 0 | Slice prev_name_storage; |
125 | 0 | bool has_prev = false; |
126 | 0 | uint32_t name_sizes_bytes = 0; |
127 | 0 | uint32_t names_bytes = 0; |
128 | 0 | uint32_t total_value_sizes_bytes = 0; |
129 | 0 | uint32_t total_values_bytes = 0; |
130 | |
|
131 | 0 | for (size_t i = 0; i < num_columns; ++i) { |
132 | 0 | const Slice name = get_name(i); |
133 | 0 | const Slice value = get_value(i); |
134 | |
|
135 | 0 | if (Status sn = |
136 | 0 | ValidateWideColumnLimit(name.size(), "Wide column name too long"); |
137 | 0 | !sn.ok()) { |
138 | 0 | return sn; |
139 | 0 | } |
140 | | |
141 | 0 | if (has_prev) { |
142 | 0 | if (Status so = ValidateColumnOrder(prev_name_storage, name); !so.ok()) { |
143 | 0 | return so; |
144 | 0 | } |
145 | 0 | } |
146 | | |
147 | 0 | name_sizes[i] = static_cast<uint32_t>(name.size()); |
148 | 0 | name_sizes_bytes += VarintLength(name_sizes[i]); |
149 | 0 | names_bytes += name_sizes[i]; |
150 | |
|
151 | 0 | if (blob_index_map[i] != nullptr) { |
152 | 0 | const BlobIndex* blob_idx = blob_index_map[i]; |
153 | 0 | blob_idx->EncodeTo(&serialized_blob_indices[i]); |
154 | 0 | value_sizes[i] = static_cast<uint32_t>(serialized_blob_indices[i].size()); |
155 | 0 | column_types.push_back(static_cast<char>(kTypeBlobIndex)); |
156 | 0 | } else { |
157 | 0 | if (Status svl = ValidateWideColumnLimit(value.size(), |
158 | 0 | "Wide column value too long"); |
159 | 0 | !svl.ok()) { |
160 | 0 | return svl; |
161 | 0 | } |
162 | 0 | value_sizes[i] = static_cast<uint32_t>(value.size()); |
163 | 0 | column_types.push_back(static_cast<char>(kTypeValue)); |
164 | 0 | } |
165 | | |
166 | 0 | total_value_sizes_bytes += VarintLength(value_sizes[i]); |
167 | 0 | total_values_bytes += value_sizes[i]; |
168 | |
|
169 | 0 | prev_name_storage = name; |
170 | 0 | has_prev = true; |
171 | 0 | } |
172 | | |
173 | | // Second pass: write all V2 sections to output. |
174 | | // Pre-allocate output string. |
175 | 0 | const size_t total_size = |
176 | 0 | VarintLength(kVersion2) + |
177 | 0 | VarintLength(static_cast<uint32_t>(num_columns)) + |
178 | 0 | num_columns + // column types |
179 | 0 | VarintLength(name_sizes_bytes) + VarintLength(total_value_sizes_bytes) + |
180 | 0 | VarintLength(names_bytes) + name_sizes_bytes + total_value_sizes_bytes + |
181 | 0 | names_bytes + total_values_bytes; |
182 | |
|
183 | 0 | const size_t base_offset = output.size(); |
184 | 0 | output.reserve(base_offset + total_size); |
185 | | |
186 | | // Sections 1-3: header, skip info, column types |
187 | 0 | PutVarint32(&output, kVersion2); |
188 | 0 | PutVarint32(&output, static_cast<uint32_t>(num_columns)); |
189 | 0 | PutVarint32(&output, name_sizes_bytes); |
190 | 0 | PutVarint32(&output, total_value_sizes_bytes); |
191 | 0 | PutVarint32(&output, names_bytes); |
192 | 0 | output.append(column_types); |
193 | | |
194 | | // Sections 4-7: resize to final size, then write all 4 sections in a |
195 | | // single loop using independent pointers. Each section's start offset is |
196 | | // known from the sizes computed in the first pass. |
197 | 0 | if (num_columns == 0) { |
198 | 0 | return Status::OK(); |
199 | 0 | } |
200 | | |
201 | 0 | const size_t sec4_offset = output.size(); |
202 | 0 | output.resize(base_offset + total_size); |
203 | |
|
204 | 0 | char* s4 = &output[sec4_offset]; // section 4: name sizes |
205 | 0 | char* s5 = s4 + name_sizes_bytes; // section 5: value sizes |
206 | 0 | char* s6 = s5 + total_value_sizes_bytes; // section 6: names |
207 | 0 | char* s7 = s6 + names_bytes; // section 7: values |
208 | |
|
209 | 0 | for (size_t i = 0; i < num_columns; ++i) { |
210 | 0 | s4 = EncodeVarint32(s4, name_sizes[i]); |
211 | 0 | s5 = EncodeVarint32(s5, value_sizes[i]); |
212 | |
|
213 | 0 | memcpy(s6, get_name(i).data(), name_sizes[i]); |
214 | 0 | s6 += name_sizes[i]; |
215 | |
|
216 | 0 | if (blob_index_map[i] != nullptr) { |
217 | 0 | memcpy(s7, serialized_blob_indices[i].data(), value_sizes[i]); |
218 | 0 | } else { |
219 | 0 | memcpy(s7, get_value(i).data(), value_sizes[i]); |
220 | 0 | } |
221 | 0 | s7 += value_sizes[i]; |
222 | 0 | } |
223 | |
|
224 | 0 | return Status::OK(); |
225 | 0 | } Unexecuted instantiation: wide_column_serialization.cc:rocksdb::Status rocksdb::WideColumnSerialization::SerializeV2Impl<rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_0, rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_1>(unsigned long, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_0, rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_1) Unexecuted instantiation: wide_column_serialization.cc:rocksdb::Status rocksdb::WideColumnSerialization::SerializeV2Impl<rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_0, rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_1>(unsigned long, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_0, rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_1) |
226 | | |
227 | | Status WideColumnSerialization::SerializeV2( |
228 | | const std::vector<std::pair<std::string, std::string>>& columns, |
229 | | const std::vector<std::pair<size_t, BlobIndex>>& blob_columns, |
230 | 0 | std::string& output) { |
231 | 0 | return SerializeV2Impl( |
232 | 0 | columns.size(), blob_columns, output, |
233 | 0 | [&](size_t i) { return Slice(columns[i].first); }, |
234 | 0 | [&](size_t i) { return Slice(columns[i].second); }); |
235 | 0 | } |
236 | | |
237 | | Status WideColumnSerialization::SerializeV2( |
238 | | const WideColumns& columns, |
239 | | const std::vector<std::pair<size_t, BlobIndex>>& blob_columns, |
240 | 0 | std::string& output) { |
241 | 0 | return SerializeV2Impl( |
242 | 0 | columns.size(), blob_columns, output, |
243 | 0 | [&](size_t i) { return columns[i].name(); }, |
244 | 0 | [&](size_t i) { return columns[i].value(); }); |
245 | 0 | } |
246 | | |
247 | | Status WideColumnSerialization::DeserializeV1( |
248 | 0 | Slice& input, uint32_t num_columns, std::vector<WideColumn>& columns) { |
249 | 0 | columns.reserve(num_columns); |
250 | |
|
251 | 0 | autovector<uint32_t, 16> column_value_sizes; |
252 | 0 | column_value_sizes.reserve(num_columns); |
253 | |
|
254 | 0 | for (uint32_t i = 0; i < num_columns; ++i) { |
255 | 0 | Slice name; |
256 | 0 | if (!GetLengthPrefixedSlice(&input, &name)) { |
257 | 0 | return Status::Corruption("Error decoding wide column name"); |
258 | 0 | } |
259 | | |
260 | 0 | if (!columns.empty()) { |
261 | 0 | if (Status so = ValidateColumnOrder(columns.back().name(), name); |
262 | 0 | !so.ok()) { |
263 | 0 | return so; |
264 | 0 | } |
265 | 0 | } |
266 | | |
267 | 0 | columns.emplace_back(name, Slice()); |
268 | |
|
269 | 0 | uint32_t value_size = 0; |
270 | 0 | if (!GetVarint32(&input, &value_size)) { |
271 | 0 | return Status::Corruption("Error decoding wide column value size"); |
272 | 0 | } |
273 | | |
274 | 0 | column_value_sizes.emplace_back(value_size); |
275 | 0 | } |
276 | | |
277 | 0 | const Slice data(input); |
278 | 0 | size_t pos = 0; |
279 | |
|
280 | 0 | for (uint32_t i = 0; i < num_columns; ++i) { |
281 | 0 | const uint32_t value_size = column_value_sizes[i]; |
282 | |
|
283 | 0 | if (pos + value_size > data.size()) { |
284 | 0 | return Status::Corruption("Error decoding wide column value payload"); |
285 | 0 | } |
286 | | |
287 | 0 | columns[i].value() = Slice(data.data() + pos, value_size); |
288 | |
|
289 | 0 | pos += value_size; |
290 | 0 | } |
291 | | |
292 | 0 | return Status::OK(); |
293 | 0 | } |
294 | | |
295 | | Status WideColumnSerialization::DeserializeV2Impl( |
296 | | Slice& input, uint32_t num_columns, std::vector<WideColumn>& columns, |
297 | 0 | std::vector<ValueType>& column_types) { |
298 | | // Section 2: SKIP INFO (3 varints) |
299 | 0 | uint32_t name_sizes_bytes = 0; |
300 | 0 | uint32_t value_sizes_bytes = 0; |
301 | 0 | uint32_t names_bytes = 0; |
302 | 0 | if (!GetVarint32(&input, &name_sizes_bytes)) { |
303 | 0 | return Status::Corruption("Error decoding wide column name sizes bytes"); |
304 | 0 | } |
305 | 0 | if (!GetVarint32(&input, &value_sizes_bytes)) { |
306 | 0 | return Status::Corruption("Error decoding wide column value sizes bytes"); |
307 | 0 | } |
308 | 0 | if (!GetVarint32(&input, &names_bytes)) { |
309 | 0 | return Status::Corruption("Error decoding wide column names bytes"); |
310 | 0 | } |
311 | | |
312 | | // Section 3: COLUMN TYPES (N bytes, each is a ValueType) |
313 | 0 | if (input.size() < num_columns) { |
314 | 0 | return Status::Corruption("Error decoding wide column types"); |
315 | 0 | } |
316 | 0 | column_types.resize(num_columns); |
317 | 0 | for (uint32_t i = 0; i < num_columns; ++i) { |
318 | 0 | column_types[i] = static_cast<ValueType>(input[i]); |
319 | 0 | if (!IsValidColumnValueType(column_types[i])) { |
320 | 0 | return Status::Corruption("Unsupported wide column ValueType"); |
321 | 0 | } |
322 | 0 | } |
323 | 0 | input.remove_prefix(num_columns); |
324 | | |
325 | | // Validate that sections 4-6 fit in the remaining input |
326 | 0 | const size_t metadata_size = |
327 | 0 | name_sizes_bytes + value_sizes_bytes + names_bytes; |
328 | 0 | if (input.size() < metadata_size) { |
329 | 0 | return Status::Corruption("Error decoding wide column sections"); |
330 | 0 | } |
331 | | |
332 | | // Set up 4 pointers into sections 4-7 for single-loop parsing. |
333 | | // Skip info gives us exact boundaries for each section. |
334 | 0 | const char* s4 = input.data(); // section 4: name sizes |
335 | 0 | const char* s4_limit = s4 + name_sizes_bytes; |
336 | 0 | const char* s5 = s4_limit; // section 5: value sizes |
337 | 0 | const char* s5_limit = s5 + value_sizes_bytes; |
338 | 0 | const char* s6 = s5_limit; // section 6: names |
339 | 0 | const char* s7 = s6 + names_bytes; // section 7: values |
340 | 0 | const char* input_end = input.data() + input.size(); |
341 | |
|
342 | 0 | columns.reserve(num_columns); |
343 | 0 | size_t name_pos = 0; |
344 | 0 | size_t value_pos = 0; |
345 | |
|
346 | 0 | for (uint32_t i = 0; i < num_columns; ++i) { |
347 | | // Decode name size from section 4 |
348 | 0 | uint32_t ns = 0; |
349 | 0 | const char* s4_next = GetVarint32Ptr(s4, s4_limit, &ns); |
350 | 0 | if (s4_next == nullptr) { |
351 | 0 | return Status::Corruption("Error decoding wide column name size"); |
352 | 0 | } |
353 | 0 | s4 = s4_next; |
354 | | |
355 | | // Decode value size from section 5 |
356 | 0 | uint32_t vs = 0; |
357 | 0 | const char* s5_next = GetVarint32Ptr(s5, s5_limit, &vs); |
358 | 0 | if (s5_next == nullptr) { |
359 | 0 | return Status::Corruption("Error decoding wide column value size"); |
360 | 0 | } |
361 | 0 | s5 = s5_next; |
362 | | |
363 | | // Read name from section 6 |
364 | 0 | if (name_pos + ns > names_bytes) { |
365 | 0 | return Status::Corruption("Error decoding wide column name"); |
366 | 0 | } |
367 | 0 | Slice name(s6 + name_pos, ns); |
368 | |
|
369 | 0 | if (!columns.empty()) { |
370 | 0 | if (Status so = ValidateColumnOrder(columns.back().name(), name); |
371 | 0 | !so.ok()) { |
372 | 0 | return so; |
373 | 0 | } |
374 | 0 | } |
375 | | |
376 | | // Read value from section 7 |
377 | 0 | if (s7 + value_pos + vs > input_end) { |
378 | 0 | return Status::Corruption("Error decoding wide column value payload"); |
379 | 0 | } |
380 | | |
381 | 0 | columns.emplace_back(name, Slice(s7 + value_pos, vs)); |
382 | 0 | name_pos += ns; |
383 | 0 | value_pos += vs; |
384 | 0 | } |
385 | | |
386 | 0 | return Status::OK(); |
387 | 0 | } |
388 | | |
389 | | Status WideColumnSerialization::Deserialize(Slice& input, |
390 | 0 | WideColumns& columns) { |
391 | 0 | assert(columns.empty()); |
392 | | |
393 | | // Reuse DeserializeV2, then reject any blob references. |
394 | 0 | std::vector<std::pair<size_t, BlobIndex>> blob_columns; |
395 | 0 | if (Status s = DeserializeV2(input, columns, blob_columns); !s.ok()) { |
396 | 0 | return s; |
397 | 0 | } |
398 | | |
399 | 0 | if (!blob_columns.empty()) { |
400 | 0 | return Status::NotSupported( |
401 | 0 | "Wide column contains blob references. Use DeserializeV2."); |
402 | 0 | } |
403 | | |
404 | 0 | return Status::OK(); |
405 | 0 | } |
406 | | |
407 | | Status WideColumnSerialization::DeserializeV2( |
408 | | Slice& input, std::vector<WideColumn>& columns, |
409 | 0 | std::vector<std::pair<size_t, BlobIndex>>& blob_columns) { |
410 | 0 | assert(columns.empty()); |
411 | 0 | assert(blob_columns.empty()); |
412 | |
|
413 | 0 | uint32_t version = 0; |
414 | 0 | if (!GetVarint32(&input, &version)) { |
415 | 0 | return Status::Corruption("Error decoding wide column version"); |
416 | 0 | } |
417 | | |
418 | 0 | if (version > kVersion2) { |
419 | 0 | return Status::NotSupported("Unsupported wide column version"); |
420 | 0 | } |
421 | | |
422 | 0 | uint32_t num_columns = 0; |
423 | 0 | if (!GetVarint32(&input, &num_columns)) { |
424 | 0 | return Status::Corruption("Error decoding number of wide columns"); |
425 | 0 | } |
426 | | |
427 | 0 | if (!num_columns) { |
428 | 0 | return Status::OK(); |
429 | 0 | } |
430 | | |
431 | 0 | if (version >= kVersion2) { |
432 | | // V2 layout: parse columns and extract blob column info |
433 | 0 | std::vector<ValueType> column_types; |
434 | |
|
435 | 0 | if (Status s = DeserializeV2Impl(input, num_columns, columns, column_types); |
436 | 0 | !s.ok()) { |
437 | 0 | return s; |
438 | 0 | } |
439 | 0 | assert(column_types.size() == num_columns); |
440 | 0 | assert(columns.size() == num_columns); |
441 | | |
442 | | // Decode blob indices from value data |
443 | 0 | for (uint32_t i = 0; i < num_columns; ++i) { |
444 | 0 | if (column_types[i] == kTypeBlobIndex) { |
445 | 0 | BlobIndex blob_idx; |
446 | 0 | Slice blob_slice = columns[i].value(); |
447 | 0 | if (Status bs = blob_idx.DecodeFrom(blob_slice); !bs.ok()) { |
448 | 0 | return Status::Corruption("Error decoding blob index in wide column"); |
449 | 0 | } |
450 | 0 | blob_columns.emplace_back(i, blob_idx); |
451 | 0 | } |
452 | 0 | } |
453 | 0 | } else { |
454 | 0 | return DeserializeV1(input, num_columns, columns); |
455 | 0 | } |
456 | | |
457 | 0 | return Status::OK(); |
458 | 0 | } |
459 | | |
460 | | Status WideColumnSerialization::HasBlobColumns(const Slice& input, |
461 | 0 | bool& has_blob_columns) { |
462 | 0 | has_blob_columns = false; |
463 | |
|
464 | 0 | Slice input_ref = input; |
465 | |
|
466 | 0 | uint32_t version = 0; |
467 | 0 | if (!GetVarint32(&input_ref, &version)) { |
468 | 0 | return Status::Corruption("Error decoding wide column version"); |
469 | 0 | } |
470 | | |
471 | | // Version 1 never has blob columns |
472 | 0 | if (version < kVersion2) { |
473 | 0 | return Status::OK(); |
474 | 0 | } |
475 | | |
476 | 0 | uint32_t num_columns = 0; |
477 | 0 | if (!GetVarint32(&input_ref, &num_columns)) { |
478 | 0 | return Status::Corruption("Error decoding number of wide columns"); |
479 | 0 | } |
480 | | |
481 | 0 | if (!num_columns) { |
482 | 0 | return Status::OK(); |
483 | 0 | } |
484 | | |
485 | | // V2: Skip over SKIP INFO (3 varints) to reach COLUMN TYPES section. |
486 | 0 | uint32_t unused_name_sizes_bytes = 0; |
487 | 0 | uint32_t unused_value_sizes_bytes = 0; |
488 | 0 | uint32_t unused_names_bytes = 0; |
489 | 0 | if (!GetVarint32(&input_ref, &unused_name_sizes_bytes) || |
490 | 0 | !GetVarint32(&input_ref, &unused_value_sizes_bytes) || |
491 | 0 | !GetVarint32(&input_ref, &unused_names_bytes)) { |
492 | 0 | return Status::Corruption("Error decoding wide column skip info"); |
493 | 0 | } |
494 | 0 | if (input_ref.size() < num_columns) { |
495 | 0 | return Status::Corruption("Error decoding wide column types"); |
496 | 0 | } |
497 | 0 | has_blob_columns = ContainsBlobType(input_ref.data(), num_columns); |
498 | |
|
499 | 0 | return Status::OK(); |
500 | 0 | } |
501 | | |
502 | | Status WideColumnSerialization::GetVersion(const Slice& input, |
503 | 0 | uint32_t& version) { |
504 | 0 | Slice input_ref = input; |
505 | |
|
506 | 0 | version = 0; |
507 | 0 | if (!GetVarint32(&input_ref, &version)) { |
508 | 0 | return Status::Corruption("Error decoding wide column version"); |
509 | 0 | } |
510 | | |
511 | 0 | return Status::OK(); |
512 | 0 | } |
513 | | |
514 | | Status WideColumnSerialization::GetValueOfDefaultColumn(Slice& input, |
515 | 0 | Slice& value) { |
516 | 0 | Slice input_ref = input; |
517 | |
|
518 | 0 | uint32_t version = 0; |
519 | 0 | if (!GetVarint32(&input_ref, &version)) { |
520 | 0 | return Status::Corruption("Error decoding wide column version"); |
521 | 0 | } |
522 | | |
523 | 0 | if (version > kVersion2) { |
524 | 0 | return Status::NotSupported("Unsupported wide column version"); |
525 | 0 | } |
526 | | |
527 | 0 | uint32_t num_columns = 0; |
528 | 0 | if (!GetVarint32(&input_ref, &num_columns)) { |
529 | 0 | return Status::Corruption("Error decoding number of wide columns"); |
530 | 0 | } |
531 | | |
532 | 0 | if (!num_columns) { |
533 | 0 | value.clear(); |
534 | 0 | return Status::OK(); |
535 | 0 | } |
536 | | |
537 | 0 | if (version >= kVersion2) { |
538 | | // V2 fast path: use skip info to jump directly to values without |
539 | | // scanning through variable-length sections. |
540 | | |
541 | | // Read SKIP INFO (3 varints, immediately after header) |
542 | 0 | uint32_t name_sizes_bytes = 0; |
543 | 0 | uint32_t value_sizes_bytes = 0; |
544 | 0 | uint32_t names_bytes = 0; |
545 | 0 | if (!GetVarint32(&input_ref, &name_sizes_bytes)) { |
546 | 0 | return Status::Corruption("Error decoding wide column name sizes bytes"); |
547 | 0 | } |
548 | 0 | if (!GetVarint32(&input_ref, &value_sizes_bytes)) { |
549 | 0 | return Status::Corruption("Error decoding wide column value sizes bytes"); |
550 | 0 | } |
551 | 0 | if (!GetVarint32(&input_ref, &names_bytes)) { |
552 | 0 | return Status::Corruption("Error decoding wide column names bytes"); |
553 | 0 | } |
554 | | |
555 | | // Read COLUMN TYPES (N bytes) |
556 | 0 | if (input_ref.size() < num_columns) { |
557 | 0 | return Status::Corruption("Error decoding wide column types"); |
558 | 0 | } |
559 | | // Check if default column (index 0) is a blob reference |
560 | 0 | if (static_cast<uint8_t>(input_ref[0]) == kTypeBlobIndex) { |
561 | 0 | return Status::NotSupported( |
562 | 0 | "Wide column contains blob references. Use DeserializeV2."); |
563 | 0 | } |
564 | 0 | input_ref.remove_prefix(num_columns); |
565 | | |
566 | | // Peek first name size from NAME SIZES section |
567 | 0 | if (input_ref.size() < name_sizes_bytes) { |
568 | 0 | return Status::Corruption("Error decoding wide column name sizes"); |
569 | 0 | } |
570 | 0 | Slice name_sizes_section(input_ref.data(), name_sizes_bytes); |
571 | 0 | uint32_t first_name_size = 0; |
572 | 0 | if (!GetVarint32(&name_sizes_section, &first_name_size)) { |
573 | 0 | return Status::Corruption("Error decoding wide column name size"); |
574 | 0 | } |
575 | 0 | input_ref.remove_prefix(name_sizes_bytes); |
576 | | |
577 | | // Peek first value size from VALUE SIZES section |
578 | 0 | if (input_ref.size() < value_sizes_bytes) { |
579 | 0 | return Status::Corruption("Error decoding wide column value sizes"); |
580 | 0 | } |
581 | 0 | Slice value_sizes_section(input_ref.data(), value_sizes_bytes); |
582 | 0 | uint32_t first_value_size = 0; |
583 | 0 | if (!GetVarint32(&value_sizes_section, &first_value_size)) { |
584 | 0 | return Status::Corruption("Error decoding wide column value size"); |
585 | 0 | } |
586 | | // Skip entire VALUE SIZES section using value_sizes_bytes |
587 | 0 | input_ref.remove_prefix(value_sizes_bytes); |
588 | | |
589 | | // Check if the first column is the default column (empty name) |
590 | 0 | if (first_name_size != 0) { |
591 | 0 | value.clear(); |
592 | 0 | return Status::OK(); |
593 | 0 | } |
594 | | |
595 | | // Skip NAMES section |
596 | 0 | if (input_ref.size() < names_bytes) { |
597 | 0 | return Status::Corruption("Error decoding wide column names"); |
598 | 0 | } |
599 | 0 | input_ref.remove_prefix(names_bytes); |
600 | | |
601 | | // Read the first value from VALUES section |
602 | 0 | if (input_ref.size() < first_value_size) { |
603 | 0 | return Status::Corruption("Error decoding wide column value payload"); |
604 | 0 | } |
605 | 0 | value = Slice(input_ref.data(), first_value_size); |
606 | 0 | return Status::OK(); |
607 | 0 | } |
608 | | |
609 | | // V1 fallback: full deserialization |
610 | 0 | WideColumns columns; |
611 | |
|
612 | 0 | if (Status s = Deserialize(input, columns); !s.ok()) { |
613 | 0 | return s; |
614 | 0 | } |
615 | | |
616 | 0 | if (!WideColumnsHelper::HasDefaultColumn(columns)) { |
617 | 0 | value.clear(); |
618 | 0 | return Status::OK(); |
619 | 0 | } |
620 | | |
621 | 0 | value = WideColumnsHelper::GetDefaultColumn(columns); |
622 | |
|
623 | 0 | return Status::OK(); |
624 | 0 | } |
625 | | |
626 | | Status WideColumnSerialization::ResolveEntityBlobColumns( |
627 | | const Slice& entity_value, const Slice& user_key, |
628 | | const BlobFetcher* blob_fetcher, PrefetchBufferCollection* prefetch_buffers, |
629 | | std::string& resolved_entity, bool& resolved, uint64_t* total_bytes_read, |
630 | 0 | uint64_t* num_blobs_resolved) { |
631 | 0 | assert(blob_fetcher); |
632 | |
|
633 | 0 | resolved = false; |
634 | |
|
635 | 0 | std::vector<WideColumn> columns; |
636 | 0 | std::vector<std::pair<size_t, BlobIndex>> blob_columns; |
637 | |
|
638 | 0 | Slice input_copy = entity_value; |
639 | 0 | if (Status s = DeserializeV2(input_copy, columns, blob_columns); !s.ok()) { |
640 | 0 | return s; |
641 | 0 | } |
642 | | |
643 | 0 | if (blob_columns.empty()) { |
644 | 0 | return Status::OK(); |
645 | 0 | } |
646 | | |
647 | 0 | resolved = true; |
648 | | |
649 | | // Fetch each blob value |
650 | 0 | std::vector<std::string> resolved_blob_values; |
651 | 0 | resolved_blob_values.reserve(blob_columns.size()); |
652 | |
|
653 | 0 | for (const auto& blob_col : blob_columns) { |
654 | 0 | const BlobIndex& blob_idx = blob_col.second; |
655 | |
|
656 | 0 | if (blob_idx.IsInlined()) { |
657 | 0 | resolved_blob_values.emplace_back(blob_idx.value().data(), |
658 | 0 | blob_idx.value().size()); |
659 | 0 | continue; |
660 | 0 | } |
661 | | |
662 | 0 | FilePrefetchBuffer* prefetch_buffer = |
663 | 0 | prefetch_buffers ? prefetch_buffers->GetOrCreatePrefetchBuffer( |
664 | 0 | blob_idx.file_number()) |
665 | 0 | : nullptr; |
666 | |
|
667 | 0 | uint64_t bytes_read = 0; |
668 | |
|
669 | 0 | PinnableSlice blob_value; |
670 | 0 | const Status fetch_s = blob_fetcher->FetchBlob( |
671 | 0 | user_key, blob_idx, prefetch_buffer, &blob_value, &bytes_read); |
672 | 0 | if (!fetch_s.ok()) { |
673 | 0 | return fetch_s; |
674 | 0 | } |
675 | | |
676 | 0 | resolved_blob_values.emplace_back(blob_value.data(), blob_value.size()); |
677 | |
|
678 | 0 | if (total_bytes_read) { |
679 | 0 | *total_bytes_read += bytes_read; |
680 | 0 | } |
681 | 0 | } |
682 | | |
683 | 0 | if (num_blobs_resolved) { |
684 | 0 | *num_blobs_resolved += blob_columns.size(); |
685 | 0 | } |
686 | |
|
687 | 0 | return SerializeResolvedEntity(columns, blob_columns, resolved_blob_values, |
688 | 0 | resolved_entity); |
689 | 0 | } |
690 | | |
691 | | Status WideColumnSerialization::GetValueOfDefaultColumnResolvingBlobs( |
692 | | const Slice& entity_value, const Slice& user_key, |
693 | 0 | const BlobFetcher* blob_fetcher, PinnableSlice& result, bool& resolved) { |
694 | 0 | assert(blob_fetcher); |
695 | |
|
696 | 0 | resolved = false; |
697 | |
|
698 | 0 | std::vector<WideColumn> columns; |
699 | 0 | std::vector<std::pair<size_t, BlobIndex>> blob_columns; |
700 | |
|
701 | 0 | Slice input_copy = entity_value; |
702 | 0 | if (Status s = DeserializeV2(input_copy, columns, blob_columns); !s.ok()) { |
703 | 0 | return s; |
704 | 0 | } |
705 | | |
706 | | // The default column (empty name) is always at index 0 when present |
707 | | // (columns are sorted by name). |
708 | 0 | if (columns.empty() || columns[0].name() != kDefaultWideColumnName) { |
709 | 0 | result.PinSelf(Slice()); |
710 | 0 | return Status::OK(); |
711 | 0 | } |
712 | | |
713 | | // Check if the default column (index 0) is a blob reference |
714 | 0 | for (const auto& blob_col : blob_columns) { |
715 | 0 | if (blob_col.first == 0) { |
716 | 0 | const BlobIndex& blob_idx = blob_col.second; |
717 | |
|
718 | 0 | resolved = true; |
719 | |
|
720 | 0 | if (blob_idx.IsInlined()) { |
721 | 0 | result.PinSelf(blob_idx.value()); |
722 | 0 | return Status::OK(); |
723 | 0 | } |
724 | | |
725 | 0 | return blob_fetcher->FetchBlob(user_key, blob_idx, |
726 | 0 | nullptr /* prefetch_buffer */, &result, |
727 | 0 | nullptr /* bytes_read */); |
728 | 0 | } |
729 | 0 | } |
730 | | |
731 | | // Default column is inline |
732 | 0 | result.PinSelf(columns[0].value()); |
733 | 0 | return Status::OK(); |
734 | 0 | } |
735 | | |
736 | | Status WideColumnSerialization::SerializeResolvedEntity( |
737 | | const std::vector<WideColumn>& columns, |
738 | | const std::vector<std::pair<size_t, BlobIndex>>& blob_columns, |
739 | 0 | const std::vector<std::string>& resolved_blob_values, std::string& output) { |
740 | 0 | assert(blob_columns.size() == resolved_blob_values.size()); |
741 | | |
742 | | // blob_columns is sorted by column index and typically small, so use a |
743 | | // linear scan with a cursor instead of an unordered_map. |
744 | 0 | size_t blob_cursor = 0; |
745 | | |
746 | | // Build result columns with resolved blob values |
747 | 0 | WideColumns result_columns; |
748 | 0 | result_columns.reserve(columns.size()); |
749 | |
|
750 | 0 | for (size_t i = 0; i < columns.size(); ++i) { |
751 | 0 | if (blob_cursor < blob_columns.size() && |
752 | 0 | blob_columns[blob_cursor].first == i) { |
753 | | // This is a blob column - use the resolved value |
754 | 0 | result_columns.emplace_back(columns[i].name(), |
755 | 0 | Slice(resolved_blob_values[blob_cursor])); |
756 | 0 | ++blob_cursor; |
757 | 0 | } else { |
758 | | // This is an inline column - use the original value |
759 | 0 | result_columns.emplace_back(columns[i].name(), columns[i].value()); |
760 | 0 | } |
761 | 0 | } |
762 | | |
763 | | // Serialize using V1 format (all values inline) |
764 | 0 | return Serialize(result_columns, output); |
765 | 0 | } |
766 | | |
767 | | } // namespace ROCKSDB_NAMESPACE |