Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "context"
9 : "encoding/binary"
10 : "sync"
11 : "unsafe"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
17 : "github.com/cockroachdb/pebble/sstable/block"
18 : "golang.org/x/exp/rand"
19 : )
20 :
21 : // Value blocks are supported in TableFormatPebblev3.
22 : //
23 : // 1. Motivation and overview
24 : //
25 : // Value blocks are a mechanism designed for sstables storing MVCC data, where
26 : // there can be many versions of a key that need to be kept, but only the
27 : // latest value is typically read (see the documentation for Comparer.Split
28 : // regarding MVCC keys). The goal is faster reads. Unlike Pebble versions,
29 : // which can be eagerly thrown away (except when there are snapshots), MVCC
30 : // versions are long-lived (e.g. default CockroachDB garbage collection
31 : // threshold for older versions is 24 hours) and can significantly slow down
32 : // reads. We have seen CockroachDB production workloads with very slow reads
33 : // due to:
34 : // - 100s of versions for each key in a table.
35 : //
36 : // - Tables with mostly MVCC garbage consisting of 2 versions per key -- a
37 : // real key-value pair, followed by a key-value pair whose value (usually
38 : // with zero byte length) indicates it is an MVCC tombstone.
39 : //
40 : // The value blocks mechanism attempts to improve read throughput in these
41 : // cases when the key size is smaller than the value sizes of older versions.
42 : // This is done by moving the value of an older version to a value block in a
43 : // different part of the sstable. This improves spatial locality of the data
44 : // being read by the workload, which increases caching effectiveness.
45 : //
46 : // Additionally, even when the key size is not smaller than the value of older
47 : // versions (e.g. secondary indexes in CockroachDB), TableFormatPebblev3
48 : // stores the result of key comparisons done at write time inside the sstable,
49 : // which makes stepping from one key prefix to the next prefix (i.e., skipping
50 : // over older versions of a MVCC key) more efficient by avoiding key
51 : // comparisons and key decoding. See the results in
52 : // https://github.com/cockroachdb/pebble/pull/2149 and more details in the
53 : // comment inside BenchmarkIteratorScanNextPrefix. These improvements are also
54 : // visible in end-to-end CockroachDB tests, as outlined in
55 : // https://github.com/cockroachdb/cockroach/pull/96652.
56 : //
57 : // In TableFormatPebblev3, each SET has a one byte value prefix that tells us
58 : // whether the value is in-place or in a value block. This 1 byte prefix
59 : // encodes additional information:
60 : //
61 : // - ShortAttribute: This is an attribute of the value. Currently, CockroachDB
62 : // uses it to represent whether the value is a tombstone or not. This avoids
63 : // the need to fetch a value from the value block if the caller only wants
64 : // to figure out whether it is an MVCC tombstone. The length of the value is
65 : // another attribute that the caller can be interested in, and it is also
66 : // accessible without reading the value in the value block (see the value
67 : // handle in the details section).
68 : //
69 : // - SET-same-prefix: this enables the aforementioned optimization when
70 : // stepping from one key prefix to the next key prefix.
71 : //
72 : // We further optimize this iteration over prefixes by using the restart
73 : // points in a block to encode whether the SET at a restart point has the same
74 : // prefix since the last restart point. This allows us to skip over restart
75 : // points within the same block. See the comment in blockWriter, and how both
76 : // SET-same-prefix and the restart point information is used in
77 : // blockIter.nextPrefixV3.
78 : //
79 : // This flexibility of values that are in-place or in value blocks requires
80 : // flexibility in the iterator interface. The InternalIterator interface
81 : // returns a LazyValue instead of a byte slice. Additionally, pebble.Iterator
82 : // allows the caller to ask for a LazyValue. See lazy_value.go for details,
83 : // including the memory lifetime management.
84 : //
85 : // For historical discussions about this feature, see the issue
86 : // https://github.com/cockroachdb/pebble/issues/1170 and the prototype in
87 : // https://github.com/cockroachdb/pebble/pull/1443.
88 : //
89 : // The code in this file mainly covers value block and related encodings. We
90 : // discuss these in the next section.
91 : //
92 : // 2. Details
93 : //
94 : // Note that the notion of the latest value is local to the sstable. It is
95 : // possible that that latest value has been deleted by a sstable in a higher
96 : // level, and what is the latest value from the perspective of the whole LSM
97 : // is an older MVCC version. This only affects performance and not
98 : // correctness. This local knowledge is also why we continue to store these
99 : // older versions in the same sstable -- we need to be able to conveniently
100 : // read them. The code in this file is agnostic to the policy regarding what
101 : // should be stored in value blocks -- it allows even the latest MVCC version
102 : // to be stored in a value block. The policy decision is made in the
103 : // sstable.Writer. See Writer.makeAddPointDecisionV3.
104 : //
105 : // Data blocks contain two kinds of SET keys: those with in-place values and
106 : // those with a value handle. To distinguish these two cases we use a single
107 : // byte prefix (ValuePrefix). This single byte prefix is split into multiple
108 : // parts, where nb represents information that is encoded in n bits.
109 : //
110 : // +---------------+--------------------+-----------+--------------------+
111 : // | value-kind 2b | SET-same-prefix 1b | unused 2b | short-attribute 3b |
112 : // +---------------+--------------------+-----------+--------------------+
113 : //
114 : // The 2 bit value-kind specifies whether this is an in-place value or a value
115 : // handle pointing to a value block. We use 2 bits here for future
116 : // representation of values that are in separate files. The 1 bit
117 : // SET-same-prefix is true if this key is a SET and is immediately preceded by
118 : // a SET that shares the same prefix. The 3 bit short-attribute is described
119 : // in base.ShortAttribute -- it stores user-defined attributes about the
120 : // value. It is unused for in-place values.
121 : //
122 : // Value Handle and Value Blocks:
123 : // valueHandles refer to values in value blocks. Value blocks are simpler than
124 : // normal data blocks (that contain key-value pairs, and allow for binary
125 : // search), which makes them cheap for value retrieval purposes. A valueHandle
126 : // is a tuple (valueLen, blockNum, offsetInBlock), where blockNum is the 0
127 : // indexed value block number and offsetInBlock is the byte offset in that
128 : // block containing the value. The valueHandle.valueLen is included since
129 : // there are multiple use cases in CockroachDB that need the value length but
130 : // not the value, for which we can avoid reading the value in the value block
131 : // (see
132 : // https://github.com/cockroachdb/pebble/issues/1170#issuecomment-958203245).
133 : //
134 : // A value block has a checksum like other blocks, and is optionally
135 : // compressed. An uncompressed value block is a sequence of values with no
136 : // separator or length (we rely on the valueHandle to demarcate). The
137 : // valueHandle.offsetInBlock points to the value, of length
138 : // valueHandle.valueLen. While writing a sstable, all the (possibly
139 : // compressed) value blocks need to be held in-memory until they can be
140 : // written. Value blocks are placed after the "meta rangedel" and "meta range
141 : // key" blocks since value blocks are considered less likely to be read.
142 : //
143 : // Meta Value Index Block:
144 : // Since the (key, valueHandle) pair are written before there is any knowledge
145 : // of the byte offset of the value block in the file, or its compressed
146 : // length, we need another lookup to map the valueHandle.blockNum to the
147 : // information needed to read it from the file. This information is provided
148 : // by the "value index block". The "value index block" is referred to by the
149 : // metaindex block. The design intentionally avoids making the "value index
150 : // block" a general purpose key-value block, since each caller wants to lookup
151 : // the information for a particular blockNum (there is no need for SeekGE
152 : // etc.). Instead, this index block stores a sequence of (blockNum,
153 : // blockOffset, blockLength) tuples, where the blockNums are consecutive
154 : // integers, and the tuples are encoded with a fixed width encoding. This
155 : // allows a reader to find the tuple for block K by looking at the offset
156 : // K*fixed-width. The fixed width for each field is decided by looking at the
157 : // maximum value of each of these fields. As a concrete example of a large
158 : // sstable with many value blocks, we constructed a 100MB sstable with many
159 : // versions and had 2475 value blocks (~32KB each). This sstable had this
160 : // tuple encoded using 2+4+2=8 bytes, which means the uncompressed value index
161 : // block was 2475*8=~19KB, which is modest. Therefore, we don't support more
162 : // than one value index block. Consider the example of 2 byte blockNum, 4 byte
163 : // blockOffset and 2 byte blockLen. The value index block will look like:
164 : //
165 : // +---------------+------------------+---------------+
166 : // | blockNum (2B) | blockOffset (4B) | blockLen (2B) |
167 : // +---------------+------------------+---------------+
168 : // | 0 | 7,123,456 | 30,000 |
169 : // +---------------+------------------+---------------+
170 : // | 1 | 7,153,456 | 20,000 |
171 : // +---------------+------------------+---------------+
172 : // | 2 | 7,173,456 | 25,567 |
173 : // +---------------+------------------+---------------+
174 : // | .... | ... | ... |
175 : //
176 : //
177 : // The metaindex block contains the valueBlocksIndexHandle which in addition
178 : // to the BlockHandle also specifies the widths of these tuple fields. In the
179 : // above example, the
180 : // valueBlocksIndexHandle.{blockNumByteLength,blockOffsetByteLength,blockLengthByteLength}
181 : // will be (2,4,2).
182 :
// valueHandle is stored with a key when the value is in a value block. This
// handle is the pointer to that value.
type valueHandle struct {
	// valueLen is the length in bytes of the value.
	valueLen uint32
	// blockNum is the 0-indexed number of the value block holding the value.
	blockNum uint32
	// offsetInBlock is the byte offset of the value within that block.
	offsetInBlock uint32
}

