Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "context"
9 : "encoding/binary"
10 : "io"
11 : "sync"
12 : "unsafe"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
18 : "golang.org/x/exp/rand"
19 : )
20 :
21 : // Value blocks are supported in TableFormatPebblev3.
22 : //
23 : // 1. Motivation and overview
24 : //
25 : // Value blocks are a mechanism designed for sstables storing MVCC data, where
26 : // there can be many versions of a key that need to be kept, but only the
27 : // latest value is typically read (see the documentation for Comparer.Split
28 : // regarding MVCC keys). The goal is faster reads. Unlike Pebble versions,
29 : // which can be eagerly thrown away (except when there are snapshots), MVCC
30 : // versions are long-lived (e.g. default CockroachDB garbage collection
31 : // threshold for older versions is 24 hours) and can significantly slow down
32 : // reads. We have seen CockroachDB production workloads with very slow reads
33 : // due to:
34 : // - 100s of versions for each key in a table.
35 : //
36 : // - Tables with mostly MVCC garbage consisting of 2 versions per key -- a
37 : // real key-value pair, followed by a key-value pair whose value (usually
38 : // with zero byte length) indicates it is an MVCC tombstone.
39 : //
40 : // The value blocks mechanism attempts to improve read throughput in these
41 : // cases when the key size is smaller than the value sizes of older versions.
42 : // This is done by moving the value of an older version to a value block in a
43 : // different part of the sstable. This improves spatial locality of the data
44 : // being read by the workload, which increases caching effectiveness.
45 : //
46 : // Additionally, even when the key size is not smaller than the value of older
47 : // versions (e.g. secondary indexes in CockroachDB), TableFormatPebblev3
48 : // stores the result of key comparisons done at write time inside the sstable,
49 : // which makes stepping from one key prefix to the next prefix (i.e., skipping
50 : // over older versions of a MVCC key) more efficient by avoiding key
51 : // comparisons and key decoding. See the results in
52 : // https://github.com/cockroachdb/pebble/pull/2149 and more details in the
53 : // comment inside BenchmarkIteratorScanNextPrefix. These improvements are also
54 : // visible in end-to-end CockroachDB tests, as outlined in
55 : // https://github.com/cockroachdb/cockroach/pull/96652.
56 : //
57 : // In TableFormatPebblev3, each SET has a one byte value prefix that tells us
58 : // whether the value is in-place or in a value block. This 1 byte prefix
59 : // encodes additional information:
60 : //
61 : // - ShortAttribute: This is an attribute of the value. Currently, CockroachDB
62 : // uses it to represent whether the value is a tombstone or not. This avoids
63 : // the need to fetch a value from the value block if the caller only wants
64 : // to figure out whether it is an MVCC tombstone. The length of the value is
65 : // another attribute that the caller can be interested in, and it is also
66 : // accessible without reading the value in the value block (see the value
67 : // handle in the details section).
68 : //
69 : // - SET-same-prefix: this enables the aforementioned optimization when
70 : // stepping from one key prefix to the next key prefix.
71 : //
72 : // We further optimize this iteration over prefixes by using the restart
73 : // points in a block to encode whether the SET at a restart point has the same
74 : // prefix since the last restart point. This allows us to skip over restart
75 : // points within the same block. See the comment in blockWriter, and how both
76 : // SET-same-prefix and the restart point information is used in
77 : // blockIter.nextPrefixV3.
78 : //
79 : // This flexibility of values that are in-place or in value blocks requires
80 : // flexibility in the iterator interface. The InternalIterator interface
81 : // returns a LazyValue instead of a byte slice. Additionally, pebble.Iterator
82 : // allows the caller to ask for a LazyValue. See lazy_value.go for details,
83 : // including the memory lifetime management.
84 : //
85 : // For historical discussions about this feature, see the issue
86 : // https://github.com/cockroachdb/pebble/issues/1170 and the prototype in
87 : // https://github.com/cockroachdb/pebble/pull/1443.
88 : //
89 : // The code in this file mainly covers value block and related encodings. We
90 : // discuss these in the next section.
91 : //
92 : // 2. Details
93 : //
94 : // Note that the notion of the latest value is local to the sstable. It is
95 : // possible that that latest value has been deleted by a sstable in a higher
96 : // level, and what is the latest value from the perspective of the whole LSM
97 : // is an older MVCC version. This only affects performance and not
98 : // correctness. This local knowledge is also why we continue to store these
99 : // older versions in the same sstable -- we need to be able to conveniently
100 : // read them. The code in this file is agnostic to the policy regarding what
101 : // should be stored in value blocks -- it allows even the latest MVCC version
102 : // to be stored in a value block. The policy decision is made in the
103 : // sstable.Writer. See Writer.makeAddPointDecisionV3.
104 : //
105 : // Data blocks contain two kinds of SET keys: those with in-place values and
106 : // those with a value handle. To distinguish these two cases we use a single
107 : // byte prefix (valuePrefix). This single byte prefix is split into multiple
108 : // parts, where nb represents information that is encoded in n bits.
109 : //
110 : // +---------------+--------------------+-----------+--------------------+
111 : // | value-kind 2b | SET-same-prefix 1b | unused 2b | short-attribute 3b |
112 : // +---------------+--------------------+-----------+--------------------+
113 : //
114 : // The 2 bit value-kind specifies whether this is an in-place value or a value
115 : // handle pointing to a value block. We use 2 bits here for future
116 : // representation of values that are in separate files. The 1 bit
117 : // SET-same-prefix is true if this key is a SET and is immediately preceded by
118 : // a SET that shares the same prefix. The 3 bit short-attribute is described
119 : // in base.ShortAttribute -- it stores user-defined attributes about the
120 : // value. It is unused for in-place values.
121 : //
122 : // Value Handle and Value Blocks:
123 : // valueHandles refer to values in value blocks. Value blocks are simpler than
124 : // normal data blocks (that contain key-value pairs, and allow for binary
125 : // search), which makes them cheap for value retrieval purposes. A valueHandle
126 : // is a tuple (valueLen, blockNum, offsetInBlock), where blockNum is the 0
127 : // indexed value block number and offsetInBlock is the byte offset in that
128 : // block containing the value. The valueHandle.valueLen is included since
129 : // there are multiple use cases in CockroachDB that need the value length but
130 : // not the value, for which we can avoid reading the value in the value block
131 : // (see
132 : // https://github.com/cockroachdb/pebble/issues/1170#issuecomment-958203245).
133 : //
134 : // A value block has a checksum like other blocks, and is optionally
135 : // compressed. An uncompressed value block is a sequence of values with no
136 : // separator or length (we rely on the valueHandle to demarcate). The
137 : // valueHandle.offsetInBlock points to the value, of length
138 : // valueHandle.valueLen. While writing a sstable, all the (possibly
139 : // compressed) value blocks need to be held in-memory until they can be
140 : // written. Value blocks are placed after the "meta rangedel" and "meta range
141 : // key" blocks since value blocks are considered less likely to be read.
142 : //
143 : // Meta Value Index Block:
144 : // Since the (key, valueHandle) pair are written before there is any knowledge
145 : // of the byte offset of the value block in the file, or its compressed
146 : // length, we need another lookup to map the valueHandle.blockNum to the
147 : // information needed to read it from the file. This information is provided
148 : // by the "value index block". The "value index block" is referred to by the
149 : // metaindex block. The design intentionally avoids making the "value index
150 : // block" a general purpose key-value block, since each caller wants to lookup
151 : // the information for a particular blockNum (there is no need for SeekGE
152 : // etc.). Instead, this index block stores a sequence of (blockNum,
153 : // blockOffset, blockLength) tuples, where the blockNums are consecutive
154 : // integers, and the tuples are encoded with a fixed width encoding. This
155 : // allows a reader to find the tuple for block K by looking at the offset
156 : // K*fixed-width. The fixed width for each field is decided by looking at the
157 : // maximum value of each of these fields. As a concrete example of a large
158 : // sstable with many value blocks, we constructed a 100MB sstable with many
159 : // versions and had 2475 value blocks (~32KB each). This sstable had this
160 : // tuple encoded using 2+4+2=8 bytes, which means the uncompressed value index
161 : // block was 2475*8=~19KB, which is modest. Therefore, we don't support more
162 : // than one value index block. Consider the example of 2 byte blockNum, 4 byte
163 : // blockOffset and 2 byte blockLen. The value index block will look like:
164 : //
165 : // +---------------+------------------+---------------+
166 : // | blockNum (2B) | blockOffset (4B) | blockLen (2B) |
167 : // +---------------+------------------+---------------+
168 : // | 0 | 7,123,456 | 30,000 |
169 : // +---------------+------------------+---------------+
170 : // | 1 | 7,153,456 | 20,000 |
171 : // +---------------+------------------+---------------+
172 : // | 2 | 7,173,456 | 25,567 |
173 : // +---------------+------------------+---------------+
174 : // | .... | ... | ... |
175 : //
176 : //
177 : // The metaindex block contains the valueBlocksIndexHandle which in addition
178 : // to the BlockHandle also specifies the widths of these tuple fields. In the
179 : // above example, the
180 : // valueBlocksIndexHandle.{blockNumByteLength,blockOffsetByteLength,blockLengthByteLength}
181 : // will be (2,4,2).
182 :
// valueHandle points at a value that lives in a value block. It is stored
// alongside the key, in place of the value itself.
type valueHandle struct {
	// valueLen is the length in bytes of the referenced value.
	valueLen uint32
	// blockNum is the 0-indexed number of the value block holding the value.
	blockNum uint32
	// offsetInBlock is the byte offset of the value inside that block.
	offsetInBlock uint32
}
190 :
// valuePrefix is the single byte that precedes either an in-place value or
// an encoded valueHandle. It encodes multiple kinds of information, selected
// by the masks below.
type valuePrefix byte

