Coverage Report

Created: 2026-03-31 07:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/wide/wide_column_serialization.cc
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/wide/wide_column_serialization.h"
7
8
#include <cassert>
9
#include <cstring>
10
11
#include "db/blob/blob_fetcher.h"
12
#include "db/blob/blob_index.h"
13
#include "db/blob/prefetch_buffer_collection.h"
14
#include "db/wide/wide_columns_helper.h"
15
#include "rocksdb/slice.h"
16
#include "util/autovector.h"
17
#include "util/coding.h"
18
19
namespace ROCKSDB_NAMESPACE {
20
21
Status WideColumnSerialization::BuildBlobIndexMap(
22
    size_t num_columns,
23
    const std::vector<std::pair<size_t, BlobIndex>>& blob_columns,
24
0
    std::vector<const BlobIndex*>& blob_index_map) {
25
0
  if (Status s = ValidateWideColumnLimit(num_columns, "Too many wide columns");
26
0
      !s.ok()) {
27
0
    return s;
28
0
  }
29
30
0
  blob_index_map.assign(num_columns, nullptr);
31
0
  for (const auto& blob_col : blob_columns) {
32
0
    if (blob_col.first >= blob_index_map.size()) {
33
0
      return Status::InvalidArgument("Blob column index out of range");
34
0
    }
35
0
    blob_index_map[blob_col.first] = &blob_col.second;
36
0
  }
37
38
0
  return Status::OK();
39
0
}
40
41
bool WideColumnSerialization::ContainsBlobType(const char* type_bytes,
42
0
                                               uint32_t num_columns) {
43
0
  for (uint32_t i = 0; i < num_columns; ++i) {
44
0
    if (static_cast<uint8_t>(type_bytes[i]) == kTypeBlobIndex) {
45
0
      return true;
46
0
    }
47
0
  }
48
0
  return false;
49
0
}
50
51
Status WideColumnSerialization::Serialize(const WideColumns& columns,
52
0
                                          std::string& output) {
53
0
  const size_t num_columns = columns.size();
54
55
0
  if (Status sv = ValidateWideColumnLimit(num_columns, "Too many wide columns");
56
0
      !sv.ok()) {
57
0
    return sv;
58
0
  }
59
60
0
  PutVarint32(&output, kVersion1);
61
62
0
  PutVarint32(&output, static_cast<uint32_t>(num_columns));
63
64
0
  const Slice* prev_name = nullptr;
65
66
0
  for (size_t i = 0; i < columns.size(); ++i) {
67
0
    const WideColumn& column = columns[i];
68
69
0
    const Slice& name = column.name();
70
0
    if (Status s_name =
71
0
            ValidateWideColumnLimit(name.size(), "Wide column name too long");
72
0
        !s_name.ok()) {
73
0
      return s_name;
74
0
    }
75
76
0
    if (prev_name) {
77
0
      if (Status so = ValidateColumnOrder(*prev_name, name); !so.ok()) {
78
0
        return so;
79
0
      }
80
0
    }
81
82
0
    const Slice& value = column.value();
83
0
    if (Status s_val =
84
0
            ValidateWideColumnLimit(value.size(), "Wide column value too long");
85
0
        !s_val.ok()) {
86
0
      return s_val;
87
0
    }
88
89
0
    PutLengthPrefixedSlice(&output, name);
90
0
    PutVarint32(&output, static_cast<uint32_t>(value.size()));
91
92
0
    prev_name = &name;
93
0
  }
94
95
0
  for (const auto& column : columns) {
96
0
    const Slice& value = column.value();
97
98
0
    output.append(value.data(), value.size());
99
0
  }
100
101
0
  return Status::OK();
102
0
}
103
104
template <typename GetName, typename GetValue>
105
Status WideColumnSerialization::SerializeV2Impl(
106
    size_t num_columns,
107
    const std::vector<std::pair<size_t, BlobIndex>>& blob_columns,
108
0
    std::string& output, GetName get_name, GetValue get_value) {
109
0
  std::vector<const BlobIndex*> blob_index_map;
110
0
  if (Status s = BuildBlobIndexMap(num_columns, blob_columns, blob_index_map);
111
0
      !s.ok()) {
112
0
    return s;
113
0
  }
114
0
  assert(blob_index_map.size() == num_columns);
115
116
  // First pass: validate column ordering, compute sizes, serialize blob
117
  // indices, and build column types.
118
0
  std::vector<std::string> serialized_blob_indices(num_columns);
119
0
  std::vector<uint32_t> name_sizes(num_columns);
120
0
  std::vector<uint32_t> value_sizes(num_columns);
121
0
  std::string column_types;
122
0
  column_types.reserve(num_columns);
123
124
0
  Slice prev_name_storage;
125
0
  bool has_prev = false;
126
0
  uint32_t name_sizes_bytes = 0;
127
0
  uint32_t names_bytes = 0;
128
0
  uint32_t total_value_sizes_bytes = 0;
129
0
  uint32_t total_values_bytes = 0;
130
131
0
  for (size_t i = 0; i < num_columns; ++i) {
132
0
    const Slice name = get_name(i);
133
0
    const Slice value = get_value(i);
134
135
0
    if (Status sn =
136
0
            ValidateWideColumnLimit(name.size(), "Wide column name too long");
137
0
        !sn.ok()) {
138
0
      return sn;
139
0
    }
140
141
0
    if (has_prev) {
142
0
      if (Status so = ValidateColumnOrder(prev_name_storage, name); !so.ok()) {
143
0
        return so;
144
0
      }
145
0
    }
146
147
0
    name_sizes[i] = static_cast<uint32_t>(name.size());
148
0
    name_sizes_bytes += VarintLength(name_sizes[i]);
149
0
    names_bytes += name_sizes[i];
150
151
0
    if (blob_index_map[i] != nullptr) {
152
0
      const BlobIndex* blob_idx = blob_index_map[i];
153
0
      blob_idx->EncodeTo(&serialized_blob_indices[i]);
154
0
      value_sizes[i] = static_cast<uint32_t>(serialized_blob_indices[i].size());
155
0
      column_types.push_back(static_cast<char>(kTypeBlobIndex));
156
0
    } else {
157
0
      if (Status svl = ValidateWideColumnLimit(value.size(),
158
0
                                               "Wide column value too long");
159
0
          !svl.ok()) {
160
0
        return svl;
161
0
      }
162
0
      value_sizes[i] = static_cast<uint32_t>(value.size());
163
0
      column_types.push_back(static_cast<char>(kTypeValue));
164
0
    }
165
166
0
    total_value_sizes_bytes += VarintLength(value_sizes[i]);
167
0
    total_values_bytes += value_sizes[i];
168
169
0
    prev_name_storage = name;
170
0
    has_prev = true;
171
0
  }
172
173
  // Second pass: write all V2 sections to output.
174
  // Pre-allocate output string.
175
0
  const size_t total_size =
176
0
      VarintLength(kVersion2) +
177
0
      VarintLength(static_cast<uint32_t>(num_columns)) +
178
0
      num_columns +  // column types
179
0
      VarintLength(name_sizes_bytes) + VarintLength(total_value_sizes_bytes) +
180
0
      VarintLength(names_bytes) + name_sizes_bytes + total_value_sizes_bytes +
181
0
      names_bytes + total_values_bytes;
182
183
0
  const size_t base_offset = output.size();
184
0
  output.reserve(base_offset + total_size);
185
186
  // Sections 1-3: header, skip info, column types
187
0
  PutVarint32(&output, kVersion2);
188
0
  PutVarint32(&output, static_cast<uint32_t>(num_columns));
189
0
  PutVarint32(&output, name_sizes_bytes);
190
0
  PutVarint32(&output, total_value_sizes_bytes);
191
0
  PutVarint32(&output, names_bytes);
192
0
  output.append(column_types);
193
194
  // Sections 4-7: resize to final size, then write all 4 sections in a
195
  // single loop using independent pointers. Each section's start offset is
196
  // known from the sizes computed in the first pass.
197
0
  if (num_columns == 0) {
198
0
    return Status::OK();
199
0
  }
200
201
0
  const size_t sec4_offset = output.size();
202
0
  output.resize(base_offset + total_size);
203
204
0
  char* s4 = &output[sec4_offset];          // section 4: name sizes
205
0
  char* s5 = s4 + name_sizes_bytes;         // section 5: value sizes
206
0
  char* s6 = s5 + total_value_sizes_bytes;  // section 6: names
207
0
  char* s7 = s6 + names_bytes;              // section 7: values
208
209
0
  for (size_t i = 0; i < num_columns; ++i) {
210
0
    s4 = EncodeVarint32(s4, name_sizes[i]);
211
0
    s5 = EncodeVarint32(s5, value_sizes[i]);
212
213
0
    memcpy(s6, get_name(i).data(), name_sizes[i]);
214
0
    s6 += name_sizes[i];
215
216
0
    if (blob_index_map[i] != nullptr) {
217
0
      memcpy(s7, serialized_blob_indices[i].data(), value_sizes[i]);
218
0
    } else {
219
0
      memcpy(s7, get_value(i).data(), value_sizes[i]);
220
0
    }
221
0
    s7 += value_sizes[i];
222
0
  }
223
224
0
  return Status::OK();
225
0
}
Unexecuted instantiation: wide_column_serialization.cc:rocksdb::Status rocksdb::WideColumnSerialization::SerializeV2Impl<rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_0, rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_1>(unsigned long, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_0, rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > >, std::__1::allocator<std::__1::pair<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_1)
Unexecuted instantiation: wide_column_serialization.cc:rocksdb::Status rocksdb::WideColumnSerialization::SerializeV2Impl<rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_0, rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_1>(unsigned long, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&, rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_0, rocksdb::WideColumnSerialization::SerializeV2(std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > const&, std::__1::vector<std::__1::pair<unsigned long, rocksdb::BlobIndex>, std::__1::allocator<std::__1::pair<unsigned long, rocksdb::BlobIndex> > > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&)::$_1)
226
227
Status WideColumnSerialization::SerializeV2(
228
    const std::vector<std::pair<std::string, std::string>>& columns,
229
    const std::vector<std::pair<size_t, BlobIndex>>& blob_columns,
230
0
    std::string& output) {
231
0
  return SerializeV2Impl(
232
0
      columns.size(), blob_columns, output,
233
0
      [&](size_t i) { return Slice(columns[i].first); },
234
0
      [&](size_t i) { return Slice(columns[i].second); });
235
0
}
236
237
Status WideColumnSerialization::SerializeV2(
238
    const WideColumns& columns,
239
    const std::vector<std::pair<size_t, BlobIndex>>& blob_columns,
240
0
    std::string& output) {
241
0
  return SerializeV2Impl(
242
0
      columns.size(), blob_columns, output,
243
0
      [&](size_t i) { return columns[i].name(); },
244
0
      [&](size_t i) { return columns[i].value(); });
245
0
}
246
247
Status WideColumnSerialization::DeserializeV1(
248
0
    Slice& input, uint32_t num_columns, std::vector<WideColumn>& columns) {
249
0
  columns.reserve(num_columns);
250
251
0
  autovector<uint32_t, 16> column_value_sizes;
252
0
  column_value_sizes.reserve(num_columns);
253
254
0
  for (uint32_t i = 0; i < num_columns; ++i) {
255
0
    Slice name;
256
0
    if (!GetLengthPrefixedSlice(&input, &name)) {
257
0
      return Status::Corruption("Error decoding wide column name");
258
0
    }
259
260
0
    if (!columns.empty()) {
261
0
      if (Status so = ValidateColumnOrder(columns.back().name(), name);
262
0
          !so.ok()) {
263
0
        return so;
264
0
      }
265
0
    }
266
267
0
    columns.emplace_back(name, Slice());
268
269
0
    uint32_t value_size = 0;
270
0
    if (!GetVarint32(&input, &value_size)) {
271
0
      return Status::Corruption("Error decoding wide column value size");
272
0
    }
273
274
0
    column_value_sizes.emplace_back(value_size);
275
0
  }
276
277
0
  const Slice data(input);
278
0
  size_t pos = 0;
279
280
0
  for (uint32_t i = 0; i < num_columns; ++i) {
281
0
    const uint32_t value_size = column_value_sizes[i];
282
283
0
    if (pos + value_size > data.size()) {
284
0
      return Status::Corruption("Error decoding wide column value payload");
285
0
    }
286
287
0
    columns[i].value() = Slice(data.data() + pos, value_size);
288
289
0
    pos += value_size;
290
0
  }
291
292
0
  return Status::OK();
293
0
}
294
295
Status WideColumnSerialization::DeserializeV2Impl(
296
    Slice& input, uint32_t num_columns, std::vector<WideColumn>& columns,
297
0
    std::vector<ValueType>& column_types) {
298
  // Section 2: SKIP INFO (3 varints)
299
0
  uint32_t name_sizes_bytes = 0;
300
0
  uint32_t value_sizes_bytes = 0;
301
0
  uint32_t names_bytes = 0;
302
0
  if (!GetVarint32(&input, &name_sizes_bytes)) {
303
0
    return Status::Corruption("Error decoding wide column name sizes bytes");
304
0
  }
305
0
  if (!GetVarint32(&input, &value_sizes_bytes)) {
306
0
    return Status::Corruption("Error decoding wide column value sizes bytes");
307
0
  }
308
0
  if (!GetVarint32(&input, &names_bytes)) {
309
0
    return Status::Corruption("Error decoding wide column names bytes");
310
0
  }
311
312
  // Section 3: COLUMN TYPES (N bytes, each is a ValueType)
313
0
  if (input.size() < num_columns) {
314
0
    return Status::Corruption("Error decoding wide column types");
315
0
  }
316
0
  column_types.resize(num_columns);
317
0
  for (uint32_t i = 0; i < num_columns; ++i) {
318
0
    column_types[i] = static_cast<ValueType>(input[i]);
319
0
    if (!IsValidColumnValueType(column_types[i])) {
320
0
      return Status::Corruption("Unsupported wide column ValueType");
321
0
    }
322
0
  }
323
0
  input.remove_prefix(num_columns);
324
325
  // Validate that sections 4-6 fit in the remaining input
326
0
  const size_t metadata_size =
327
0
      name_sizes_bytes + value_sizes_bytes + names_bytes;
328
0
  if (input.size() < metadata_size) {
329
0
    return Status::Corruption("Error decoding wide column sections");
330
0
  }
331
332
  // Set up 4 pointers into sections 4-7 for single-loop parsing.
333
  // Skip info gives us exact boundaries for each section.
334
0
  const char* s4 = input.data();  // section 4: name sizes
335
0
  const char* s4_limit = s4 + name_sizes_bytes;
336
0
  const char* s5 = s4_limit;  // section 5: value sizes
337
0
  const char* s5_limit = s5 + value_sizes_bytes;
338
0
  const char* s6 = s5_limit;          // section 6: names
339
0
  const char* s7 = s6 + names_bytes;  // section 7: values
340
0
  const char* input_end = input.data() + input.size();
341
342
0
  columns.reserve(num_columns);
343
0
  size_t name_pos = 0;
344
0
  size_t value_pos = 0;
345
346
0
  for (uint32_t i = 0; i < num_columns; ++i) {
347
    // Decode name size from section 4
348
0
    uint32_t ns = 0;
349
0
    const char* s4_next = GetVarint32Ptr(s4, s4_limit, &ns);
350
0
    if (s4_next == nullptr) {
351
0
      return Status::Corruption("Error decoding wide column name size");
352
0
    }
353
0
    s4 = s4_next;
354
355
    // Decode value size from section 5
356
0
    uint32_t vs = 0;
357
0
    const char* s5_next = GetVarint32Ptr(s5, s5_limit, &vs);
358
0
    if (s5_next == nullptr) {
359
0
      return Status::Corruption("Error decoding wide column value size");
360
0
    }
361
0
    s5 = s5_next;
362
363
    // Read name from section 6
364
0
    if (name_pos + ns > names_bytes) {
365
0
      return Status::Corruption("Error decoding wide column name");
366
0
    }
367
0
    Slice name(s6 + name_pos, ns);
368
369
0
    if (!columns.empty()) {
370
0
      if (Status so = ValidateColumnOrder(columns.back().name(), name);
371
0
          !so.ok()) {
372
0
        return so;
373
0
      }
374
0
    }
375
376
    // Read value from section 7
377
0
    if (s7 + value_pos + vs > input_end) {
378
0
      return Status::Corruption("Error decoding wide column value payload");
379
0
    }
380
381
0
    columns.emplace_back(name, Slice(s7 + value_pos, vs));
382
0
    name_pos += ns;
383
0
    value_pos += vs;
384
0
  }
385
386
0
  return Status::OK();
387
0
}
388
389
Status WideColumnSerialization::Deserialize(Slice& input,
390
0
                                            WideColumns& columns) {
391
0
  assert(columns.empty());
392
393
  // Reuse DeserializeV2, then reject any blob references.
394
0
  std::vector<std::pair<size_t, BlobIndex>> blob_columns;
395
0
  if (Status s = DeserializeV2(input, columns, blob_columns); !s.ok()) {
396
0
    return s;
397
0
  }
398
399
0
  if (!blob_columns.empty()) {
400
0
    return Status::NotSupported(
401
0
        "Wide column contains blob references. Use DeserializeV2.");
402
0
  }
403
404
0
  return Status::OK();
405
0
}
406
407
Status WideColumnSerialization::DeserializeV2(
408
    Slice& input, std::vector<WideColumn>& columns,
409
0
    std::vector<std::pair<size_t, BlobIndex>>& blob_columns) {
410
0
  assert(columns.empty());
411
0
  assert(blob_columns.empty());
412
413
0
  uint32_t version = 0;
414
0
  if (!GetVarint32(&input, &version)) {
415
0
    return Status::Corruption("Error decoding wide column version");
416
0
  }
417
418
0
  if (version > kVersion2) {
419
0
    return Status::NotSupported("Unsupported wide column version");
420
0
  }
421
422
0
  uint32_t num_columns = 0;
423
0
  if (!GetVarint32(&input, &num_columns)) {
424
0
    return Status::Corruption("Error decoding number of wide columns");
425
0
  }
426
427
0
  if (!num_columns) {
428
0
    return Status::OK();
429
0
  }
430
431
0
  if (version >= kVersion2) {
432
    // V2 layout: parse columns and extract blob column info
433
0
    std::vector<ValueType> column_types;
434
435
0
    if (Status s = DeserializeV2Impl(input, num_columns, columns, column_types);
436
0
        !s.ok()) {
437
0
      return s;
438
0
    }
439
0
    assert(column_types.size() == num_columns);
440
0
    assert(columns.size() == num_columns);
441
442
    // Decode blob indices from value data
443
0
    for (uint32_t i = 0; i < num_columns; ++i) {
444
0
      if (column_types[i] == kTypeBlobIndex) {
445
0
        BlobIndex blob_idx;
446
0
        Slice blob_slice = columns[i].value();
447
0
        if (Status bs = blob_idx.DecodeFrom(blob_slice); !bs.ok()) {
448
0
          return Status::Corruption("Error decoding blob index in wide column");
449
0
        }
450
0
        blob_columns.emplace_back(i, blob_idx);
451
0
      }
452
0
    }
453
0
  } else {
454
0
    return DeserializeV1(input, num_columns, columns);
455
0
  }
456
457
0
  return Status::OK();
458
0
}
459
460
Status WideColumnSerialization::HasBlobColumns(const Slice& input,
461
0
                                               bool& has_blob_columns) {
462
0
  has_blob_columns = false;
463
464
0
  Slice input_ref = input;
465
466
0
  uint32_t version = 0;
467
0
  if (!GetVarint32(&input_ref, &version)) {
468
0
    return Status::Corruption("Error decoding wide column version");
469
0
  }
470
471
  // Version 1 never has blob columns
472
0
  if (version < kVersion2) {
473
0
    return Status::OK();
474
0
  }
475
476
0
  uint32_t num_columns = 0;
477
0
  if (!GetVarint32(&input_ref, &num_columns)) {
478
0
    return Status::Corruption("Error decoding number of wide columns");
479
0
  }
480
481
0
  if (!num_columns) {
482
0
    return Status::OK();
483
0
  }
484
485
  // V2: Skip over SKIP INFO (3 varints) to reach COLUMN TYPES section.
486
0
  uint32_t unused_name_sizes_bytes = 0;
487
0
  uint32_t unused_value_sizes_bytes = 0;
488
0
  uint32_t unused_names_bytes = 0;
489
0
  if (!GetVarint32(&input_ref, &unused_name_sizes_bytes) ||
490
0
      !GetVarint32(&input_ref, &unused_value_sizes_bytes) ||
491
0
      !GetVarint32(&input_ref, &unused_names_bytes)) {
492
0
    return Status::Corruption("Error decoding wide column skip info");
493
0
  }
494
0
  if (input_ref.size() < num_columns) {
495
0
    return Status::Corruption("Error decoding wide column types");
496
0
  }
497
0
  has_blob_columns = ContainsBlobType(input_ref.data(), num_columns);
498
499
0
  return Status::OK();
500
0
}
501
502
Status WideColumnSerialization::GetVersion(const Slice& input,
503
0
                                           uint32_t& version) {
504
0
  Slice input_ref = input;
505
506
0
  version = 0;
507
0
  if (!GetVarint32(&input_ref, &version)) {
508
0
    return Status::Corruption("Error decoding wide column version");
509
0
  }
510
511
0
  return Status::OK();
512
0
}
513
514
Status WideColumnSerialization::GetValueOfDefaultColumn(Slice& input,
515
0
                                                        Slice& value) {
516
0
  Slice input_ref = input;
517
518
0
  uint32_t version = 0;
519
0
  if (!GetVarint32(&input_ref, &version)) {
520
0
    return Status::Corruption("Error decoding wide column version");
521
0
  }
522
523
0
  if (version > kVersion2) {
524
0
    return Status::NotSupported("Unsupported wide column version");
525
0
  }
526
527
0
  uint32_t num_columns = 0;
528
0
  if (!GetVarint32(&input_ref, &num_columns)) {
529
0
    return Status::Corruption("Error decoding number of wide columns");
530
0
  }
531
532
0
  if (!num_columns) {
533
0
    value.clear();
534
0
    return Status::OK();
535
0
  }
536
537
0
  if (version >= kVersion2) {
538
    // V2 fast path: use skip info to jump directly to values without
539
    // scanning through variable-length sections.
540
541
    // Read SKIP INFO (3 varints, immediately after header)
542
0
    uint32_t name_sizes_bytes = 0;
543
0
    uint32_t value_sizes_bytes = 0;
544
0
    uint32_t names_bytes = 0;
545
0
    if (!GetVarint32(&input_ref, &name_sizes_bytes)) {
546
0
      return Status::Corruption("Error decoding wide column name sizes bytes");
547
0
    }
548
0
    if (!GetVarint32(&input_ref, &value_sizes_bytes)) {
549
0
      return Status::Corruption("Error decoding wide column value sizes bytes");
550
0
    }
551
0
    if (!GetVarint32(&input_ref, &names_bytes)) {
552
0
      return Status::Corruption("Error decoding wide column names bytes");
553
0
    }
554
555
    // Read COLUMN TYPES (N bytes)
556
0
    if (input_ref.size() < num_columns) {
557
0
      return Status::Corruption("Error decoding wide column types");
558
0
    }
559
    // Check if default column (index 0) is a blob reference
560
0
    if (static_cast<uint8_t>(input_ref[0]) == kTypeBlobIndex) {
561
0
      return Status::NotSupported(
562
0
          "Wide column contains blob references. Use DeserializeV2.");
563
0
    }
564
0
    input_ref.remove_prefix(num_columns);
565
566
    // Peek first name size from NAME SIZES section
567
0
    if (input_ref.size() < name_sizes_bytes) {
568
0
      return Status::Corruption("Error decoding wide column name sizes");
569
0
    }
570
0
    Slice name_sizes_section(input_ref.data(), name_sizes_bytes);
571
0
    uint32_t first_name_size = 0;
572
0
    if (!GetVarint32(&name_sizes_section, &first_name_size)) {
573
0
      return Status::Corruption("Error decoding wide column name size");
574
0
    }
575
0
    input_ref.remove_prefix(name_sizes_bytes);
576
577
    // Peek first value size from VALUE SIZES section
578
0
    if (input_ref.size() < value_sizes_bytes) {
579
0
      return Status::Corruption("Error decoding wide column value sizes");
580
0
    }
581
0
    Slice value_sizes_section(input_ref.data(), value_sizes_bytes);
582
0
    uint32_t first_value_size = 0;
583
0
    if (!GetVarint32(&value_sizes_section, &first_value_size)) {
584
0
      return Status::Corruption("Error decoding wide column value size");
585
0
    }
586
    // Skip entire VALUE SIZES section using value_sizes_bytes
587
0
    input_ref.remove_prefix(value_sizes_bytes);
588
589
    // Check if the first column is the default column (empty name)
590
0
    if (first_name_size != 0) {
591
0
      value.clear();
592
0
      return Status::OK();
593
0
    }
594
595
    // Skip NAMES section
596
0
    if (input_ref.size() < names_bytes) {
597
0
      return Status::Corruption("Error decoding wide column names");
598
0
    }
599
0
    input_ref.remove_prefix(names_bytes);
600
601
    // Read the first value from VALUES section
602
0
    if (input_ref.size() < first_value_size) {
603
0
      return Status::Corruption("Error decoding wide column value payload");
604
0
    }
605
0
    value = Slice(input_ref.data(), first_value_size);
606
0
    return Status::OK();
607
0
  }
608
609
  // V1 fallback: full deserialization
610
0
  WideColumns columns;
611
612
0
  if (Status s = Deserialize(input, columns); !s.ok()) {
613
0
    return s;
614
0
  }
615
616
0
  if (!WideColumnsHelper::HasDefaultColumn(columns)) {
617
0
    value.clear();
618
0
    return Status::OK();
619
0
  }
620
621
0
  value = WideColumnsHelper::GetDefaultColumn(columns);
622
623
0
  return Status::OK();
624
0
}
625
626
Status WideColumnSerialization::ResolveEntityBlobColumns(
627
    const Slice& entity_value, const Slice& user_key,
628
    const BlobFetcher* blob_fetcher, PrefetchBufferCollection* prefetch_buffers,
629
    std::string& resolved_entity, bool& resolved, uint64_t* total_bytes_read,
630
0
    uint64_t* num_blobs_resolved) {
631
0
  assert(blob_fetcher);
632
633
0
  resolved = false;
634
635
0
  std::vector<WideColumn> columns;
636
0
  std::vector<std::pair<size_t, BlobIndex>> blob_columns;
637
638
0
  Slice input_copy = entity_value;
639
0
  if (Status s = DeserializeV2(input_copy, columns, blob_columns); !s.ok()) {
640
0
    return s;
641
0
  }
642
643
0
  if (blob_columns.empty()) {
644
0
    return Status::OK();
645
0
  }
646
647
0
  resolved = true;
648
649
  // Fetch each blob value
650
0
  std::vector<std::string> resolved_blob_values;
651
0
  resolved_blob_values.reserve(blob_columns.size());
652
653
0
  for (const auto& blob_col : blob_columns) {
654
0
    const BlobIndex& blob_idx = blob_col.second;
655
656
0
    if (blob_idx.IsInlined()) {
657
0
      resolved_blob_values.emplace_back(blob_idx.value().data(),
658
0
                                        blob_idx.value().size());
659
0
      continue;
660
0
    }
661
662
0
    FilePrefetchBuffer* prefetch_buffer =
663
0
        prefetch_buffers ? prefetch_buffers->GetOrCreatePrefetchBuffer(
664
0
                               blob_idx.file_number())
665
0
                         : nullptr;
666
667
0
    uint64_t bytes_read = 0;
668
669
0
    PinnableSlice blob_value;
670
0
    const Status fetch_s = blob_fetcher->FetchBlob(
671
0
        user_key, blob_idx, prefetch_buffer, &blob_value, &bytes_read);
672
0
    if (!fetch_s.ok()) {
673
0
      return fetch_s;
674
0
    }
675
676
0
    resolved_blob_values.emplace_back(blob_value.data(), blob_value.size());
677
678
0
    if (total_bytes_read) {
679
0
      *total_bytes_read += bytes_read;
680
0
    }
681
0
  }
682
683
0
  if (num_blobs_resolved) {
684
0
    *num_blobs_resolved += blob_columns.size();
685
0
  }
686
687
0
  return SerializeResolvedEntity(columns, blob_columns, resolved_blob_values,
688
0
                                 resolved_entity);
689
0
}
690
691
Status WideColumnSerialization::GetValueOfDefaultColumnResolvingBlobs(
692
    const Slice& entity_value, const Slice& user_key,
693
0
    const BlobFetcher* blob_fetcher, PinnableSlice& result, bool& resolved) {
694
0
  assert(blob_fetcher);
695
696
0
  resolved = false;
697
698
0
  std::vector<WideColumn> columns;
699
0
  std::vector<std::pair<size_t, BlobIndex>> blob_columns;
700
701
0
  Slice input_copy = entity_value;
702
0
  if (Status s = DeserializeV2(input_copy, columns, blob_columns); !s.ok()) {
703
0
    return s;
704
0
  }
705
706
  // The default column (empty name) is always at index 0 when present
707
  // (columns are sorted by name).
708
0
  if (columns.empty() || columns[0].name() != kDefaultWideColumnName) {
709
0
    result.PinSelf(Slice());
710
0
    return Status::OK();
711
0
  }
712
713
  // Check if the default column (index 0) is a blob reference
714
0
  for (const auto& blob_col : blob_columns) {
715
0
    if (blob_col.first == 0) {
716
0
      const BlobIndex& blob_idx = blob_col.second;
717
718
0
      resolved = true;
719
720
0
      if (blob_idx.IsInlined()) {
721
0
        result.PinSelf(blob_idx.value());
722
0
        return Status::OK();
723
0
      }
724
725
0
      return blob_fetcher->FetchBlob(user_key, blob_idx,
726
0
                                     nullptr /* prefetch_buffer */, &result,
727
0
                                     nullptr /* bytes_read */);
728
0
    }
729
0
  }
730
731
  // Default column is inline
732
0
  result.PinSelf(columns[0].value());
733
0
  return Status::OK();
734
0
}
735
736
Status WideColumnSerialization::SerializeResolvedEntity(
737
    const std::vector<WideColumn>& columns,
738
    const std::vector<std::pair<size_t, BlobIndex>>& blob_columns,
739
0
    const std::vector<std::string>& resolved_blob_values, std::string& output) {
740
0
  assert(blob_columns.size() == resolved_blob_values.size());
741
742
  // blob_columns is sorted by column index and typically small, so use a
743
  // linear scan with a cursor instead of an unordered_map.
744
0
  size_t blob_cursor = 0;
745
746
  // Build result columns with resolved blob values
747
0
  WideColumns result_columns;
748
0
  result_columns.reserve(columns.size());
749
750
0
  for (size_t i = 0; i < columns.size(); ++i) {
751
0
    if (blob_cursor < blob_columns.size() &&
752
0
        blob_columns[blob_cursor].first == i) {
753
      // This is a blob column - use the resolved value
754
0
      result_columns.emplace_back(columns[i].name(),
755
0
                                  Slice(resolved_blob_values[blob_cursor]));
756
0
      ++blob_cursor;
757
0
    } else {
758
      // This is an inline column - use the original value
759
0
      result_columns.emplace_back(columns[i].name(), columns[i].value());
760
0
    }
761
0
  }
762
763
  // Serialize using V1 format (all values inline)
764
0
  return Serialize(result_columns, output);
765
0
}
766
767
}  // namespace ROCKSDB_NAMESPACE