// valueHandle fields are varint encoded, so maximum 5 bytes each, plus 1 byte
// for the valuePrefix. This could alternatively be group varint encoded, but
// experiments were inconclusive
// (https://github.com/cockroachdb/pebble/pull/1443#issuecomment-1270298802).
const valueHandleMaxLen = 5*3 + 1

// Assert blockHandleLikelyMaxLen >= valueHandleMaxLen. If the difference were
// negative, the constant conversion to uint would fail to compile.
const _ = uint(blockHandleLikelyMaxLen - valueHandleMaxLen)
199 :
200 2 : func encodeValueHandle(dst []byte, v valueHandle) int {
201 2 : n := 0
202 2 : n += binary.PutUvarint(dst[n:], uint64(v.valueLen))
203 2 : n += binary.PutUvarint(dst[n:], uint64(v.blockNum))
204 2 : n += binary.PutUvarint(dst[n:], uint64(v.offsetInBlock))
205 2 : return n
206 2 : }
207 :
// decodeLenFromValueHandle decodes the leading uvarint of src, which holds
// the value length of an encoded valueHandle, and returns it together with
// the bytes of src that follow it.
//
// The decoding is hand-unrolled. Bytes after the first are loaded through
// unsafe pointer arithmetic without a bounds check, so src must contain a
// complete encoding. At most 5 bytes are consumed.
func decodeLenFromValueHandle(src []byte) (uint32, []byte) {
	p := unsafe.Pointer(&src[0])

	b0 := *(*uint8)(p)
	if b0 < 128 {
		return uint32(b0), src[1:]
	}
	// acc accumulates the low 7 bits of every continuation byte seen so far.
	acc := uint32(b0 & 0x7f)

	b1 := *(*uint8)(unsafe.Pointer(uintptr(p) + 1))
	if b1 < 128 {
		return uint32(b1)<<7 | acc, src[2:]
	}
	acc |= uint32(b1&0x7f) << 7

	b2 := *(*uint8)(unsafe.Pointer(uintptr(p) + 2))
	if b2 < 128 {
		return uint32(b2)<<14 | acc, src[3:]
	}
	acc |= uint32(b2&0x7f) << 14

	b3 := *(*uint8)(unsafe.Pointer(uintptr(p) + 3))
	if b3 < 128 {
		return uint32(b3)<<21 | acc, src[4:]
	}
	acc |= uint32(b3&0x7f) << 21

	// A uint32 never needs more than 5 uvarint bytes, so the fifth byte is
	// taken as the final one without inspecting its continuation bit.
	b4 := *(*uint8)(unsafe.Pointer(uintptr(p) + 4))
	return uint32(b4)<<28 | acc, src[5:]
}
230 :
231 2 : func decodeRemainingValueHandle(src []byte) valueHandle {
232 2 : var vh valueHandle
233 2 : ptr := unsafe.Pointer(&src[0])
234 2 : // Manually inlined uvarint decoding. Saves ~25% in benchmarks. Unrolling
235 2 : // a loop for i:=0; i<2; i++, saves ~6%.
236 2 : var v uint32
237 2 : if a := *((*uint8)(ptr)); a < 128 {
238 2 : v = uint32(a)
239 2 : ptr = unsafe.Pointer(uintptr(ptr) + 1)
240 2 : } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
241 0 : v = uint32(b)<<7 | uint32(a)
242 0 : ptr = unsafe.Pointer(uintptr(ptr) + 2)
243 1 : } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
244 1 : v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
245 1 : ptr = unsafe.Pointer(uintptr(ptr) + 3)
246 1 : } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
247 0 : v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
248 0 : ptr = unsafe.Pointer(uintptr(ptr) + 4)
249 1 : } else {
250 1 : d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
251 1 : v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
252 1 : ptr = unsafe.Pointer(uintptr(ptr) + 5)
253 1 : }
254 2 : vh.blockNum = v
255 2 :
256 2 : if a := *((*uint8)(ptr)); a < 128 {
257 2 : v = uint32(a)
258 2 : } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
259 2 : v = uint32(b)<<7 | uint32(a)
260 2 : } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
261 0 : v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
262 1 : } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
263 0 : v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
264 1 : } else {
265 1 : d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
266 1 : v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
267 1 : }
268 2 : vh.offsetInBlock = v
269 2 :
270 2 : return vh
271 : }
272 :
273 1 : func decodeValueHandle(src []byte) valueHandle {
274 1 : valLen, src := decodeLenFromValueHandle(src)
275 1 : vh := decodeRemainingValueHandle(src)
276 1 : vh.valueLen = valLen
277 1 : return vh
278 1 : }
279 :
// valueBlocksIndexHandle is placed in the metaindex if there are any value
// blocks. If there are no value blocks, there is no value blocks index, and
// no entry in the metaindex. Note that the lack of entry in the metaindex
// should not be used to ascertain whether the values are prefixed, since the
// former is an emergent property of the data that was written and not known
// until all the key-value pairs in the sstable are written.
type valueBlocksIndexHandle struct {
	// h is the handle of the value blocks index block.
	h block.Handle
	// The remaining fields are the fixed byte widths of the blockNum,
	// blockOffset and blockLength fields of each tuple in the index block.
	blockNumByteLength    uint8
	blockOffsetByteLength uint8
	blockLengthByteLength uint8
}