const (
	// The value-kind lives in the 2 most-significant bits of a valuePrefix.
	valueKindMask           valuePrefix = 0xC0
	valueKindIsValueHandle  valuePrefix = 0x80
	valueKindIsInPlaceValue valuePrefix = 0x00

	// setHasSameKeyPrefixMask selects the 1 bit that is set when a SET has
	// the same key prefix as the immediately preceding key, which must also
	// be a SET. If the immediately preceding key in the same block is a SET
	// AND this bit is 0, the prefix must have changed.
	//
	// The current policy of only storing older MVCC versions in value blocks
	// means that valueKindIsValueHandle => SET has same prefix, but no code
	// should rely on this behavior. Also, SET has same prefix does *not*
	// imply valueKindIsValueHandle.
	setHasSameKeyPrefixMask valuePrefix = 0x20

	// userDefinedShortAttributeMask selects the 3 least-significant bits,
	// which carry the user-defined base.ShortAttribute. These bits are
	// undefined for valueKindIsInPlaceValue.
	userDefinedShortAttributeMask valuePrefix = 0x07
)
215 :
216 : // valueHandle fields are varint encoded, so maximum 5 bytes each, plus 1 byte
217 : // for the valuePrefix. This could alternatively be group varint encoded, but
218 : // experiments were inconclusive
219 : // (https://github.com/cockroachdb/pebble/pull/1443#issuecomment-1270298802).
220 : const valueHandleMaxLen = 5*3 + 1
221 :
222 : // Assert blockHandleLikelyMaxLen >= valueHandleMaxLen.
223 : const _ = uint(blockHandleLikelyMaxLen - valueHandleMaxLen)
224 :
225 1 : func encodeValueHandle(dst []byte, v valueHandle) int {
226 1 : n := 0
227 1 : n += binary.PutUvarint(dst[n:], uint64(v.valueLen))
228 1 : n += binary.PutUvarint(dst[n:], uint64(v.blockNum))
229 1 : n += binary.PutUvarint(dst[n:], uint64(v.offsetInBlock))
230 1 : return n
231 1 : }
232 :
233 1 : func makePrefixForValueHandle(setHasSameKeyPrefix bool, attribute base.ShortAttribute) valuePrefix {
234 1 : prefix := valueKindIsValueHandle | valuePrefix(attribute)
235 1 : if setHasSameKeyPrefix {
236 1 : prefix = prefix | setHasSameKeyPrefixMask
237 1 : }
238 1 : return prefix
239 : }
240 :
241 1 : func makePrefixForInPlaceValue(setHasSameKeyPrefix bool) valuePrefix {
242 1 : prefix := valueKindIsInPlaceValue
243 1 : if setHasSameKeyPrefix {
244 1 : prefix = prefix | setHasSameKeyPrefixMask
245 1 : }
246 1 : return prefix
247 : }
248 :
249 1 : func isValueHandle(b valuePrefix) bool {
250 1 : return b&valueKindMask == valueKindIsValueHandle
251 1 : }
252 :
253 : // REQUIRES: isValueHandle(b)
254 1 : func getShortAttribute(b valuePrefix) base.ShortAttribute {
255 1 : return base.ShortAttribute(b & userDefinedShortAttributeMask)
256 1 : }
257 :
258 1 : func setHasSamePrefix(b valuePrefix) bool {
259 1 : return b&setHasSameKeyPrefixMask == setHasSameKeyPrefixMask
260 1 : }
261 :
// decodeLenFromValueHandle decodes the leading uvarint-encoded valueLen from
// src and returns it together with the remainder of src. The decoding is
// manually unrolled for up to 5 bytes and reads the bytes through unsafe
// pointer arithmetic, without per-byte bounds checks; only the final
// re-slicing of src is bounds checked. src must be non-empty.
func decodeLenFromValueHandle(src []byte) (uint32, []byte) {
	start := unsafe.Pointer(&src[0])

	b0 := *((*uint8)(start))
	if b0 < 128 {
		return uint32(b0), src[1:]
	}
	b0 &= 0x7f

	b1 := *((*uint8)(unsafe.Pointer(uintptr(start) + 1)))
	if b1 < 128 {
		return uint32(b1)<<7 | uint32(b0), src[2:]
	}
	b1 &= 0x7f

	b2 := *((*uint8)(unsafe.Pointer(uintptr(start) + 2)))
	if b2 < 128 {
		return uint32(b2)<<14 | uint32(b1)<<7 | uint32(b0), src[3:]
	}
	b2 &= 0x7f

	b3 := *((*uint8)(unsafe.Pointer(uintptr(start) + 3)))
	if b3 < 128 {
		return uint32(b3)<<21 | uint32(b2)<<14 | uint32(b1)<<7 | uint32(b0), src[4:]
	}
	b3 &= 0x7f

	// The fifth byte is always treated as the last one; its continuation bit
	// is not inspected.
	b4 := *((*uint8)(unsafe.Pointer(uintptr(start) + 4)))
	return uint32(b4)<<28 | uint32(b3)<<21 | uint32(b2)<<14 | uint32(b1)<<7 | uint32(b0), src[5:]
}
284 :
285 1 : func decodeRemainingValueHandle(src []byte) valueHandle {
286 1 : var vh valueHandle
287 1 : ptr := unsafe.Pointer(&src[0])
288 1 : // Manually inlined uvarint decoding. Saves ~25% in benchmarks. Unrolling
289 1 : // a loop for i:=0; i<2; i++, saves ~6%.
290 1 : var v uint32
291 1 : if a := *((*uint8)(ptr)); a < 128 {
292 1 : v = uint32(a)
293 1 : ptr = unsafe.Pointer(uintptr(ptr) + 1)
294 1 : } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
295 0 : v = uint32(b)<<7 | uint32(a)
296 0 : ptr = unsafe.Pointer(uintptr(ptr) + 2)
297 0 : } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
298 0 : v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
299 0 : ptr = unsafe.Pointer(uintptr(ptr) + 3)
300 0 : } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
301 0 : v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
302 0 : ptr = unsafe.Pointer(uintptr(ptr) + 4)
303 0 : } else {
304 0 : d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
305 0 : v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
306 0 : ptr = unsafe.Pointer(uintptr(ptr) + 5)
307 0 : }
308 1 : vh.blockNum = v
309 1 :
310 1 : if a := *((*uint8)(ptr)); a < 128 {
311 1 : v = uint32(a)
312 1 : } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
313 1 : v = uint32(b)<<7 | uint32(a)
314 1 : } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
315 0 : v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
316 0 : } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
317 0 : v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
318 0 : } else {
319 0 : d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
320 0 : v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
321 0 : }
322 1 : vh.offsetInBlock = v
323 1 :
324 1 : return vh
325 : }
326 :
327 0 : func decodeValueHandle(src []byte) valueHandle {
328 0 : valLen, src := decodeLenFromValueHandle(src)
329 0 : vh := decodeRemainingValueHandle(src)
330 0 : vh.valueLen = valLen
331 0 : return vh
332 0 : }
333 :
334 : // valueBlocksIndexHandle is placed in the metaindex if there are any value
335 : // blocks. If there are no value blocks, there is no value blocks index, and
336 : // no entry in the metaindex. Note that the lack of entry in the metaindex
337 : // should not be used to ascertain whether the values are prefixed, since the
338 : // former is an emergent property of the data that was written and not known
339 : // until all the key-value pairs in the sstable are written.
340 : type valueBlocksIndexHandle struct {
341 : h BlockHandle
342 : blockNumByteLength uint8
343 : blockOffsetByteLength uint8
344 : blockLengthByteLength uint8
345 : }
346 :
347 : const valueBlocksIndexHandleMaxLen = blockHandleMaxLenWithoutProperties + 3
348 :
349 : // Assert blockHandleLikelyMaxLen >= valueBlocksIndexHandleMaxLen.
350 : const _ = uint(blockHandleLikelyMaxLen - valueBlocksIndexHandleMaxLen)
351 :
352 1 : func encodeValueBlocksIndexHandle(dst []byte, v valueBlocksIndexHandle) int {
353 1 : n := encodeBlockHandle(dst, v.h)
354 1 : dst[n] = v.blockNumByteLength
355 1 : n++
356 1 : dst[n] = v.blockOffsetByteLength
357 1 : n++
358 1 : dst[n] = v.blockLengthByteLength
359 1 : n++
360 1 : return n
361 1 : }
362 :
363 1 : func decodeValueBlocksIndexHandle(src []byte) (valueBlocksIndexHandle, int, error) {
364 1 : var vbih valueBlocksIndexHandle
365 1 : var n int
366 1 : vbih.h, n = decodeBlockHandle(src)
367 1 : if n <= 0 {
368 0 : return vbih, 0, errors.Errorf("bad BlockHandle %x", src)
369 0 : }
370 1 : if len(src) != n+3 {
371 0 : return vbih, 0, errors.Errorf("bad BlockHandle %x", src)
372 0 : }
373 1 : vbih.blockNumByteLength = src[n]
374 1 : vbih.blockOffsetByteLength = src[n+1]
375 1 : vbih.blockLengthByteLength = src[n+2]
376 1 : return vbih, n + 3, nil
377 : }
378 :
// valueBlocksAndIndexStats summarizes what a valueBlockWriter wrote.
type valueBlocksAndIndexStats struct {
	// numValueBlocks is the number of value blocks written.
	numValueBlocks uint64
	// numValuesInValueBlocks is the number of values added to value blocks.
	numValuesInValueBlocks uint64
	// valueBlocksAndIndexSize is the number of bytes written; it includes
	// both the value blocks and the value index block.
	valueBlocksAndIndexSize uint64
}
385 :
386 : // valueBlockWriter writes a sequence of value blocks, and the value blocks
387 : // index, for a sstable.
388 : type valueBlockWriter struct {
389 : // The configured uncompressed block size and size threshold
390 : blockSize, blockSizeThreshold int
391 : // Configured compression.
392 : compression Compression
393 : // checksummer with configured checksum type.
394 : checksummer checksummer
395 : // Block finished callback.
396 : blockFinishedFunc func(compressedSize int)
397 :
398 : // buf is the current block being written to (uncompressed).
399 : buf *blockBuffer
400 : // compressedBuf is used for compressing the block.
401 : compressedBuf *blockBuffer
402 : // Sequence of blocks that are finished.
403 : blocks []blockAndHandle
404 : // Cumulative value block bytes written so far.
405 : totalBlockBytes uint64
406 : numValues uint64
407 : }
408 :
409 : type blockAndHandle struct {
410 : block *blockBuffer
411 : handle BlockHandle
412 : compressed bool
413 : }
414 :
// blockBuffer wraps a byte slice so that it can be passed around and pooled
// by pointer.
type blockBuffer struct {
	// b is the buffer's contents.
	b []byte
}
418 :
419 : // Pool of block buffers that should be roughly the blockSize.
420 : var uncompressedValueBlockBufPool = sync.Pool{
421 1 : New: func() interface{} {
422 1 : return &blockBuffer{}
423 1 : },
424 : }
425 :
426 : // Pool of block buffers for compressed value blocks. These may widely vary in
427 : // size based on compression ratios.
428 : var compressedValueBlockBufPool = sync.Pool{
429 1 : New: func() interface{} {
430 1 : return &blockBuffer{}
431 1 : },
432 : }
433 :
434 1 : func releaseToValueBlockBufPool(pool *sync.Pool, b *blockBuffer) {
435 1 : // Don't pool buffers larger than 128KB, in case we had some rare large
436 1 : // values.
437 1 : if len(b.b) > 128*1024 {
438 0 : return
439 0 : }
440 1 : if invariants.Enabled {
441 1 : // Set the bytes to a random value. Cap the number of bytes being
442 1 : // randomized to prevent test timeouts.
443 1 : length := cap(b.b)
444 1 : if length > 1000 {
445 1 : length = 1000
446 1 : }
447 1 : b.b = b.b[:length:length]
448 1 : rand.Read(b.b)
449 : }
450 1 : pool.Put(b)
451 : }
452 :
453 : var valueBlockWriterPool = sync.Pool{
454 1 : New: func() interface{} {
455 1 : return &valueBlockWriter{}
456 1 : },
457 : }
458 :
459 : func newValueBlockWriter(
460 : blockSize int,
461 : blockSizeThreshold int,
462 : compression Compression,
463 : checksumType ChecksumType,
464 : // compressedSize should exclude the block trailer.
465 : blockFinishedFunc func(compressedSize int),
466 1 : ) *valueBlockWriter {
467 1 : w := valueBlockWriterPool.Get().(*valueBlockWriter)
468 1 : *w = valueBlockWriter{
469 1 : blockSize: blockSize,
470 1 : blockSizeThreshold: blockSizeThreshold,
471 1 : compression: compression,
472 1 : checksummer: checksummer{
473 1 : checksumType: checksumType,
474 1 : },
475 1 : blockFinishedFunc: blockFinishedFunc,
476 1 : buf: uncompressedValueBlockBufPool.Get().(*blockBuffer),
477 1 : compressedBuf: compressedValueBlockBufPool.Get().(*blockBuffer),
478 1 : blocks: w.blocks[:0],
479 1 : }
480 1 : w.buf.b = w.buf.b[:0]
481 1 : w.compressedBuf.b = w.compressedBuf.b[:0]
482 1 : return w
483 1 : }
484 :
485 1 : func releaseValueBlockWriter(w *valueBlockWriter) {
486 1 : for i := range w.blocks {
487 1 : if w.blocks[i].compressed {
488 1 : releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.blocks[i].block)
489 1 : } else {
490 1 : releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.blocks[i].block)
491 1 : }
492 1 : w.blocks[i].block = nil
493 : }
494 1 : if w.buf != nil {
495 1 : releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.buf)
496 1 : }
497 1 : if w.compressedBuf != nil {
498 1 : releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.compressedBuf)
499 1 : }
500 1 : *w = valueBlockWriter{
501 1 : blocks: w.blocks[:0],
502 1 : }
503 1 : valueBlockWriterPool.Put(w)
504 : }
505 :
506 1 : func (w *valueBlockWriter) addValue(v []byte) (valueHandle, error) {
507 1 : if invariants.Enabled && len(v) == 0 {
508 0 : return valueHandle{}, errors.Errorf("cannot write empty value to value block")
509 0 : }
510 1 : w.numValues++
511 1 : blockLen := len(w.buf.b)
512 1 : valueLen := len(v)
513 1 : if blockLen >= w.blockSize ||
514 1 : (blockLen > w.blockSizeThreshold && blockLen+valueLen > w.blockSize) {
515 1 : // Block is not currently empty and adding this value will become too big,
516 1 : // so finish this block.
517 1 : w.compressAndFlush()
518 1 : blockLen = len(w.buf.b)
519 1 : if invariants.Enabled && blockLen != 0 {
520 0 : panic("blockLen of new block should be 0")
521 : }
522 : }
523 1 : vh := valueHandle{
524 1 : valueLen: uint32(valueLen),
525 1 : blockNum: uint32(len(w.blocks)),
526 1 : offsetInBlock: uint32(blockLen),
527 1 : }
528 1 : blockLen = int(vh.offsetInBlock + vh.valueLen)
529 1 : if cap(w.buf.b) < blockLen {
530 1 : size := 2 * cap(w.buf.b)
531 1 : if size < 1024 {
532 1 : size = 1024
533 1 : }
534 1 : for size < blockLen {
535 0 : size *= 2
536 0 : }
537 1 : buf := make([]byte, blockLen, size)
538 1 : _ = copy(buf, w.buf.b)
539 1 : w.buf.b = buf
540 1 : } else {
541 1 : w.buf.b = w.buf.b[:blockLen]
542 1 : }
543 1 : buf := w.buf.b[vh.offsetInBlock:]
544 1 : n := copy(buf, v)
545 1 : if n != len(buf) {
546 0 : panic("incorrect length computation")
547 : }
548 1 : return vh, nil
549 : }
550 :
551 1 : func (w *valueBlockWriter) compressAndFlush() {
552 1 : // Compress the buffer, discarding the result if the improvement isn't at
553 1 : // least 12.5%.
554 1 : blockType := noCompressionBlockType
555 1 : b := w.buf
556 1 : if w.compression != NoCompression {
557 1 : blockType, w.compressedBuf.b =
558 1 : compressBlock(w.compression, w.buf.b, w.compressedBuf.b[:cap(w.compressedBuf.b)])
559 1 : if len(w.compressedBuf.b) < len(w.buf.b)-len(w.buf.b)/8 {
560 1 : b = w.compressedBuf
561 1 : } else {
562 1 : blockType = noCompressionBlockType
563 1 : }
564 : }
565 1 : n := len(b.b)
566 1 : if n+blockTrailerLen > cap(b.b) {
567 1 : block := make([]byte, n+blockTrailerLen)
568 1 : copy(block, b.b)
569 1 : b.b = block
570 1 : } else {
571 1 : b.b = b.b[:n+blockTrailerLen]
572 1 : }
573 1 : b.b[n] = byte(blockType)
574 1 : w.computeChecksum(b.b)
575 1 : bh := BlockHandle{Offset: w.totalBlockBytes, Length: uint64(n)}
576 1 : w.totalBlockBytes += uint64(len(b.b))
577 1 : // blockFinishedFunc length excludes the block trailer.
578 1 : w.blockFinishedFunc(n)
579 1 : compressed := blockType != noCompressionBlockType
580 1 : w.blocks = append(w.blocks, blockAndHandle{
581 1 : block: b,
582 1 : handle: bh,
583 1 : compressed: compressed,
584 1 : })
585 1 : // Handed off a buffer to w.blocks, so need get a new one.
586 1 : if compressed {
587 1 : w.compressedBuf = compressedValueBlockBufPool.Get().(*blockBuffer)
588 1 : } else {
589 1 : w.buf = uncompressedValueBlockBufPool.Get().(*blockBuffer)
590 1 : }
591 1 : w.buf.b = w.buf.b[:0]
592 : }
593 :
594 1 : func (w *valueBlockWriter) computeChecksum(block []byte) {
595 1 : n := len(block) - blockTrailerLen
596 1 : checksum := w.checksummer.checksum(block[:n], block[n:n+1])
597 1 : binary.LittleEndian.PutUint32(block[n+1:], checksum)
598 1 : }
599 :
600 : func (w *valueBlockWriter) finish(
601 : writer io.Writer, fileOffset uint64,
602 1 : ) (valueBlocksIndexHandle, valueBlocksAndIndexStats, error) {
603 1 : if len(w.buf.b) > 0 {
604 1 : w.compressAndFlush()
605 1 : }
606 1 : n := len(w.blocks)
607 1 : if n == 0 {
608 1 : return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, nil
609 1 : }
610 1 : largestOffset := uint64(0)
611 1 : largestLength := uint64(0)
612 1 : for i := range w.blocks {
613 1 : _, err := writer.Write(w.blocks[i].block.b)
614 1 : if err != nil {
615 0 : return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, err
616 0 : }
617 1 : w.blocks[i].handle.Offset += fileOffset
618 1 : largestOffset = w.blocks[i].handle.Offset
619 1 : if largestLength < w.blocks[i].handle.Length {
620 1 : largestLength = w.blocks[i].handle.Length
621 1 : }
622 : }
623 1 : vbihOffset := fileOffset + w.totalBlockBytes
624 1 :
625 1 : vbih := valueBlocksIndexHandle{
626 1 : h: BlockHandle{
627 1 : Offset: vbihOffset,
628 1 : },
629 1 : blockNumByteLength: uint8(lenLittleEndian(uint64(n - 1))),
630 1 : blockOffsetByteLength: uint8(lenLittleEndian(largestOffset)),
631 1 : blockLengthByteLength: uint8(lenLittleEndian(largestLength)),
632 1 : }
633 1 : var err error
634 1 : if vbih, err = w.writeValueBlocksIndex(writer, vbih); err != nil {
635 0 : return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, err
636 0 : }
637 1 : stats := valueBlocksAndIndexStats{
638 1 : numValueBlocks: uint64(n),
639 1 : numValuesInValueBlocks: w.numValues,
640 1 : valueBlocksAndIndexSize: w.totalBlockBytes + vbih.h.Length + blockTrailerLen,
641 1 : }
642 1 : return vbih, stats, err
643 : }
644 :
645 : func (w *valueBlockWriter) writeValueBlocksIndex(
646 : writer io.Writer, h valueBlocksIndexHandle,
647 1 : ) (valueBlocksIndexHandle, error) {
648 1 : blockLen :=
649 1 : int(h.blockNumByteLength+h.blockOffsetByteLength+h.blockLengthByteLength) * len(w.blocks)
650 1 : h.h.Length = uint64(blockLen)
651 1 : blockLen += blockTrailerLen
652 1 : var buf []byte
653 1 : if cap(w.buf.b) < blockLen {
654 1 : buf = make([]byte, blockLen)
655 1 : w.buf.b = buf
656 1 : } else {
657 1 : buf = w.buf.b[:blockLen]
658 1 : }
659 1 : b := buf
660 1 : for i := range w.blocks {
661 1 : littleEndianPut(uint64(i), b, int(h.blockNumByteLength))
662 1 : b = b[int(h.blockNumByteLength):]
663 1 : littleEndianPut(w.blocks[i].handle.Offset, b, int(h.blockOffsetByteLength))
664 1 : b = b[int(h.blockOffsetByteLength):]
665 1 : littleEndianPut(w.blocks[i].handle.Length, b, int(h.blockLengthByteLength))
666 1 : b = b[int(h.blockLengthByteLength):]
667 1 : }
668 1 : if len(b) != blockTrailerLen {
669 0 : panic("incorrect length calculation")
670 : }
671 1 : b[0] = byte(noCompressionBlockType)
672 1 : w.computeChecksum(buf)
673 1 : if _, err := writer.Write(buf); err != nil {
674 0 : return valueBlocksIndexHandle{}, err
675 0 : }
676 1 : return h, nil
677 : }
678 :
// littleEndianPut stores the n least-significant bytes of v into b[0:n],
// least-significant byte first. It assumes that v can be represented using n
// bytes; higher-order bytes of v are dropped. It panics unless
// 1 <= n <= len(b).
func littleEndianPut(v uint64, b []byte, n int) {
	_ = b[n-1] // bounds check
	for i := 0; i < n; i++ {
		// Shift counts of 64 or more yield 0, matching repeated shifting.
		b[i] = byte(v >> (8 * uint(i)))
	}
}
688 :
// lenLittleEndian returns the minimum number of bytes needed to encode v
// using little endian encoding. The result is between 1 and 8; zero needs
// one byte.
func lenLittleEndian(v uint64) int {
	width := 1
	for rest := v >> 8; rest != 0; rest >>= 8 {
		width++
	}
	return width
}
702 :
// littleEndianGet decodes the first n bytes of b as a little endian unsigned
// integer. It is the inverse of littleEndianPut and panics unless
// 1 <= n <= len(b).
func littleEndianGet(b []byte, n int) uint64 {
	_ = b[n-1] // bounds check
	var v uint64
	// Fold from the most-significant byte down to the least-significant one.
	for i := n - 1; i >= 0; i-- {
		v = v<<8 | uint64(b[i])
	}
	return v
}
711 :
// UserKeyPrefixBound represents a [Lower,Upper) bound of user key prefixes.
// If both are nil, there is no bound specified. Else, Compare(Lower,Upper)
// must be < 0.
type UserKeyPrefixBound struct {
	// Lower is a lower bound user key prefix.
	Lower []byte
	// Upper is an upper bound user key prefix.
	Upper []byte
}

