/src/rocksdb/table/block_based/block_builder.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | // |
10 | | // BlockBuilder generates blocks where keys are prefix-compressed: |
11 | | // |
12 | | // When we store a key, we drop the prefix shared with the previous |
13 | | // string. This helps reduce the space requirement significantly. |
14 | | // Furthermore, once every K keys, we do not apply the prefix |
15 | | // compression and store the entire key. We call this a "restart |
16 | | // point". The tail end of the block stores the offsets of all of the |
17 | | // restart points, and can be used to do a binary search when looking |
18 | | // for a particular key. Values are stored as-is (without compression) |
19 | | // immediately following the corresponding key. |
20 | | // |
21 | | // An entry for a particular key-value pair has the form: |
22 | | // shared_bytes: varint32 |
23 | | // unshared_bytes: varint32 |
24 | | // value_length: varint32 (NOTE1) |
25 | | // key_delta: char[unshared_bytes] |
26 | | // value: char[value_length] |
27 | | // shared_bytes == 0 (explicitly stored) for restart points. |
28 | | // |
29 | | // The trailer of the block has the form: |
30 | | // restarts: uint32[num_restarts] |
31 | | // num_restarts: uint32 |
32 | | // restarts[i] contains the offset within the block of the ith restart point. |
33 | | // |
34 | | // NOTE1: omitted for format_version >= 4 index blocks, because the value is |
35 | | // composed of one (shared_bytes > 0) or two (shared_bytes == 0) varints, whose |
36 | | // length is self-describing. |
37 | | |
38 | | #include "table/block_based/block_builder.h" |
39 | | |
40 | | #include <algorithm> |
41 | | #include <cassert> |
42 | | #include <cmath> |
43 | | |
44 | | #include "db/dbformat.h" |
45 | | #include "monitoring/statistics_impl.h" |
46 | | #include "rocksdb/comparator.h" |
47 | | #include "table/block_based/block_util.h" |
48 | | #include "table/block_based/data_block_footer.h" |
49 | | #include "util/coding.h" |
50 | | |
51 | | namespace ROCKSDB_NAMESPACE { |
52 | | |
53 | | namespace { |
54 | | |
// Measures how evenly spaced a sequence of restart-point keys is. Each key is
// supplied by the caller as a uint64_t. The tracker keeps a running mean and
// sum of squared deviations of the gaps between consecutive keys, using
// Welford's online algorithm, so no gap history is stored. From those it
// reports the coefficient of variation (CV = population stddev / mean).
class UniformDataTracker {
 public:
  // Records the next key. The gap to the previous key is computed with
  // unsigned subtraction, so callers presumably pass keys in non-decreasing
  // order (a smaller key would wrap around to a huge gap).
  void AddKey(uint64_t key_value) {
    if (key_count_ != 0) {
      // This is gap number `key_count_` (1-based) in Welford's recurrence.
      const double gap = static_cast<double>(key_value - last_key_);
      const double n = static_cast<double>(key_count_);
      const double diff_before = gap - gap_mean_;
      gap_mean_ += diff_before / n;
      const double diff_after = gap - gap_mean_;
      sq_diff_sum_ += diff_before * diff_after;
    }
    last_key_ = key_value;
    ++key_count_;
  }

  // Returns the CV of the recorded gaps, or -1.0 when it is not computed:
  // fewer than two gaps (i.e. fewer than three keys) or a non-positive mean.
  double GetCV() const {
    const size_t num_gaps = key_count_ == 0 ? 0 : key_count_ - 1;
    if (num_gaps < 2 || gap_mean_ <= 0) {
      return -1.0;
    }
    const double variance = sq_diff_sum_ / static_cast<double>(num_gaps);
    return std::sqrt(variance) / gap_mean_;
  }

 private:
  uint64_t last_key_ = 0;   // most recent key passed to AddKey()
  size_t key_count_ = 0;    // number of keys passed to AddKey()
  double gap_mean_ = 0;     // running mean of the gaps
  double sq_diff_sum_ = 0;  // Welford's M2: running sum of squared deviations
};
89 | | |
90 | | } // namespace |
91 | | |
92 | | BlockBuilder::BlockBuilder( |
93 | | int block_restart_interval, bool use_delta_encoding, |
94 | | bool use_value_delta_encoding, |
95 | | BlockBasedTableOptions::DataBlockIndexType index_type, |
96 | | double data_block_hash_table_util_ratio, size_t ts_sz, |
97 | | bool persist_user_defined_timestamps, bool is_user_key, |
98 | | bool use_separated_kv_storage, Statistics* statistics, |
99 | | double uniform_cv_threshold) |
100 | 109k | : block_restart_interval_(block_restart_interval), |
101 | 109k | use_delta_encoding_(use_delta_encoding), |
102 | 109k | use_value_delta_encoding_(use_value_delta_encoding), |
103 | 109k | strip_ts_sz_(persist_user_defined_timestamps ? 0 : ts_sz), |
104 | 109k | is_user_key_(is_user_key), |
105 | 109k | restarts_(1, 0), // First restart point is at offset 0 |
106 | 109k | counter_(0), |
107 | 109k | finished_(false), |
108 | 109k | is_uniform_(false), |
109 | 109k | uniform_cv_threshold_(uniform_cv_threshold), |
110 | 109k | statistics_(statistics), |
111 | 109k | use_separated_kv_storage_(use_separated_kv_storage) { |
112 | 109k | switch (index_type) { |
113 | 109k | case BlockBasedTableOptions::kDataBlockBinarySearch: |
114 | 109k | break; |
115 | 0 | case BlockBasedTableOptions::kDataBlockBinaryAndHash: |
116 | 0 | data_block_hash_index_builder_.Initialize( |
117 | 0 | data_block_hash_table_util_ratio); |
118 | 0 | break; |
119 | 0 | default: |
120 | 0 | assert(0); |
121 | 109k | } |
122 | 109k | assert(block_restart_interval_ >= 1); |
123 | 109k | estimate_ = sizeof(uint32_t) + sizeof(uint32_t) + |
124 | 109k | (use_separated_kv_storage_ ? sizeof(uint32_t) : 0); |
125 | 109k | } |
126 | | |
127 | 20.1k | void BlockBuilder::Reset() { |
128 | 20.1k | buffer_.clear(); |
129 | 20.1k | restarts_.resize(1); // First restart point is at offset 0 |
130 | 20.1k | assert(restarts_[0] == 0); |
131 | 20.1k | estimate_ = sizeof(uint32_t) + sizeof(uint32_t) + |
132 | 20.1k | (use_separated_kv_storage_ ? sizeof(uint32_t) : 0); |
133 | 20.1k | counter_ = 0; |
134 | 20.1k | finished_ = false; |
135 | 20.1k | is_uniform_ = false; |
136 | 20.1k | last_key_.clear(); |
137 | 20.1k | if (data_block_hash_index_builder_.Valid()) { |
138 | 0 | data_block_hash_index_builder_.Reset(); |
139 | 0 | } |
140 | 20.1k | values_buffer_.clear(); |
141 | | |
142 | | #ifndef NDEBUG |
143 | | add_with_last_key_called_ = false; |
144 | | #endif |
145 | 20.1k | } |
146 | | |
147 | 0 | void BlockBuilder::SwapAndReset(std::string& buffer) { |
148 | 0 | std::swap(buffer_, buffer); |
149 | 0 | Reset(); |
150 | 0 | } |
151 | | |
152 | | size_t BlockBuilder::EstimateSizeAfterKV(const Slice& key, |
153 | 18.8k | const Slice& value) const { |
154 | 18.8k | size_t estimate = CurrentSizeEstimate(); |
155 | | // Note: this is an imprecise estimate as it accounts for the whole key size |
156 | | // instead of non-shared key size. |
157 | 18.8k | estimate += key.size(); |
158 | 18.8k | if (strip_ts_sz_ > 0) { |
159 | 0 | estimate -= strip_ts_sz_; |
160 | 0 | } |
161 | | // In value delta encoding we estimate the value delta size as half the full |
162 | | // value size since only the size field of block handle is encoded. |
163 | 18.8k | estimate += |
164 | 18.8k | !use_value_delta_encoding_ || (counter_ >= block_restart_interval_) |
165 | 18.8k | ? value.size() |
166 | 18.8k | : value.size() / 2; |
167 | | |
168 | 18.8k | if (counter_ >= block_restart_interval_) { |
169 | 335 | estimate += sizeof(uint32_t); // a new restart entry. |
170 | 335 | } |
171 | | |
172 | | // For separated KV storage, value_offset varint is written at restart points |
173 | 18.8k | if (use_separated_kv_storage_ && |
174 | 0 | (counter_ == 0 || counter_ >= block_restart_interval_)) { |
175 | 0 | estimate += VarintLength(values_buffer_.size()); |
176 | 0 | } |
177 | | |
178 | 18.8k | estimate += sizeof(int32_t); // varint for shared prefix length. |
179 | | // Note: this is an imprecise estimate as we will have to encoded size, one |
180 | | // for shared key and one for non-shared key. |
181 | 18.8k | estimate += VarintLength(key.size()); // varint for key length. |
182 | 18.8k | if (!use_value_delta_encoding_ || (counter_ >= block_restart_interval_)) { |
183 | 18.8k | estimate += VarintLength(value.size()); // varint for value length. |
184 | 18.8k | } |
185 | | |
186 | 18.8k | return estimate; |
187 | 18.8k | } |
188 | | |
189 | 76.0k | Slice BlockBuilder::Finish() { |
190 | 76.0k | is_uniform_ = ScanForUniformity(); |
191 | | |
192 | | // Append restart array |
193 | 76.0k | size_t values_buffer_offset = buffer_.size(); |
194 | | |
195 | 76.0k | if (use_separated_kv_storage_) { |
196 | 0 | buffer_.append(values_buffer_); |
197 | 0 | } |
198 | | |
199 | 244k | for (size_t i = 0; i < restarts_.size(); i++) { |
200 | 168k | PutFixed32(&buffer_, restarts_[i]); |
201 | 168k | } |
202 | | |
203 | 76.0k | DataBlockFooter footer; |
204 | 76.0k | footer.num_restarts = static_cast<uint32_t>(restarts_.size()); |
205 | 76.0k | footer.index_type = BlockBasedTableOptions::kDataBlockBinarySearch; |
206 | 76.0k | footer.is_uniform = is_uniform_; |
207 | 76.0k | if (data_block_hash_index_builder_.Valid() && |
208 | 0 | CurrentSizeEstimate() <= kMaxBlockSizeSupportedByHashIndex) { |
209 | 0 | data_block_hash_index_builder_.Finish(buffer_); |
210 | 0 | footer.index_type = BlockBasedTableOptions::kDataBlockBinaryAndHash; |
211 | 0 | } |
212 | | |
213 | 76.0k | if (use_separated_kv_storage_) { |
214 | 0 | footer.separated_kv = true; |
215 | 0 | footer.values_section_offset = static_cast<uint32_t>(values_buffer_offset); |
216 | 0 | } |
217 | 76.0k | footer.EncodeTo(&buffer_); |
218 | 76.0k | finished_ = true; |
219 | 76.0k | return Slice(buffer_); |
220 | 76.0k | } |
221 | | |
222 | | void BlockBuilder::Add(const Slice& key, const Slice& value, |
223 | | const Slice* const delta_value, |
224 | 881k | bool skip_delta_encoding) { |
225 | | // Ensure no unsafe mixing of Add and AddWithLastKey |
226 | 881k | assert(!add_with_last_key_called_); |
227 | | |
228 | 881k | AddWithLastKeyImpl(key, value, last_key_, delta_value, skip_delta_encoding, |
229 | 881k | buffer_.size()); |
230 | 881k | if (use_delta_encoding_) { |
231 | | // Update state |
232 | | // We used to just copy the changed data, but it appears to be |
233 | | // faster to just copy the whole thing. |
234 | 881k | last_key_.assign(key.data(), key.size()); |
235 | 881k | } |
236 | 881k | } |
237 | | |
238 | | void BlockBuilder::AddWithLastKey(const Slice& key, const Slice& value, |
239 | | const Slice& last_key_param, |
240 | | const Slice* const delta_value, |
241 | 38.8k | bool skip_delta_encoding) { |
242 | | // Ensure no unsafe mixing of Add and AddWithLastKey |
243 | 38.8k | assert(last_key_.empty()); |
244 | | #ifndef NDEBUG |
245 | | add_with_last_key_called_ = false; |
246 | | #endif |
247 | | |
248 | | // Here we make sure to use an empty `last_key` on first call after creation |
249 | | // or Reset. This is more convenient for the caller and we can be more |
250 | | // clever inside BlockBuilder. On this hot code path, we want to avoid |
251 | | // conditional jumps like `buffer_.empty() ? ... : ...` so we can use a |
252 | | // fast arithmetic operation instead, with an assertion to be sure our logic |
253 | | // is sound. |
254 | 38.8k | size_t buffer_size = buffer_.size(); |
255 | 38.8k | size_t last_key_size = last_key_param.size(); |
256 | 38.8k | assert(buffer_size == 0 || buffer_size >= last_key_size - strip_ts_sz_); |
257 | | |
258 | 38.8k | Slice last_key(last_key_param.data(), last_key_size * (buffer_size > 0)); |
259 | | |
260 | 38.8k | AddWithLastKeyImpl(key, value, last_key, delta_value, skip_delta_encoding, |
261 | 38.8k | buffer_size); |
262 | 38.8k | } |
263 | | |
264 | | inline void BlockBuilder::AddWithLastKeyImpl(const Slice& key, |
265 | | const Slice& value, |
266 | | const Slice& last_key, |
267 | | const Slice* const delta_value, |
268 | | bool skip_delta_encoding, |
269 | 920k | size_t buffer_size) { |
270 | 920k | assert(!finished_); |
271 | 920k | assert(counter_ <= block_restart_interval_); |
272 | 920k | std::string key_buf; |
273 | 920k | std::string last_key_buf; |
274 | 920k | const Slice key_to_persist = MaybeStripTimestampFromKey(&key_buf, key); |
275 | | // For delta key encoding, the first key in each restart interval doesn't have |
276 | | // a last key to share bytes with. |
277 | 920k | const Slice last_key_persisted = |
278 | 920k | last_key.size() == 0 |
279 | 920k | ? last_key |
280 | 920k | : MaybeStripTimestampFromKey(&last_key_buf, last_key); |
281 | 920k | size_t shared = 0; // number of bytes shared with prev key |
282 | 920k | if (counter_ >= block_restart_interval_) { |
283 | | // Restart compression |
284 | 95.7k | restarts_.push_back(static_cast<uint32_t>(buffer_size)); |
285 | 95.7k | estimate_ += sizeof(uint32_t); |
286 | 95.7k | counter_ = 0; |
287 | 824k | } else if (use_delta_encoding_ && !skip_delta_encoding) { |
288 | | // See how much sharing to do with previous string |
289 | 824k | shared = key_to_persist.difference_offset(last_key_persisted); |
290 | 824k | } |
291 | | |
292 | 920k | const size_t non_shared = key_to_persist.size() - shared; |
293 | 920k | const size_t previous_value_offset = values_buffer_.size(); |
294 | 920k | if (use_value_delta_encoding_) { |
295 | 40.2k | if (use_separated_kv_storage_ && counter_ == 0) { |
296 | | // Add "<shared><non_shared><value_offset>" to buffer_ |
297 | 0 | PutVarint32(&buffer_, static_cast<uint32_t>(shared), |
298 | 0 | static_cast<uint32_t>(non_shared), |
299 | 0 | static_cast<uint32_t>(values_buffer_.size())); |
300 | 40.2k | } else { |
301 | | // Add "<shared><non_shared>" to buffer_ |
302 | 40.2k | PutVarint32(&buffer_, static_cast<uint32_t>(shared), |
303 | 40.2k | static_cast<uint32_t>(non_shared)); |
304 | 40.2k | } |
305 | 880k | } else { |
306 | 880k | if (use_separated_kv_storage_ && counter_ == 0) { |
307 | | // Add "<shared><non_shared><value_size><value_offset>" to buffer_ |
308 | 0 | PutVarint32(&buffer_, static_cast<uint32_t>(shared), |
309 | 0 | static_cast<uint32_t>(non_shared), |
310 | 0 | static_cast<uint32_t>(value.size()), |
311 | 0 | static_cast<uint32_t>(values_buffer_.size())); |
312 | 880k | } else { |
313 | | // Add "<shared><non_shared><value_size>" to buffer_ |
314 | 880k | PutVarint32(&buffer_, static_cast<uint32_t>(shared), |
315 | 880k | static_cast<uint32_t>(non_shared), |
316 | 880k | static_cast<uint32_t>(value.size())); |
317 | 880k | } |
318 | 880k | } |
319 | | |
320 | | // Add string delta to buffer_ |
321 | 920k | buffer_.append(key_to_persist.data() + shared, non_shared); |
322 | | |
323 | 920k | auto& values_buffer = use_separated_kv_storage_ ? values_buffer_ : buffer_; |
324 | | // Use value delta encoding only when the key has shared bytes. This would |
325 | | // simplify the decoding, where it can figure which decoding to use simply by |
326 | | // looking at the shared bytes size. |
327 | 920k | if (shared != 0 && use_value_delta_encoding_) { |
328 | 0 | assert(delta_value != nullptr); |
329 | 0 | values_buffer.append(delta_value->data(), delta_value->size()); |
330 | 920k | } else { |
331 | 920k | values_buffer.append(value.data(), value.size()); |
332 | 920k | } |
333 | | |
334 | | // TODO(yuzhangyu): make user defined timestamp work with block hash index. |
335 | 920k | if (data_block_hash_index_builder_.Valid()) { |
336 | | // Only data blocks should be using `kDataBlockBinaryAndHash` index type. |
337 | | // And data blocks should always be built with internal keys instead of |
338 | | // user keys. |
339 | 0 | assert(!is_user_key_); |
340 | 0 | data_block_hash_index_builder_.Add(ExtractUserKey(key), |
341 | 0 | restarts_.size() - 1); |
342 | 0 | } |
343 | | |
344 | 920k | counter_++; |
345 | 920k | estimate_ += buffer_.size() - buffer_size + values_buffer_.size() - |
346 | 920k | previous_value_offset; |
347 | 920k | } |
348 | | |
349 | | const Slice BlockBuilder::MaybeStripTimestampFromKey(std::string* key_buf, |
350 | 1.74M | const Slice& key) { |
351 | 1.74M | Slice stripped_key = key; |
352 | 1.74M | if (strip_ts_sz_ > 0) { |
353 | 0 | if (is_user_key_) { |
354 | 0 | stripped_key.remove_suffix(strip_ts_sz_); |
355 | 0 | } else { |
356 | 0 | StripTimestampFromInternalKey(key_buf, key, strip_ts_sz_); |
357 | 0 | stripped_key = *key_buf; |
358 | 0 | } |
359 | 0 | } |
360 | 1.74M | return stripped_key; |
361 | 1.74M | } |
362 | | |
363 | 0 | Slice BlockBuilder::GetRestartKey(uint32_t index, const char* limit) const { |
364 | 0 | assert(index < restarts_.size()); |
365 | 0 | const char* p = buffer_.data() + restarts_[index]; |
366 | 0 | uint32_t shared; |
367 | 0 | uint32_t non_shared; |
368 | | // When separated KV storage is enabled, restart point entries include an |
369 | | // extra value_offset varint that must be consumed to find the key delta. |
370 | 0 | uint32_t value_offset; |
371 | 0 | uint32_t* value_offset_ptr = |
372 | 0 | use_separated_kv_storage_ ? &value_offset : nullptr; |
373 | 0 | if (use_value_delta_encoding_) { |
374 | 0 | p = DecodeKeyV4()(p, limit, &shared, &non_shared, value_offset_ptr); |
375 | 0 | } else { |
376 | 0 | p = DecodeKey()(p, limit, &shared, &non_shared, value_offset_ptr); |
377 | 0 | } |
378 | 0 | assert(p != nullptr); |
379 | 0 | assert(shared == 0); |
380 | 0 | (void)shared; |
381 | 0 | return Slice(p, non_shared); |
382 | 0 | } |
383 | | |
384 | 76.0k | bool BlockBuilder::ScanForUniformity() const { |
385 | 76.0k | if (uniform_cv_threshold_ < 0 || restarts_.size() < 3) { |
386 | 76.0k | return false; |
387 | 76.0k | } |
388 | | |
389 | 0 | const char* limit = buffer_.data() + buffer_.size(); |
390 | |
|
391 | 0 | Slice first_key = GetRestartKey(0, limit); |
392 | 0 | Slice last_key = |
393 | 0 | GetRestartKey(static_cast<uint32_t>(restarts_.size() - 1), limit); |
394 | | |
395 | | // Keys must be long enough for ReadBe64FromKey which strips internal bytes |
396 | 0 | if (!is_user_key_ && (first_key.size() < kNumInternalBytes || |
397 | 0 | last_key.size() < kNumInternalBytes)) { |
398 | 0 | return false; |
399 | 0 | } |
400 | | |
401 | 0 | size_t prefix_len = first_key.difference_offset(last_key); |
402 | |
|
403 | 0 | UniformDataTracker tracker; |
404 | 0 | for (size_t i = 0; i < restarts_.size(); i++) { |
405 | 0 | Slice key = GetRestartKey(static_cast<uint32_t>(i), limit); |
406 | 0 | if (!is_user_key_ && key.size() < kNumInternalBytes) { |
407 | 0 | return false; |
408 | 0 | } |
409 | 0 | tracker.AddKey(ReadBe64FromKey(key, is_user_key_, prefix_len)); |
410 | 0 | } |
411 | | |
412 | 0 | double cv = tracker.GetCV(); |
413 | 0 | if (statistics_ != nullptr && cv >= 0) { |
414 | 0 | RecordInHistogram(statistics_, BLOCK_KEY_DISTRIBUTION_CV, |
415 | 0 | static_cast<uint64_t>(cv * 10000)); |
416 | 0 | } |
417 | |
|
418 | 0 | return cv >= 0 && cv < uniform_cv_threshold_; |
419 | 0 | } |
420 | | |
421 | | } // namespace ROCKSDB_NAMESPACE |