// valueBlocksIndexHandleMaxLen is the maximum encoded length of a
// valueBlocksIndexHandle: the block handle followed by the three 1-byte
// widths.
const valueBlocksIndexHandleMaxLen = blockHandleMaxLenWithoutProperties + 3

// Assert blockHandleLikelyMaxLen >= valueBlocksIndexHandleMaxLen. If the
// difference were negative, the constant conversion to uint would fail to
// compile.
const _ = uint(blockHandleLikelyMaxLen - valueBlocksIndexHandleMaxLen)
297 :
298 2 : func encodeValueBlocksIndexHandle(dst []byte, v valueBlocksIndexHandle) int {
299 2 : n := encodeBlockHandle(dst, v.h)
300 2 : dst[n] = v.blockNumByteLength
301 2 : n++
302 2 : dst[n] = v.blockOffsetByteLength
303 2 : n++
304 2 : dst[n] = v.blockLengthByteLength
305 2 : n++
306 2 : return n
307 2 : }
308 :
309 2 : func decodeValueBlocksIndexHandle(src []byte) (valueBlocksIndexHandle, int, error) {
310 2 : var vbih valueBlocksIndexHandle
311 2 : var n int
312 2 : vbih.h, n = decodeBlockHandle(src)
313 2 : if n <= 0 {
314 0 : return vbih, 0, errors.Errorf("bad BlockHandle %x", src)
315 0 : }
316 2 : if len(src) != n+3 {
317 0 : return vbih, 0, errors.Errorf("bad BlockHandle %x", src)
318 0 : }
319 2 : vbih.blockNumByteLength = src[n]
320 2 : vbih.blockOffsetByteLength = src[n+1]
321 2 : vbih.blockLengthByteLength = src[n+2]
322 2 : return vbih, n + 3, nil
323 : }
324 :
// valueBlocksAndIndexStats summarizes the value blocks and the value blocks
// index written for a sstable. It is returned by valueBlockWriter.finish.
type valueBlocksAndIndexStats struct {
	// numValueBlocks is the number of value blocks written.
	numValueBlocks uint64
	// numValuesInValueBlocks is the number of values added to value blocks.
	numValuesInValueBlocks uint64
	// Includes both value blocks and value index block.
	valueBlocksAndIndexSize uint64
}
331 :
// valueBlockWriter writes a sequence of value blocks, and the value blocks
// index, for a sstable.
type valueBlockWriter struct {
	// The configured uncompressed block size and size threshold
	blockSize, blockSizeThreshold int
	// Configured compression.
	compression Compression
	// checksummer with configured checksum type.
	checksummer block.Checksummer
	// Block finished callback. It is invoked with the length of each
	// finished block, excluding the block trailer.
	blockFinishedFunc func(compressedSize int)

	// buf is the current block being written to (uncompressed).
	buf *blockBuffer
	// compressedBuf is used for compressing the block.
	compressedBuf *blockBuffer
	// Sequence of blocks that are finished.
	blocks []blockAndHandle
	// Cumulative value block bytes written so far. Includes block trailers.
	totalBlockBytes uint64
	// numValues is the number of values passed to addValue.
	numValues uint64
}
354 :
// blockAndHandle is a finished value block that is held in memory until the
// value blocks are written out.
type blockAndHandle struct {
	// block holds the block contents followed by the block trailer.
	block *blockBuffer
	// handle records the block's offset and its length excluding the trailer.
	handle block.Handle
	// compressed is true if block holds compressed data. It determines which
	// buffer pool the block is released to.
	compressed bool
}