// IsEmpty returns true iff the bound is empty, i.e. both Lower and Upper
// have zero length.
func (ukb *UserKeyPrefixBound) IsEmpty() bool {
	hasLower := len(ukb.Lower) > 0
	hasUpper := len(ukb.Upper) > 0
	return !hasLower && !hasUpper
}
726 :
727 : type blockProviderWhenOpen interface {
728 : readBlockForVBR(
729 : h BlockHandle, stats *base.InternalIteratorStats,
730 : ) (bufferHandle, error)
731 : }
732 :
733 : type blockProviderWhenClosed struct {
734 : rp ReaderProvider
735 : r *Reader
736 : }
737 :
738 1 : func (bpwc *blockProviderWhenClosed) open() error {
739 1 : var err error
740 1 : bpwc.r, err = bpwc.rp.GetReader()
741 1 : return err
742 1 : }
743 :
744 1 : func (bpwc *blockProviderWhenClosed) close() {
745 1 : bpwc.rp.Close()
746 1 : bpwc.r = nil
747 1 : }
748 :
749 : func (bpwc blockProviderWhenClosed) readBlockForVBR(
750 : h BlockHandle, stats *base.InternalIteratorStats,
751 1 : ) (bufferHandle, error) {
752 1 : // This is rare, since most block reads happen when the corresponding
753 1 : // sstable iterator is open. So we are willing to sacrifice a proper context
754 1 : // for tracing.
755 1 : //
756 1 : // TODO(sumeer): consider fixing this. See
757 1 : // https://github.com/cockroachdb/pebble/pull/3065#issue-1991175365 for an
758 1 : // alternative.
759 1 : ctx := objiotracing.WithBlockType(context.Background(), objiotracing.ValueBlock)
760 1 : // TODO(jackson,sumeer): Consider whether to use a buffer pool in this case.
761 1 : // The bpwc is not allowed to outlive the iterator tree, so it cannot
762 1 : // outlive the buffer pool.
763 1 : return bpwc.r.readBlock(
764 1 : ctx, h, nil, nil, stats, nil /* iterStats */, nil /* buffer pool */)
765 1 : }
766 :
767 : // ReaderProvider supports the implementation of blockProviderWhenClosed.
768 : // GetReader and Close can be called multiple times in pairs.
769 : type ReaderProvider interface {
770 : GetReader() (r *Reader, err error)
771 : Close()
772 : }
773 :
774 : // TrivialReaderProvider implements ReaderProvider for a Reader that will
775 : // outlive the top-level iterator in the iterator tree.
776 : type TrivialReaderProvider struct {
777 : *Reader
778 : }
779 :
780 : var _ ReaderProvider = TrivialReaderProvider{}
781 :
782 : // GetReader implements ReaderProvider.
783 0 : func (trp TrivialReaderProvider) GetReader() (*Reader, error) {
784 0 : return trp.Reader, nil
785 0 : }
786 :
787 : // Close implements ReaderProvider.
788 0 : func (trp TrivialReaderProvider) Close() {}
789 :
790 : // valueBlockReader is used to retrieve values in value
791 : // blocks. It is used when the sstable was written with
792 : // Properties.ValueBlocksAreEnabled.
793 : type valueBlockReader struct {
794 : bpOpen blockProviderWhenOpen
795 : rp ReaderProvider
796 : vbih valueBlocksIndexHandle
797 : stats *base.InternalIteratorStats
798 :
799 : // The value blocks index is lazily retrieved the first time the reader
800 : // needs to read a value that resides in a value block.
801 : vbiBlock []byte
802 : vbiCache bufferHandle
803 : // When sequentially iterating through all key-value pairs, the cost of
804 : // repeatedly getting a block that is already in the cache and releasing the
805 : // bufferHandle can be ~40% of the cpu overhead. So the reader remembers the
806 : // last value block it retrieved, in case there is locality of access, and
807 : // this value block can be used for the next value retrieval.
808 : valueBlockNum uint32
809 : valueBlock []byte
810 : valueBlockPtr unsafe.Pointer
811 : valueCache bufferHandle
812 : lazyFetcher base.LazyFetcher
813 : closed bool
814 : bufToMangle []byte
815 : }
816 :
817 1 : func (r *valueBlockReader) getLazyValueForPrefixAndValueHandle(handle []byte) base.LazyValue {
818 1 : fetcher := &r.lazyFetcher
819 1 : valLen, h := decodeLenFromValueHandle(handle[1:])
820 1 : *fetcher = base.LazyFetcher{
821 1 : Fetcher: r,
822 1 : Attribute: base.AttributeAndLen{
823 1 : ValueLen: int32(valLen),
824 1 : ShortAttribute: getShortAttribute(valuePrefix(handle[0])),
825 1 : },
826 1 : }
827 1 : if r.stats != nil {
828 1 : r.stats.SeparatedPointValue.Count++
829 1 : r.stats.SeparatedPointValue.ValueBytes += uint64(valLen)
830 1 : }
831 1 : return base.LazyValue{
832 1 : ValueOrHandle: h,
833 1 : Fetcher: fetcher,
834 1 : }
835 : }
836 :
837 1 : func (r *valueBlockReader) close() {
838 1 : r.bpOpen = nil
839 1 : r.vbiBlock = nil
840 1 : r.vbiCache.Release()
841 1 : // Set the handle to empty since Release does not nil the Handle.value. If
842 1 : // we were to reopen this valueBlockReader and retrieve the same
843 1 : // Handle.value from the cache, we don't want to accidentally unref it when
844 1 : // attempting to unref the old handle.
845 1 : r.vbiCache = bufferHandle{}
846 1 : r.valueBlock = nil
847 1 : r.valueBlockPtr = nil
848 1 : r.valueCache.Release()
849 1 : // See comment above.
850 1 : r.valueCache = bufferHandle{}
851 1 : r.closed = true
852 1 : // rp, vbih, stats remain valid, so that LazyFetcher.ValueFetcher can be
853 1 : // implemented.
854 1 : }
855 :
856 : // Fetch implements base.ValueFetcher.
857 : func (r *valueBlockReader) Fetch(
858 : handle []byte, valLen int32, buf []byte,
859 1 : ) (val []byte, callerOwned bool, err error) {
860 1 : if !r.closed {
861 1 : val, err := r.getValueInternal(handle, valLen)
862 1 : if invariants.Enabled {
863 1 : val = r.doValueMangling(val)
864 1 : }
865 1 : return val, false, err
866 : }
867 :
868 1 : bp := blockProviderWhenClosed{rp: r.rp}
869 1 : err = bp.open()
870 1 : if err != nil {
871 0 : return nil, false, err
872 0 : }
873 1 : defer bp.close()
874 1 : defer r.close()
875 1 : r.bpOpen = bp
876 1 : var v []byte
877 1 : v, err = r.getValueInternal(handle, valLen)
878 1 : if err != nil {
879 0 : return nil, false, err
880 0 : }
881 1 : buf = append(buf[:0], v...)
882 1 : return buf, true, nil
883 : }
884 :
885 : // doValueMangling attempts to uncover violations of the contract listed in
886 : // the declaration comment of LazyValue. It is expensive, hence only called
887 : // when invariants.Enabled.
888 1 : func (r *valueBlockReader) doValueMangling(v []byte) []byte {
889 1 : // Randomly set the bytes in the previous retrieved value to 0, since
890 1 : // property P1 only requires the valueBlockReader to maintain the memory of
891 1 : // one fetched value.
892 1 : if rand.Intn(2) == 0 {
893 1 : clear(r.bufToMangle)
894 1 : }
895 : // Store the current value in a new buffer for future mangling.
896 1 : r.bufToMangle = append([]byte(nil), v...)
897 1 : return r.bufToMangle
898 : }
899 :
900 1 : func (r *valueBlockReader) getValueInternal(handle []byte, valLen int32) (val []byte, err error) {
901 1 : vh := decodeRemainingValueHandle(handle)
902 1 : vh.valueLen = uint32(valLen)
903 1 : if r.vbiBlock == nil {
904 1 : ch, err := r.bpOpen.readBlockForVBR(r.vbih.h, r.stats)
905 1 : if err != nil {
906 0 : return nil, err
907 0 : }
908 1 : r.vbiCache = ch
909 1 : r.vbiBlock = ch.Get()
910 : }
911 1 : if r.valueBlock == nil || r.valueBlockNum != vh.blockNum {
912 1 : vbh, err := r.getBlockHandle(vh.blockNum)
913 1 : if err != nil {
914 0 : return nil, err
915 0 : }
916 1 : vbCacheHandle, err := r.bpOpen.readBlockForVBR(vbh, r.stats)
917 1 : if err != nil {
918 0 : return nil, err
919 0 : }
920 1 : r.valueBlockNum = vh.blockNum
921 1 : r.valueCache.Release()
922 1 : r.valueCache = vbCacheHandle
923 1 : r.valueBlock = vbCacheHandle.Get()
924 1 : r.valueBlockPtr = unsafe.Pointer(&r.valueBlock[0])
925 : }
926 1 : if r.stats != nil {
927 1 : r.stats.SeparatedPointValue.ValueBytesFetched += uint64(valLen)
928 1 : }
929 1 : return r.valueBlock[vh.offsetInBlock : vh.offsetInBlock+vh.valueLen], nil
930 : }
931 :
932 1 : func (r *valueBlockReader) getBlockHandle(blockNum uint32) (BlockHandle, error) {
933 1 : indexEntryLen :=
934 1 : int(r.vbih.blockNumByteLength + r.vbih.blockOffsetByteLength + r.vbih.blockLengthByteLength)
935 1 : offsetInIndex := indexEntryLen * int(blockNum)
936 1 : if len(r.vbiBlock) < offsetInIndex+indexEntryLen {
937 0 : return BlockHandle{}, base.AssertionFailedf(
938 0 : "index entry out of bounds: offset %d length %d block length %d",
939 0 : offsetInIndex, indexEntryLen, len(r.vbiBlock))
940 0 : }
941 1 : b := r.vbiBlock[offsetInIndex : offsetInIndex+indexEntryLen]
942 1 : n := int(r.vbih.blockNumByteLength)
943 1 : bn := littleEndianGet(b, n)
944 1 : if uint32(bn) != blockNum {
945 0 : return BlockHandle{},
946 0 : errors.Errorf("expected block num %d but found %d", blockNum, bn)
947 0 : }
948 1 : b = b[n:]
949 1 : n = int(r.vbih.blockOffsetByteLength)
950 1 : blockOffset := littleEndianGet(b, n)
951 1 : b = b[n:]
952 1 : n = int(r.vbih.blockLengthByteLength)
953 1 : blockLen := littleEndianGet(b, n)
954 1 : return BlockHandle{Offset: blockOffset, Length: blockLen}, nil
955 : }
|