Coverage Report

Created: 2026-04-10 07:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/table/block_based/block_builder.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
//
10
// BlockBuilder generates blocks where keys are prefix-compressed:
11
//
12
// When we store a key, we drop the prefix shared with the previous
13
// string.  This helps reduce the space requirement significantly.
14
// Furthermore, once every K keys, we do not apply the prefix
15
// compression and store the entire key.  We call this a "restart
16
// point".  The tail end of the block stores the offsets of all of the
17
// restart points, and can be used to do a binary search when looking
18
// for a particular key.  Values are stored as-is (without compression)
19
// immediately following the corresponding key.
20
//
21
// An entry for a particular key-value pair has the form:
22
//     shared_bytes: varint32
23
//     unshared_bytes: varint32
24
//     value_length: varint32 (NOTE1)
25
//     key_delta: char[unshared_bytes]
26
//     value: char[value_length]
27
// shared_bytes == 0 (explicitly stored) for restart points.
28
//
29
// The trailer of the block has the form:
30
//     restarts: uint32[num_restarts]
31
//     num_restarts: uint32
32
// restarts[i] contains the offset within the block of the ith restart point.
33
//
34
// NOTE1: omitted for format_version >= 4 index blocks, because the value is
35
// composed of one (shared_bytes > 0) or two (shared_bytes == 0) varints, whose
36
// length is self-describing.
37
38
#include "table/block_based/block_builder.h"
39
40
#include <algorithm>
41
#include <cassert>
42
#include <cmath>
43
44
#include "db/dbformat.h"
45
#include "monitoring/statistics_impl.h"
46
#include "rocksdb/comparator.h"
47
#include "table/block_based/block_util.h"
48
#include "table/block_based/data_block_footer.h"
49
#include "util/coding.h"
50
51
namespace ROCKSDB_NAMESPACE {
52
53
namespace {
54
55
// Tracks whether restart-point keys are uniformly distributed using Welford's
56
// online algorithm to incrementally compute the coefficient of variation (CV)
57
// of gaps between consecutive restart keys.
58
class UniformDataTracker {
59
 public:
60
0
  void AddKey(uint64_t key_value) {
61
0
    if (num_keys_ > 0) {
62
0
      double gap = static_cast<double>(key_value - prev_key_value_);
63
0
      size_t gap_count = num_keys_;
64
0
      double delta = gap - mean_;
65
0
      mean_ += delta / static_cast<double>(gap_count);
66
0
      double delta2 = gap - mean_;
67
0
      m2_ += delta * delta2;
68
0
    }
69
0
    prev_key_value_ = key_value;
70
0
    num_keys_++;
71
0
  }
72
73
  // Returns the coefficient of variation (CV) of the key gaps, or -1.0 if
74
  // there are not enough data points to compute it.
75
0
  double GetCV() const {
76
0
    size_t gap_count = num_keys_ > 0 ? num_keys_ - 1 : 0;
77
0
    if (gap_count < 2 || mean_ <= 0) {
78
0
      return -1.0;
79
0
    }
80
0
    return std::sqrt(m2_ / static_cast<double>(gap_count)) / mean_;
81
0
  }
82
83
 private:
84
  uint64_t prev_key_value_ = 0;
85
  size_t num_keys_ = 0;
86
  double mean_ = 0;
87
  double m2_ = 0;
88
};
89
90
}  // namespace
91
92
BlockBuilder::BlockBuilder(
93
    int block_restart_interval, bool use_delta_encoding,
94
    bool use_value_delta_encoding,
95
    BlockBasedTableOptions::DataBlockIndexType index_type,
96
    double data_block_hash_table_util_ratio, size_t ts_sz,
97
    bool persist_user_defined_timestamps, bool is_user_key,
98
    bool use_separated_kv_storage, Statistics* statistics,
99
    double uniform_cv_threshold)
100
109k
    : block_restart_interval_(block_restart_interval),
101
109k
      use_delta_encoding_(use_delta_encoding),
102
109k
      use_value_delta_encoding_(use_value_delta_encoding),
103
109k
      strip_ts_sz_(persist_user_defined_timestamps ? 0 : ts_sz),
104
109k
      is_user_key_(is_user_key),
105
109k
      restarts_(1, 0),  // First restart point is at offset 0
106
109k
      counter_(0),
107
109k
      finished_(false),
108
109k
      is_uniform_(false),
109
109k
      uniform_cv_threshold_(uniform_cv_threshold),
110
109k
      statistics_(statistics),
111
109k
      use_separated_kv_storage_(use_separated_kv_storage) {
112
109k
  switch (index_type) {
113
109k
    case BlockBasedTableOptions::kDataBlockBinarySearch:
114
109k
      break;
115
0
    case BlockBasedTableOptions::kDataBlockBinaryAndHash:
116
0
      data_block_hash_index_builder_.Initialize(
117
0
          data_block_hash_table_util_ratio);
118
0
      break;
119
0
    default:
120
0
      assert(0);
121
109k
  }
122
109k
  assert(block_restart_interval_ >= 1);
123
109k
  estimate_ = sizeof(uint32_t) + sizeof(uint32_t) +
124
109k
              (use_separated_kv_storage_ ? sizeof(uint32_t) : 0);
125
109k
}
126
127
20.1k
void BlockBuilder::Reset() {
128
20.1k
  buffer_.clear();
129
20.1k
  restarts_.resize(1);  // First restart point is at offset 0
130
20.1k
  assert(restarts_[0] == 0);
131
20.1k
  estimate_ = sizeof(uint32_t) + sizeof(uint32_t) +
132
20.1k
              (use_separated_kv_storage_ ? sizeof(uint32_t) : 0);
133
20.1k
  counter_ = 0;
134
20.1k
  finished_ = false;
135
20.1k
  is_uniform_ = false;
136
20.1k
  last_key_.clear();
137
20.1k
  if (data_block_hash_index_builder_.Valid()) {
138
0
    data_block_hash_index_builder_.Reset();
139
0
  }
140
20.1k
  values_buffer_.clear();
141
142
#ifndef NDEBUG
143
  add_with_last_key_called_ = false;
144
#endif
145
20.1k
}
146
147
0
void BlockBuilder::SwapAndReset(std::string& buffer) {
148
0
  std::swap(buffer_, buffer);
149
0
  Reset();
150
0
}
151
152
size_t BlockBuilder::EstimateSizeAfterKV(const Slice& key,
153
18.8k
                                         const Slice& value) const {
154
18.8k
  size_t estimate = CurrentSizeEstimate();
155
  // Note: this is an imprecise estimate as it accounts for the whole key size
156
  // instead of non-shared key size.
157
18.8k
  estimate += key.size();
158
18.8k
  if (strip_ts_sz_ > 0) {
159
0
    estimate -= strip_ts_sz_;
160
0
  }
161
  // In value delta encoding we estimate the value delta size as half the full
162
  // value size since only the size field of block handle is encoded.
163
18.8k
  estimate +=
164
18.8k
      !use_value_delta_encoding_ || (counter_ >= block_restart_interval_)
165
18.8k
          ? value.size()
166
18.8k
          : value.size() / 2;
167
168
18.8k
  if (counter_ >= block_restart_interval_) {
169
335
    estimate += sizeof(uint32_t);  // a new restart entry.
170
335
  }
171
172
  // For separated KV storage, value_offset varint is written at restart points
173
18.8k
  if (use_separated_kv_storage_ &&
174
0
      (counter_ == 0 || counter_ >= block_restart_interval_)) {
175
0
    estimate += VarintLength(values_buffer_.size());
176
0
  }
177
178
18.8k
  estimate += sizeof(int32_t);  // varint for shared prefix length.
179
  // Note: this is an imprecise estimate as there will be two encoded sizes, one
180
  // for shared key and one for non-shared key.
181
18.8k
  estimate += VarintLength(key.size());  // varint for key length.
182
18.8k
  if (!use_value_delta_encoding_ || (counter_ >= block_restart_interval_)) {
183
18.8k
    estimate += VarintLength(value.size());  // varint for value length.
184
18.8k
  }
185
186
18.8k
  return estimate;
187
18.8k
}
188
189
76.0k
Slice BlockBuilder::Finish() {
190
76.0k
  is_uniform_ = ScanForUniformity();
191
192
  // Append restart array
193
76.0k
  size_t values_buffer_offset = buffer_.size();
194
195
76.0k
  if (use_separated_kv_storage_) {
196
0
    buffer_.append(values_buffer_);
197
0
  }
198
199
244k
  for (size_t i = 0; i < restarts_.size(); i++) {
200
168k
    PutFixed32(&buffer_, restarts_[i]);
201
168k
  }
202
203
76.0k
  DataBlockFooter footer;
204
76.0k
  footer.num_restarts = static_cast<uint32_t>(restarts_.size());
205
76.0k
  footer.index_type = BlockBasedTableOptions::kDataBlockBinarySearch;
206
76.0k
  footer.is_uniform = is_uniform_;
207
76.0k
  if (data_block_hash_index_builder_.Valid() &&
208
0
      CurrentSizeEstimate() <= kMaxBlockSizeSupportedByHashIndex) {
209
0
    data_block_hash_index_builder_.Finish(buffer_);
210
0
    footer.index_type = BlockBasedTableOptions::kDataBlockBinaryAndHash;
211
0
  }
212
213
76.0k
  if (use_separated_kv_storage_) {
214
0
    footer.separated_kv = true;
215
0
    footer.values_section_offset = static_cast<uint32_t>(values_buffer_offset);
216
0
  }
217
76.0k
  footer.EncodeTo(&buffer_);
218
76.0k
  finished_ = true;
219
76.0k
  return Slice(buffer_);
220
76.0k
}
221
222
void BlockBuilder::Add(const Slice& key, const Slice& value,
223
                       const Slice* const delta_value,
224
881k
                       bool skip_delta_encoding) {
225
  // Ensure no unsafe mixing of Add and AddWithLastKey
226
881k
  assert(!add_with_last_key_called_);
227
228
881k
  AddWithLastKeyImpl(key, value, last_key_, delta_value, skip_delta_encoding,
229
881k
                     buffer_.size());
230
881k
  if (use_delta_encoding_) {
231
    // Update state
232
    // We used to just copy the changed data, but it appears to be
233
    // faster to just copy the whole thing.
234
881k
    last_key_.assign(key.data(), key.size());
235
881k
  }
236
881k
}
237
238
void BlockBuilder::AddWithLastKey(const Slice& key, const Slice& value,
239
                                  const Slice& last_key_param,
240
                                  const Slice* const delta_value,
241
38.8k
                                  bool skip_delta_encoding) {
242
  // Ensure no unsafe mixing of Add and AddWithLastKey
243
38.8k
  assert(last_key_.empty());
244
#ifndef NDEBUG
245
  add_with_last_key_called_ = false;
246
#endif
247
248
  // Here we make sure to use an empty `last_key` on first call after creation
249
  // or Reset. This is more convenient for the caller and we can be more
250
  // clever inside BlockBuilder. On this hot code path, we want to avoid
251
  // conditional jumps like `buffer_.empty() ? ... : ...` so we can use a
252
  // fast arithmetic operation instead, with an assertion to be sure our logic
253
  // is sound.
254
38.8k
  size_t buffer_size = buffer_.size();
255
38.8k
  size_t last_key_size = last_key_param.size();
256
38.8k
  assert(buffer_size == 0 || buffer_size >= last_key_size - strip_ts_sz_);
257
258
38.8k
  Slice last_key(last_key_param.data(), last_key_size * (buffer_size > 0));
259
260
38.8k
  AddWithLastKeyImpl(key, value, last_key, delta_value, skip_delta_encoding,
261
38.8k
                     buffer_size);
262
38.8k
}
263
264
inline void BlockBuilder::AddWithLastKeyImpl(const Slice& key,
265
                                             const Slice& value,
266
                                             const Slice& last_key,
267
                                             const Slice* const delta_value,
268
                                             bool skip_delta_encoding,
269
920k
                                             size_t buffer_size) {
270
920k
  assert(!finished_);
271
920k
  assert(counter_ <= block_restart_interval_);
272
920k
  std::string key_buf;
273
920k
  std::string last_key_buf;
274
920k
  const Slice key_to_persist = MaybeStripTimestampFromKey(&key_buf, key);
275
  // For delta key encoding, the first key in each restart interval doesn't have
276
  // a last key to share bytes with.
277
920k
  const Slice last_key_persisted =
278
920k
      last_key.size() == 0
279
920k
          ? last_key
280
920k
          : MaybeStripTimestampFromKey(&last_key_buf, last_key);
281
920k
  size_t shared = 0;  // number of bytes shared with prev key
282
920k
  if (counter_ >= block_restart_interval_) {
283
    // Restart compression
284
95.7k
    restarts_.push_back(static_cast<uint32_t>(buffer_size));
285
95.7k
    estimate_ += sizeof(uint32_t);
286
95.7k
    counter_ = 0;
287
824k
  } else if (use_delta_encoding_ && !skip_delta_encoding) {
288
    // See how much sharing to do with previous string
289
824k
    shared = key_to_persist.difference_offset(last_key_persisted);
290
824k
  }
291
292
920k
  const size_t non_shared = key_to_persist.size() - shared;
293
920k
  const size_t previous_value_offset = values_buffer_.size();
294
920k
  if (use_value_delta_encoding_) {
295
40.2k
    if (use_separated_kv_storage_ && counter_ == 0) {
296
      // Add "<shared><non_shared><value_offset>" to buffer_
297
0
      PutVarint32(&buffer_, static_cast<uint32_t>(shared),
298
0
                  static_cast<uint32_t>(non_shared),
299
0
                  static_cast<uint32_t>(values_buffer_.size()));
300
40.2k
    } else {
301
      // Add "<shared><non_shared>" to buffer_
302
40.2k
      PutVarint32(&buffer_, static_cast<uint32_t>(shared),
303
40.2k
                  static_cast<uint32_t>(non_shared));
304
40.2k
    }
305
880k
  } else {
306
880k
    if (use_separated_kv_storage_ && counter_ == 0) {
307
      // Add "<shared><non_shared><value_size><value_offset>" to buffer_
308
0
      PutVarint32(&buffer_, static_cast<uint32_t>(shared),
309
0
                  static_cast<uint32_t>(non_shared),
310
0
                  static_cast<uint32_t>(value.size()),
311
0
                  static_cast<uint32_t>(values_buffer_.size()));
312
880k
    } else {
313
      // Add "<shared><non_shared><value_size>" to buffer_
314
880k
      PutVarint32(&buffer_, static_cast<uint32_t>(shared),
315
880k
                  static_cast<uint32_t>(non_shared),
316
880k
                  static_cast<uint32_t>(value.size()));
317
880k
    }
318
880k
  }
319
320
  // Add string delta to buffer_
321
920k
  buffer_.append(key_to_persist.data() + shared, non_shared);
322
323
920k
  auto& values_buffer = use_separated_kv_storage_ ? values_buffer_ : buffer_;
324
  // Use value delta encoding only when the key has shared bytes. This would
325
  // simplify the decoding, where it can figure out which decoding to use simply by
326
  // looking at the shared bytes size.
327
920k
  if (shared != 0 && use_value_delta_encoding_) {
328
0
    assert(delta_value != nullptr);
329
0
    values_buffer.append(delta_value->data(), delta_value->size());
330
920k
  } else {
331
920k
    values_buffer.append(value.data(), value.size());
332
920k
  }
333
334
  // TODO(yuzhangyu): make user defined timestamp work with block hash index.
335
920k
  if (data_block_hash_index_builder_.Valid()) {
336
    // Only data blocks should be using `kDataBlockBinaryAndHash` index type.
337
    // And data blocks should always be built with internal keys instead of
338
    // user keys.
339
0
    assert(!is_user_key_);
340
0
    data_block_hash_index_builder_.Add(ExtractUserKey(key),
341
0
                                       restarts_.size() - 1);
342
0
  }
343
344
920k
  counter_++;
345
920k
  estimate_ += buffer_.size() - buffer_size + values_buffer_.size() -
346
920k
               previous_value_offset;
347
920k
}
348
349
const Slice BlockBuilder::MaybeStripTimestampFromKey(std::string* key_buf,
350
1.74M
                                                     const Slice& key) {
351
1.74M
  Slice stripped_key = key;
352
1.74M
  if (strip_ts_sz_ > 0) {
353
0
    if (is_user_key_) {
354
0
      stripped_key.remove_suffix(strip_ts_sz_);
355
0
    } else {
356
0
      StripTimestampFromInternalKey(key_buf, key, strip_ts_sz_);
357
0
      stripped_key = *key_buf;
358
0
    }
359
0
  }
360
1.74M
  return stripped_key;
361
1.74M
}
362
363
0
Slice BlockBuilder::GetRestartKey(uint32_t index, const char* limit) const {
364
0
  assert(index < restarts_.size());
365
0
  const char* p = buffer_.data() + restarts_[index];
366
0
  uint32_t shared;
367
0
  uint32_t non_shared;
368
  // When separated KV storage is enabled, restart point entries include an
369
  // extra value_offset varint that must be consumed to find the key delta.
370
0
  uint32_t value_offset;
371
0
  uint32_t* value_offset_ptr =
372
0
      use_separated_kv_storage_ ? &value_offset : nullptr;
373
0
  if (use_value_delta_encoding_) {
374
0
    p = DecodeKeyV4()(p, limit, &shared, &non_shared, value_offset_ptr);
375
0
  } else {
376
0
    p = DecodeKey()(p, limit, &shared, &non_shared, value_offset_ptr);
377
0
  }
378
0
  assert(p != nullptr);
379
0
  assert(shared == 0);
380
0
  (void)shared;
381
0
  return Slice(p, non_shared);
382
0
}
383
384
76.0k
bool BlockBuilder::ScanForUniformity() const {
385
76.0k
  if (uniform_cv_threshold_ < 0 || restarts_.size() < 3) {
386
76.0k
    return false;
387
76.0k
  }
388
389
0
  const char* limit = buffer_.data() + buffer_.size();
390
391
0
  Slice first_key = GetRestartKey(0, limit);
392
0
  Slice last_key =
393
0
      GetRestartKey(static_cast<uint32_t>(restarts_.size() - 1), limit);
394
395
  // Keys must be long enough for ReadBe64FromKey which strips internal bytes
396
0
  if (!is_user_key_ && (first_key.size() < kNumInternalBytes ||
397
0
                        last_key.size() < kNumInternalBytes)) {
398
0
    return false;
399
0
  }
400
401
0
  size_t prefix_len = first_key.difference_offset(last_key);
402
403
0
  UniformDataTracker tracker;
404
0
  for (size_t i = 0; i < restarts_.size(); i++) {
405
0
    Slice key = GetRestartKey(static_cast<uint32_t>(i), limit);
406
0
    if (!is_user_key_ && key.size() < kNumInternalBytes) {
407
0
      return false;
408
0
    }
409
0
    tracker.AddKey(ReadBe64FromKey(key, is_user_key_, prefix_len));
410
0
  }
411
412
0
  double cv = tracker.GetCV();
413
0
  if (statistics_ != nullptr && cv >= 0) {
414
0
    RecordInHistogram(statistics_, BLOCK_KEY_DISTRIBUTION_CV,
415
0
                      static_cast<uint64_t>(cv * 10000));
416
0
  }
417
418
0
  return cv >= 0 && cv < uniform_cv_threshold_;
419
0
}
420
421
}  // namespace ROCKSDB_NAMESPACE