// blockBuffer wraps a byte slice so that the slice can be stored in a
// sync.Pool via a pointer.
type blockBuffer struct {
	b []byte
}
364 :
// Pool of block buffers that should be roughly the blockSize. New buffers
// start out empty and are grown by the writer as needed.
var uncompressedValueBlockBufPool = sync.Pool{
	New: func() interface{} {
		return &blockBuffer{}
	},
}

// Pool of block buffers for compressed value blocks. These may widely vary in
// size based on compression ratios.
var compressedValueBlockBufPool = sync.Pool{
	New: func() interface{} {
		return &blockBuffer{}
	},
}
379 :
380 2 : func releaseToValueBlockBufPool(pool *sync.Pool, b *blockBuffer) {
381 2 : // Don't pool buffers larger than 128KB, in case we had some rare large
382 2 : // values.
383 2 : if len(b.b) > 128*1024 {
384 0 : return
385 0 : }
386 2 : if invariants.Enabled {
387 2 : // Set the bytes to a random value. Cap the number of bytes being
388 2 : // randomized to prevent test timeouts.
389 2 : length := cap(b.b)
390 2 : if length > 1000 {
391 2 : length = 1000
392 2 : }
393 2 : b.b = b.b[:length:length]
394 2 : rand.Read(b.b)
395 : }
396 2 : pool.Put(b)
397 : }
398 :
// valueBlockWriterPool recycles valueBlockWriter structs. Writers are taken
// from it in newValueBlockWriter and returned in releaseValueBlockWriter.
var valueBlockWriterPool = sync.Pool{
	New: func() interface{} {
		return &valueBlockWriter{}
	},
}
404 :
405 : func newValueBlockWriter(
406 : blockSize int,
407 : blockSizeThreshold int,
408 : compression Compression,
409 : checksumType block.ChecksumType,
410 : // compressedSize should exclude the block trailer.
411 : blockFinishedFunc func(compressedSize int),
412 2 : ) *valueBlockWriter {
413 2 : w := valueBlockWriterPool.Get().(*valueBlockWriter)
414 2 : *w = valueBlockWriter{
415 2 : blockSize: blockSize,
416 2 : blockSizeThreshold: blockSizeThreshold,
417 2 : compression: compression,
418 2 : checksummer: block.Checksummer{
419 2 : Type: checksumType,
420 2 : },
421 2 : blockFinishedFunc: blockFinishedFunc,
422 2 : buf: uncompressedValueBlockBufPool.Get().(*blockBuffer),
423 2 : compressedBuf: compressedValueBlockBufPool.Get().(*blockBuffer),
424 2 : blocks: w.blocks[:0],
425 2 : }
426 2 : w.buf.b = w.buf.b[:0]
427 2 : w.compressedBuf.b = w.compressedBuf.b[:0]
428 2 : return w
429 2 : }
430 :
431 2 : func releaseValueBlockWriter(w *valueBlockWriter) {
432 2 : for i := range w.blocks {
433 2 : if w.blocks[i].compressed {
434 2 : releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.blocks[i].block)
435 2 : } else {
436 2 : releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.blocks[i].block)
437 2 : }
438 2 : w.blocks[i].block = nil
439 : }
440 2 : if w.buf != nil {
441 2 : releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.buf)
442 2 : }
443 2 : if w.compressedBuf != nil {
444 2 : releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.compressedBuf)
445 2 : }
446 2 : *w = valueBlockWriter{
447 2 : blocks: w.blocks[:0],
448 2 : }
449 2 : valueBlockWriterPool.Put(w)
450 : }
451 :
452 2 : func (w *valueBlockWriter) addValue(v []byte) (valueHandle, error) {
453 2 : if invariants.Enabled && len(v) == 0 {
454 0 : return valueHandle{}, errors.Errorf("cannot write empty value to value block")
455 0 : }
456 2 : w.numValues++
457 2 : blockLen := len(w.buf.b)
458 2 : valueLen := len(v)
459 2 : if blockLen >= w.blockSize ||
460 2 : (blockLen > w.blockSizeThreshold && blockLen+valueLen > w.blockSize) {
461 2 : // Block is not currently empty and adding this value will become too big,
462 2 : // so finish this block.
463 2 : w.compressAndFlush()
464 2 : blockLen = len(w.buf.b)
465 2 : if invariants.Enabled && blockLen != 0 {
466 0 : panic("blockLen of new block should be 0")
467 : }
468 : }
469 2 : vh := valueHandle{
470 2 : valueLen: uint32(valueLen),
471 2 : blockNum: uint32(len(w.blocks)),
472 2 : offsetInBlock: uint32(blockLen),
473 2 : }
474 2 : blockLen = int(vh.offsetInBlock + vh.valueLen)
475 2 : if cap(w.buf.b) < blockLen {
476 2 : size := 2 * cap(w.buf.b)
477 2 : if size < 1024 {
478 2 : size = 1024
479 2 : }
480 2 : for size < blockLen {
481 0 : size *= 2
482 0 : }
483 2 : buf := make([]byte, blockLen, size)
484 2 : _ = copy(buf, w.buf.b)
485 2 : w.buf.b = buf
486 2 : } else {
487 2 : w.buf.b = w.buf.b[:blockLen]
488 2 : }
489 2 : buf := w.buf.b[vh.offsetInBlock:]
490 2 : n := copy(buf, v)
491 2 : if n != len(buf) {
492 0 : panic("incorrect length computation")
493 : }
494 2 : return vh, nil
495 : }
496 :
497 2 : func (w *valueBlockWriter) compressAndFlush() {
498 2 : // Compress the buffer, discarding the result if the improvement isn't at
499 2 : // least 12.5%.
500 2 : blockType := noCompressionBlockType
501 2 : b := w.buf
502 2 : if w.compression != NoCompression {
503 2 : blockType, w.compressedBuf.b =
504 2 : compressBlock(w.compression, w.buf.b, w.compressedBuf.b[:cap(w.compressedBuf.b)])
505 2 : if len(w.compressedBuf.b) < len(w.buf.b)-len(w.buf.b)/8 {
506 2 : b = w.compressedBuf
507 2 : } else {
508 2 : blockType = noCompressionBlockType
509 2 : }
510 : }
511 2 : n := len(b.b)
512 2 : if n+block.TrailerLen > cap(b.b) {
513 2 : block := make([]byte, n+block.TrailerLen)
514 2 : copy(block, b.b)
515 2 : b.b = block
516 2 : } else {
517 2 : b.b = b.b[:n+block.TrailerLen]
518 2 : }
519 2 : b.b[n] = byte(blockType)
520 2 : w.computeChecksum(b.b)
521 2 : bh := block.Handle{Offset: w.totalBlockBytes, Length: uint64(n)}
522 2 : w.totalBlockBytes += uint64(len(b.b))
523 2 : // blockFinishedFunc length excludes the block trailer.
524 2 : w.blockFinishedFunc(n)
525 2 : compressed := blockType != noCompressionBlockType
526 2 : w.blocks = append(w.blocks, blockAndHandle{
527 2 : block: b,
528 2 : handle: bh,
529 2 : compressed: compressed,
530 2 : })
531 2 : // Handed off a buffer to w.blocks, so need get a new one.
532 2 : if compressed {
533 2 : w.compressedBuf = compressedValueBlockBufPool.Get().(*blockBuffer)
534 2 : } else {
535 2 : w.buf = uncompressedValueBlockBufPool.Get().(*blockBuffer)
536 2 : }
537 2 : w.buf.b = w.buf.b[:0]
538 : }
539 :
540 2 : func (w *valueBlockWriter) computeChecksum(blk []byte) {
541 2 : n := len(blk) - block.TrailerLen
542 2 : checksum := w.checksummer.Checksum(blk[:n], blk[n:n+1])
543 2 : binary.LittleEndian.PutUint32(blk[n+1:], checksum)
544 2 : }
545 :
546 : func (w *valueBlockWriter) finish(
547 : layout *layoutWriter, fileOffset uint64,
548 2 : ) (valueBlocksIndexHandle, valueBlocksAndIndexStats, error) {
549 2 : if len(w.buf.b) > 0 {
550 2 : w.compressAndFlush()
551 2 : }
552 2 : n := len(w.blocks)
553 2 : if n == 0 {
554 2 : return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, nil
555 2 : }
556 2 : largestOffset := uint64(0)
557 2 : largestLength := uint64(0)
558 2 : for i := range w.blocks {
559 2 : _, err := layout.WriteValueBlock(w.blocks[i].block.b)
560 2 : if err != nil {
561 0 : return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, err
562 0 : }
563 2 : w.blocks[i].handle.Offset += fileOffset
564 2 : largestOffset = w.blocks[i].handle.Offset
565 2 : if largestLength < w.blocks[i].handle.Length {
566 2 : largestLength = w.blocks[i].handle.Length
567 2 : }
568 : }
569 2 : vbihOffset := fileOffset + w.totalBlockBytes
570 2 :
571 2 : vbih := valueBlocksIndexHandle{
572 2 : h: block.Handle{
573 2 : Offset: vbihOffset,
574 2 : },
575 2 : blockNumByteLength: uint8(lenLittleEndian(uint64(n - 1))),
576 2 : blockOffsetByteLength: uint8(lenLittleEndian(largestOffset)),
577 2 : blockLengthByteLength: uint8(lenLittleEndian(largestLength)),
578 2 : }
579 2 : var err error
580 2 : if n > 0 {
581 2 : if vbih, err = w.writeValueBlocksIndex(layout, vbih); err != nil {
582 0 : return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, err
583 0 : }
584 : }
585 2 : stats := valueBlocksAndIndexStats{
586 2 : numValueBlocks: uint64(n),
587 2 : numValuesInValueBlocks: w.numValues,
588 2 : valueBlocksAndIndexSize: w.totalBlockBytes + vbih.h.Length + block.TrailerLen,
589 2 : }
590 2 : return vbih, stats, err
591 : }
592 :
593 : func (w *valueBlockWriter) writeValueBlocksIndex(
594 : layout *layoutWriter, h valueBlocksIndexHandle,
595 2 : ) (valueBlocksIndexHandle, error) {
596 2 : blockLen :=
597 2 : int(h.blockNumByteLength+h.blockOffsetByteLength+h.blockLengthByteLength) * len(w.blocks)
598 2 : h.h.Length = uint64(blockLen)
599 2 : blockLen += block.TrailerLen
600 2 : var buf []byte
601 2 : if cap(w.buf.b) < blockLen {
602 2 : buf = make([]byte, blockLen)
603 2 : w.buf.b = buf
604 2 : } else {
605 2 : buf = w.buf.b[:blockLen]
606 2 : }
607 2 : b := buf
608 2 : for i := range w.blocks {
609 2 : littleEndianPut(uint64(i), b, int(h.blockNumByteLength))
610 2 : b = b[int(h.blockNumByteLength):]
611 2 : littleEndianPut(w.blocks[i].handle.Offset, b, int(h.blockOffsetByteLength))
612 2 : b = b[int(h.blockOffsetByteLength):]
613 2 : littleEndianPut(w.blocks[i].handle.Length, b, int(h.blockLengthByteLength))
614 2 : b = b[int(h.blockLengthByteLength):]
615 2 : }
616 2 : if len(b) != block.TrailerLen {
617 0 : panic("incorrect length calculation")
618 : }
619 2 : b[0] = byte(noCompressionBlockType)
620 2 : w.computeChecksum(buf)
621 2 : if _, err := layout.WriteValueIndexBlock(buf, h); err != nil {
622 0 : return valueBlocksIndexHandle{}, err
623 0 : }
624 2 : return h, nil
625 : }
626 :
// littleEndianPut stores the n low-order bytes of v into b[0:n], least
// significant byte first. The caller is expected to pass an n large enough
// to represent v. It panics if b is shorter than n bytes.
func littleEndianPut(v uint64, b []byte, n int) {
	_ = b[n-1] // bounds check
	for i := range b[:n] {
		b[i] = byte(v >> uint(8*i))
	}
}
636 :
// lenLittleEndian returns the minimum number of bytes needed to encode v
// using little endian encoding. The result is between 1 and 8; zero needs
// one byte.
func lenLittleEndian(v uint64) int {
	n := 1
	for v >>= 8; v != 0 && n < 8; v >>= 8 {
		n++
	}
	return n
}
650 :
// littleEndianGet decodes the little endian integer stored in b[0:n]. It
// panics if b is shorter than n bytes.
func littleEndianGet(b []byte, n int) uint64 {
	_ = b[n-1] // bounds check
	var v uint64
	// Fold in the bytes from most to least significant.
	for i := n - 1; i >= 0; i-- {
		v = v<<8 | uint64(b[i])
	}
	return v
}
659 :
// UserKeyPrefixBound represents a [Lower,Upper) bound of user key prefixes.
// If both are nil, there is no bound specified. Else, Compare(Lower,Upper)
// must be < 0.
type UserKeyPrefixBound struct {
	// Lower is a lower bound user key prefix.
	Lower []byte
	// Upper is an upper bound user key prefix.
	Upper []byte
}

// IsEmpty returns true iff the bound is empty, i.e. both Lower and Upper
// have zero length.
func (ukb *UserKeyPrefixBound) IsEmpty() bool {
	return len(ukb.Lower)+len(ukb.Upper) == 0
}
674 :
// blockProviderWhenOpen reads blocks on behalf of a valueBlockReader, which
// holds one in its bpOpen field.
type blockProviderWhenOpen interface {
	// readBlockForVBR reads the block identified by h. stats may be nil.
	readBlockForVBR(
		h block.Handle, stats *base.InternalIteratorStats,
	) (block.BufferHandle, error)
}
680 :
681 : type blockProviderWhenClosed struct {
682 : rp ReaderProvider
683 : r *Reader
684 : }
685 :
686 2 : func (bpwc *blockProviderWhenClosed) open() error {
687 2 : var err error
688 2 : bpwc.r, err = bpwc.rp.GetReader()
689 2 : return err
690 2 : }
691 :
692 2 : func (bpwc *blockProviderWhenClosed) close() {
693 2 : bpwc.rp.Close()
694 2 : bpwc.r = nil
695 2 : }
696 :
697 : func (bpwc blockProviderWhenClosed) readBlockForVBR(
698 : h block.Handle, stats *base.InternalIteratorStats,
699 2 : ) (block.BufferHandle, error) {
700 2 : // This is rare, since most block reads happen when the corresponding
701 2 : // sstable iterator is open. So we are willing to sacrifice a proper context
702 2 : // for tracing.
703 2 : //
704 2 : // TODO(sumeer): consider fixing this. See
705 2 : // https://github.com/cockroachdb/pebble/pull/3065#issue-1991175365 for an
706 2 : // alternative.
707 2 : ctx := objiotracing.WithBlockType(context.Background(), objiotracing.ValueBlock)
708 2 : // TODO(jackson,sumeer): Consider whether to use a buffer pool in this case.
709 2 : // The bpwc is not allowed to outlive the iterator tree, so it cannot
710 2 : // outlive the buffer pool.
711 2 : return bpwc.r.readBlock(
712 2 : ctx, h, nil, nil, stats, nil /* iterStats */, nil /* buffer pool */)
713 2 : }
714 :
// ReaderProvider supports the implementation of blockProviderWhenClosed.
// GetReader and Close can be called multiple times in pairs.
type ReaderProvider interface {
	// GetReader returns the Reader to use for reading blocks.
	GetReader() (r *Reader, err error)
	// Close is called once the caller is done with the Reader returned by the
	// paired GetReader call.
	Close()
}
721 :
// TrivialReaderProvider implements ReaderProvider for a Reader that will
// outlive the top-level iterator in the iterator tree.
type TrivialReaderProvider struct {
	*Reader
}

// Compile-time check that TrivialReaderProvider implements ReaderProvider.
var _ ReaderProvider = TrivialReaderProvider{}

// GetReader implements ReaderProvider. It returns the embedded Reader and
// never fails.
func (trp TrivialReaderProvider) GetReader() (*Reader, error) {
	return trp.Reader, nil
}

// Close implements ReaderProvider. It is a no-op.
func (trp TrivialReaderProvider) Close() {}
737 :
// valueBlockReader is used to retrieve values in value
// blocks. It is used when the sstable was written with
// Properties.ValueBlocksAreEnabled.
type valueBlockReader struct {
	// bpOpen is the block provider used to read blocks. It is set to nil by
	// close, and Fetch installs a temporary provider when called after close.
	bpOpen blockProviderWhenOpen
	// rp provides a Reader for Fetch calls made after close.
	rp ReaderProvider
	// vbih is the handle of the value blocks index.
	vbih  valueBlocksIndexHandle
	stats *base.InternalIteratorStats

	// The value blocks index is lazily retrieved the first time the reader
	// needs to read a value that resides in a value block.
	vbiBlock []byte
	vbiCache block.BufferHandle
	// When sequentially iterating through all key-value pairs, the cost of
	// repeatedly getting a block that is already in the cache and releasing the
	// bufferHandle can be ~40% of the cpu overhead. So the reader remembers the
	// last value block it retrieved, in case there is locality of access, and
	// this value block can be used for the next value retrieval.
	valueBlockNum uint32
	valueBlock    []byte
	// NOTE(review): presumably points at the start of valueBlock -- confirm
	// against getValueInternal.
	valueBlockPtr unsafe.Pointer
	valueCache    block.BufferHandle
	// lazyFetcher is the storage for the LazyFetcher returned by
	// getLazyValueForPrefixAndValueHandle; it is overwritten on every call.
	lazyFetcher base.LazyFetcher
	// closed is set by close. Fetch checks it to decide whether a Reader
	// must be obtained through rp.
	closed bool
	// NOTE(review): presumably scratch space used by doValueMangling under
	// invariants builds -- confirm.
	bufToMangle []byte
}
764 :
765 2 : func (r *valueBlockReader) getLazyValueForPrefixAndValueHandle(handle []byte) base.LazyValue {
766 2 : fetcher := &r.lazyFetcher
767 2 : valLen, h := decodeLenFromValueHandle(handle[1:])
768 2 : *fetcher = base.LazyFetcher{
769 2 : Fetcher: r,
770 2 : Attribute: base.AttributeAndLen{
771 2 : ValueLen: int32(valLen),
772 2 : ShortAttribute: block.ValuePrefix(handle[0]).ShortAttribute(),
773 2 : },
774 2 : }
775 2 : if r.stats != nil {
776 2 : r.stats.SeparatedPointValue.Count++
777 2 : r.stats.SeparatedPointValue.ValueBytes += uint64(valLen)
778 2 : }
779 2 : return base.LazyValue{
780 2 : ValueOrHandle: h,
781 2 : Fetcher: fetcher,
782 2 : }
783 : }
784 :
785 2 : func (r *valueBlockReader) close() {
786 2 : r.bpOpen = nil
787 2 : r.vbiBlock = nil
788 2 : r.vbiCache.Release()
789 2 : // Set the handle to empty since Release does not nil the Handle.value. If
790 2 : // we were to reopen this valueBlockReader and retrieve the same
791 2 : // Handle.value from the cache, we don't want to accidentally unref it when
792 2 : // attempting to unref the old handle.
793 2 : r.vbiCache = block.BufferHandle{}
794 2 : r.valueBlock = nil
795 2 : r.valueBlockPtr = nil
796 2 : r.valueCache.Release()
797 2 : // See comment above.
798 2 : r.valueCache = block.BufferHandle{}
799 2 : r.closed = true
800 2 : // rp, vbih, stats remain valid, so that LazyFetcher.ValueFetcher can be
801 2 : // implemented.
802 2 : }
803 :
804 : // Fetch implements base.ValueFetcher.
805 : func (r *valueBlockReader) Fetch(
806 : handle []byte, valLen int32, buf []byte,
807 2 : ) (val []byte, callerOwned bool, err error) {
808 2 : if !r.closed {
809 2 : val, err := r.getValueInternal(handle, valLen)
810 2 : if invariants.Enabled {
811 2 : val = r.doValueMangling(val)
812 2 : }
813 2 : return val, false, err
814 : }
815 :
816 2 : bp := blockProviderWhenClosed{rp: r.rp}
817 2 : err = bp.open()
818 2 : if err != nil {
819 0 : return nil, false, err
820 0 : }
821 2 : defer bp.close()
822 2 : defer r.close()
823 2 : r.bpOpen = bp
824 2 : var v []byte
825 2 : v, err = r.getValueInternal(handle, valLen)
826 2 : if err != nil {
827 0 : return nil, false, err
828 0 : }
829 2 : buf = append(buf[:0], v...)
830 2 : return buf, true, nil
831 : }
832 :
833 : // doValueMangling attempts to uncover violations of the contract listed in
834 : // the declaration comment of LazyValue. It is expensive, hence only called
835 : // when invariants.Enabled.
836 2 : func (r *valueBlockReader) doValueMangling(v []byte) []byte {
837 2 : // Randomly set the bytes in the previous retrieved value to 0, since
838 2 : // property P1 only requires the valueBlockReader to maintain the memory of
839 2 : // one fetched value.
840 2 : if rand.Intn(2) == 0 {
841 2 : clear(r.bufToMangle)
842 2 : }
843 : // Store the current value in a new buffer for future mangling.
844 2 : r.bufToMangle = append([]byte(nil), v...)
845 2 : return r.bufToMangle
846 : }
847 :
848 2 : func (r *valueBlockReader) getValueInternal(handle []byte, valLen int32) (val []byte, err error) {
849 2 : vh := decodeRemainingValueHandle(handle)
850 2 : vh.valueLen = uint32(valLen)
851 2 : if r.vbiBlock == nil {
852 2 : ch, err := r.bpOpen.readBlockForVBR(r.vbih.h, r.stats)
853 2 : if err != nil {
854 0 : return nil, err
855 0 : }
856 2 : r.vbiCache = ch
857 2 : r.vbiBlock = ch.Get()
858 : }
859 2 : if r.valueBlock == nil || r.valueBlockNum != vh.blockNum {
860 2 : vbh, err := r.getBlockHandle(vh.blockNum)
861 2 : if err != nil {
862 0 : return nil, err
863 0 : }
864 2 : vbCacheHandle, err := r.bpOpen.readBlockForVBR(vbh, r.stats)
865 2 : if err != nil {
866 0 : return nil, err
867 0 : }
868 2 : r.valueBlockNum = vh.blockNum
869 2 : r.valueCache.Release()
870 2 : r.valueCache = vbCacheHandle
871 2 : r.valueBlock = vbCacheHandle.Get()
872 2 : r.valueBlockPtr = unsafe.Pointer(&r.valueBlock[0])
873 : }
874 2 : if r.stats != nil {
875 2 : r.stats.SeparatedPointValue.ValueBytesFetched += uint64(valLen)
876 2 : }
877 2 : return r.valueBlock[vh.offsetInBlock : vh.offsetInBlock+vh.valueLen], nil
878 : }
879 :
880 2 : func (r *valueBlockReader) getBlockHandle(blockNum uint32) (block.Handle, error) {
881 2 : indexEntryLen :=
882 2 : int(r.vbih.blockNumByteLength + r.vbih.blockOffsetByteLength + r.vbih.blockLengthByteLength)
883 2 : offsetInIndex := indexEntryLen * int(blockNum)
884 2 : if len(r.vbiBlock) < offsetInIndex+indexEntryLen {
885 0 : return block.Handle{}, base.AssertionFailedf(
886 0 : "index entry out of bounds: offset %d length %d block length %d",
887 0 : offsetInIndex, indexEntryLen, len(r.vbiBlock))
888 0 : }
889 2 : b := r.vbiBlock[offsetInIndex : offsetInIndex+indexEntryLen]
890 2 : n := int(r.vbih.blockNumByteLength)
891 2 : bn := littleEndianGet(b, n)
892 2 : if uint32(bn) != blockNum {
893 0 : return block.Handle{},
894 0 : errors.Errorf("expected block num %d but found %d", blockNum, bn)
895 0 : }
896 2 : b = b[n:]
897 2 : n = int(r.vbih.blockOffsetByteLength)
898 2 : blockOffset := littleEndianGet(b, n)
899 2 : b = b[n:]
900 2 : n = int(r.vbih.blockLengthByteLength)
901 2 : blockLen := littleEndianGet(b, n)
902 2 : return block.Handle{Offset: blockOffset, Length: blockLen}, nil
903 : }
|