Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "context"
9 : "encoding/binary"
10 : "sync"
11 : "unsafe"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
17 : "github.com/cockroachdb/pebble/sstable/block"
18 : "golang.org/x/exp/rand"
19 : )
20 :
21 : // Value blocks are supported in TableFormatPebblev3.
22 : //
23 : // 1. Motivation and overview
24 : //
25 : // Value blocks are a mechanism designed for sstables storing MVCC data, where
26 : // there can be many versions of a key that need to be kept, but only the
27 : // latest value is typically read (see the documentation for Comparer.Split
28 : // regarding MVCC keys). The goal is faster reads. Unlike Pebble versions,
29 : // which can be eagerly thrown away (except when there are snapshots), MVCC
30 : // versions are long-lived (e.g. default CockroachDB garbage collection
31 : // threshold for older versions is 24 hours) and can significantly slow down
32 : // reads. We have seen CockroachDB production workloads with very slow reads
33 : // due to:
34 : // - 100s of versions for each key in a table.
35 : //
36 : // - Tables with mostly MVCC garbage consisting of 2 versions per key -- a
37 : // real key-value pair, followed by a key-value pair whose value (usually
38 : // with zero byte length) indicates it is an MVCC tombstone.
39 : //
40 : // The value blocks mechanism attempts to improve read throughput in these
41 : // cases when the key size is smaller than the value sizes of older versions.
42 : // This is done by moving the value of an older version to a value block in a
43 : // different part of the sstable. This improves spatial locality of the data
44 : // being read by the workload, which increases caching effectiveness.
45 : //
46 : // Additionally, even when the key size is not smaller than the value of older
47 : // versions (e.g. secondary indexes in CockroachDB), TableFormatPebblev3
48 : // stores the result of key comparisons done at write time inside the sstable,
49 : // which makes stepping from one key prefix to the next prefix (i.e., skipping
50 : // over older versions of a MVCC key) more efficient by avoiding key
51 : // comparisons and key decoding. See the results in
52 : // https://github.com/cockroachdb/pebble/pull/2149 and more details in the
53 : // comment inside BenchmarkIteratorScanNextPrefix. These improvements are also
54 : // visible in end-to-end CockroachDB tests, as outlined in
55 : // https://github.com/cockroachdb/cockroach/pull/96652.
56 : //
57 : // In TableFormatPebblev3, each SET has a one byte value prefix that tells us
58 : // whether the value is in-place or in a value block. This 1 byte prefix
59 : // encodes additional information:
60 : //
61 : // - ShortAttribute: This is an attribute of the value. Currently, CockroachDB
62 : // uses it to represent whether the value is a tombstone or not. This avoids
63 : // the need to fetch a value from the value block if the caller only wants
64 : // to figure out whether it is an MVCC tombstone. The length of the value is
65 : // another attribute that the caller can be interested in, and it is also
66 : // accessible without reading the value in the value block (see the value
67 : // handle in the details section).
68 : //
69 : // - SET-same-prefix: this enables the aforementioned optimization when
70 : // stepping from one key prefix to the next key prefix.
71 : //
72 : // We further optimize this iteration over prefixes by using the restart
73 : // points in a block to encode whether the SET at a restart point has the same
74 : // prefix since the last restart point. This allows us to skip over restart
75 : // points within the same block. See the comment in blockWriter, and how both
76 : // SET-same-prefix and the restart point information is used in
77 : // blockIter.nextPrefixV3.
78 : //
79 : // This flexibility of values that are in-place or in value blocks requires
80 : // flexibility in the iterator interface. The InternalIterator interface
81 : // returns a LazyValue instead of a byte slice. Additionally, pebble.Iterator
82 : // allows the caller to ask for a LazyValue. See lazy_value.go for details,
83 : // including the memory lifetime management.
84 : //
85 : // For historical discussions about this feature, see the issue
86 : // https://github.com/cockroachdb/pebble/issues/1170 and the prototype in
87 : // https://github.com/cockroachdb/pebble/pull/1443.
88 : //
89 : // The code in this file mainly covers value block and related encodings. We
90 : // discuss these in the next section.
91 : //
92 : // 2. Details
93 : //
94 : // Note that the notion of the latest value is local to the sstable. It is
95 : // possible that that latest value has been deleted by a sstable in a higher
96 : // level, and what is the latest value from the perspective of the whole LSM
97 : // is an older MVCC version. This only affects performance and not
98 : // correctness. This local knowledge is also why we continue to store these
99 : // older versions in the same sstable -- we need to be able to conveniently
100 : // read them. The code in this file is agnostic to the policy regarding what
101 : // should be stored in value blocks -- it allows even the latest MVCC version
102 : // to be stored in a value block. The policy decision is made in the
103 : // sstable.Writer. See Writer.makeAddPointDecisionV3.
104 : //
105 : // Data blocks contain two kinds of SET keys: those with in-place values and
106 : // those with a value handle. To distinguish these two cases we use a single
107 : // byte prefix (ValuePrefix). This single byte prefix is split into multiple
108 : // parts, where nb represents information that is encoded in n bits.
109 : //
110 : // +---------------+--------------------+-----------+--------------------+
111 : // | value-kind 2b | SET-same-prefix 1b | unused 2b | short-attribute 3b |
112 : // +---------------+--------------------+-----------+--------------------+
113 : //
114 : // The 2 bit value-kind specifies whether this is an in-place value or a value
115 : // handle pointing to a value block. We use 2 bits here for future
116 : // representation of values that are in separate files. The 1 bit
117 : // SET-same-prefix is true if this key is a SET and is immediately preceded by
118 : // a SET that shares the same prefix. The 3 bit short-attribute is described
119 : // in base.ShortAttribute -- it stores user-defined attributes about the
120 : // value. It is unused for in-place values.
121 : //
122 : // Value Handle and Value Blocks:
123 : // valueHandles refer to values in value blocks. Value blocks are simpler than
124 : // normal data blocks (that contain key-value pairs, and allow for binary
125 : // search), which makes them cheap for value retrieval purposes. A valueHandle
126 : // is a tuple (valueLen, blockNum, offsetInBlock), where blockNum is the 0
127 : // indexed value block number and offsetInBlock is the byte offset in that
128 : // block containing the value. The valueHandle.valueLen is included since
129 : // there are multiple use cases in CockroachDB that need the value length but
130 : // not the value, for which we can avoid reading the value in the value block
131 : // (see
132 : // https://github.com/cockroachdb/pebble/issues/1170#issuecomment-958203245).
133 : //
134 : // A value block has a checksum like other blocks, and is optionally
135 : // compressed. An uncompressed value block is a sequence of values with no
136 : // separator or length (we rely on the valueHandle to demarcate). The
137 : // valueHandle.offsetInBlock points to the value, of length
138 : // valueHandle.valueLen. While writing a sstable, all the (possibly
139 : // compressed) value blocks need to be held in-memory until they can be
140 : // written. Value blocks are placed after the "meta rangedel" and "meta range
141 : // key" blocks since value blocks are considered less likely to be read.
142 : //
143 : // Meta Value Index Block:
144 : // Since the (key, valueHandle) pairs are written before there is any knowledge
145 : // of the byte offset of the value block in the file, or its compressed
146 : // length, we need another lookup to map the valueHandle.blockNum to the
147 : // information needed to read it from the file. This information is provided
148 : // by the "value index block". The "value index block" is referred to by the
149 : // metaindex block. The design intentionally avoids making the "value index
150 : // block" a general purpose key-value block, since each caller wants to lookup
151 : // the information for a particular blockNum (there is no need for SeekGE
152 : // etc.). Instead, this index block stores a sequence of (blockNum,
153 : // blockOffset, blockLength) tuples, where the blockNums are consecutive
154 : // integers, and the tuples are encoded with a fixed width encoding. This
155 : // allows a reader to find the tuple for block K by looking at the offset
156 : // K*fixed-width. The fixed width for each field is decided by looking at the
157 : // maximum value of each of these fields. As a concrete example of a large
158 : // sstable with many value blocks, we constructed a 100MB sstable with many
159 : // versions and had 2475 value blocks (~32KB each). This sstable had this
160 : // tuple encoded using 2+4+2=8 bytes, which means the uncompressed value index
161 : // block was 2475*8=~19KB, which is modest. Therefore, we don't support more
162 : // than one value index block. Consider the example of 2 byte blockNum, 4 byte
163 : // blockOffset and 2 byte blockLen. The value index block will look like:
164 : //
165 : // +---------------+------------------+---------------+
166 : // | blockNum (2B) | blockOffset (4B) | blockLen (2B) |
167 : // +---------------+------------------+---------------+
168 : // | 0 | 7,123,456 | 30,000 |
169 : // +---------------+------------------+---------------+
170 : // | 1 | 7,153,456 | 20,000 |
171 : // +---------------+------------------+---------------+
172 : // | 2 | 7,173,456 | 25,567 |
173 : // +---------------+------------------+---------------+
174 : // | .... | ... | ... |
175 : //
176 : //
177 : // The metaindex block contains the valueBlocksIndexHandle which in addition
178 : // to the BlockHandle also specifies the widths of these tuple fields. In the
179 : // above example, the
180 : // valueBlocksIndexHandle.{blockNumByteLength,blockOffsetByteLength,blockLengthByteLength}
181 : // will be (2,4,2).
182 :
// valueHandle locates a value that lives in a value block instead of being
// stored in-place next to its key. It is stored with the key and serves as the
// pointer to that value.
type valueHandle struct {
	// valueLen is the length in bytes of the value.
	valueLen uint32
	// blockNum is the 0-indexed number of the value block holding the value.
	blockNum uint32
	// offsetInBlock is the byte offset of the value within that value block.
	offsetInBlock uint32
}
190 :
191 : // valueHandle fields are varint encoded, so maximum 5 bytes each, plus 1 byte
192 : // for the valuePrefix. This could alternatively be group varint encoded, but
193 : // experiments were inconclusive
194 : // (https://github.com/cockroachdb/pebble/pull/1443#issuecomment-1270298802).
195 : const valueHandleMaxLen = 5*3 + 1
196 :
197 : // Assert blockHandleLikelyMaxLen >= valueHandleMaxLen.
198 : const _ = uint(blockHandleLikelyMaxLen - valueHandleMaxLen)
199 :
200 1 : func encodeValueHandle(dst []byte, v valueHandle) int {
201 1 : n := 0
202 1 : n += binary.PutUvarint(dst[n:], uint64(v.valueLen))
203 1 : n += binary.PutUvarint(dst[n:], uint64(v.blockNum))
204 1 : n += binary.PutUvarint(dst[n:], uint64(v.offsetInBlock))
205 1 : return n
206 1 : }
207 :
// decodeLenFromValueHandle decodes the leading uvarint of an encoded value
// handle (the value length) and returns it along with the remainder of src
// that follows that uvarint.
//
// The uvarint decoding is hand-unrolled and reads bytes through unsafe
// pointers instead of slice indexing. At most 5 bytes are consumed; the fifth
// byte is always treated as the final one. src must be non-empty and contain
// the complete uvarint.
func decodeLenFromValueHandle(src []byte) (uint32, []byte) {
	base := unsafe.Pointer(&src[0])

	b0 := *(*uint8)(base)
	if b0 < 0x80 {
		return uint32(b0), src[1:]
	}
	// low accumulates the 7-bit payloads of the continuation bytes seen so far.
	low := uint32(b0 & 0x7f)

	b1 := *(*uint8)(unsafe.Pointer(uintptr(base) + 1))
	if b1 < 0x80 {
		return uint32(b1)<<7 | low, src[2:]
	}
	low |= uint32(b1&0x7f) << 7

	b2 := *(*uint8)(unsafe.Pointer(uintptr(base) + 2))
	if b2 < 0x80 {
		return uint32(b2)<<14 | low, src[3:]
	}
	low |= uint32(b2&0x7f) << 14

	b3 := *(*uint8)(unsafe.Pointer(uintptr(base) + 3))
	if b3 < 0x80 {
		return uint32(b3)<<21 | low, src[4:]
	}
	low |= uint32(b3&0x7f) << 21

	// Fifth byte: no continuation check; bits that do not fit in 32 bits are
	// discarded by the shift.
	b4 := *(*uint8)(unsafe.Pointer(uintptr(base) + 4))
	return uint32(b4)<<28 | low, src[5:]
}
230 :
231 1 : func decodeRemainingValueHandle(src []byte) valueHandle {
232 1 : var vh valueHandle
233 1 : ptr := unsafe.Pointer(&src[0])
234 1 : // Manually inlined uvarint decoding. Saves ~25% in benchmarks. Unrolling
235 1 : // a loop for i:=0; i<2; i++, saves ~6%.
236 1 : var v uint32
237 1 : if a := *((*uint8)(ptr)); a < 128 {
238 1 : v = uint32(a)
239 1 : ptr = unsafe.Pointer(uintptr(ptr) + 1)
240 1 : } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
241 1 : v = uint32(b)<<7 | uint32(a)
242 1 : ptr = unsafe.Pointer(uintptr(ptr) + 2)
243 1 : } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
244 0 : v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
245 0 : ptr = unsafe.Pointer(uintptr(ptr) + 3)
246 0 : } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
247 0 : v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
248 0 : ptr = unsafe.Pointer(uintptr(ptr) + 4)
249 0 : } else {
250 0 : d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
251 0 : v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
252 0 : ptr = unsafe.Pointer(uintptr(ptr) + 5)
253 0 : }
254 1 : vh.blockNum = v
255 1 :
256 1 : if a := *((*uint8)(ptr)); a < 128 {
257 1 : v = uint32(a)
258 1 : } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
259 1 : v = uint32(b)<<7 | uint32(a)
260 1 : } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
261 0 : v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
262 0 : } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
263 0 : v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
264 0 : } else {
265 0 : d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
266 0 : v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
267 0 : }
268 1 : vh.offsetInBlock = v
269 1 :
270 1 : return vh
271 : }
272 :
273 0 : func decodeValueHandle(src []byte) valueHandle {
274 0 : valLen, src := decodeLenFromValueHandle(src)
275 0 : vh := decodeRemainingValueHandle(src)
276 0 : vh.valueLen = valLen
277 0 : return vh
278 0 : }
279 :
280 : // valueBlocksIndexHandle is placed in the metaindex if there are any value
281 : // blocks. If there are no value blocks, there is no value blocks index, and
282 : // no entry in the metaindex. Note that the lack of entry in the metaindex
283 : // should not be used to ascertain whether the values are prefixed, since the
284 : // former is an emergent property of the data that was written and not known
285 : // until all the key-value pairs in the sstable are written.
286 : type valueBlocksIndexHandle struct {
287 : h block.Handle
288 : blockNumByteLength uint8
289 : blockOffsetByteLength uint8
290 : blockLengthByteLength uint8
291 : }
292 :
293 : const valueBlocksIndexHandleMaxLen = blockHandleMaxLenWithoutProperties + 3
294 :
295 : // Assert blockHandleLikelyMaxLen >= valueBlocksIndexHandleMaxLen.
296 : const _ = uint(blockHandleLikelyMaxLen - valueBlocksIndexHandleMaxLen)
297 :
298 1 : func encodeValueBlocksIndexHandle(dst []byte, v valueBlocksIndexHandle) int {
299 1 : n := encodeBlockHandle(dst, v.h)
300 1 : dst[n] = v.blockNumByteLength
301 1 : n++
302 1 : dst[n] = v.blockOffsetByteLength
303 1 : n++
304 1 : dst[n] = v.blockLengthByteLength
305 1 : n++
306 1 : return n
307 1 : }
308 :
309 1 : func decodeValueBlocksIndexHandle(src []byte) (valueBlocksIndexHandle, int, error) {
310 1 : var vbih valueBlocksIndexHandle
311 1 : var n int
312 1 : vbih.h, n = decodeBlockHandle(src)
313 1 : if n <= 0 {
314 0 : return vbih, 0, errors.Errorf("bad BlockHandle %x", src)
315 0 : }
316 1 : if len(src) != n+3 {
317 0 : return vbih, 0, errors.Errorf("bad BlockHandle %x", src)
318 0 : }
319 1 : vbih.blockNumByteLength = src[n]
320 1 : vbih.blockOffsetByteLength = src[n+1]
321 1 : vbih.blockLengthByteLength = src[n+2]
322 1 : return vbih, n + 3, nil
323 : }
324 :
// valueBlocksAndIndexStats summarizes the value blocks, and their index,
// written for a sstable.
type valueBlocksAndIndexStats struct {
	// numValueBlocks is the number of value blocks written.
	numValueBlocks uint64
	// numValuesInValueBlocks is the number of values stored in value blocks.
	numValuesInValueBlocks uint64
	// valueBlocksAndIndexSize is the total size of the value blocks and the
	// value index block.
	valueBlocksAndIndexSize uint64
}
331 :
332 : // valueBlockWriter writes a sequence of value blocks, and the value blocks
333 : // index, for a sstable.
334 : type valueBlockWriter struct {
335 : // The configured uncompressed block size and size threshold
336 : blockSize, blockSizeThreshold int
337 : // Configured compression.
338 : compression Compression
339 : // checksummer with configured checksum type.
340 : checksummer block.Checksummer
341 : // Block finished callback.
342 : blockFinishedFunc func(compressedSize int)
343 :
344 : // buf is the current block being written to (uncompressed).
345 : buf *blockBuffer
346 : // compressedBuf is used for compressing the block.
347 : compressedBuf *blockBuffer
348 : // Sequence of blocks that are finished.
349 : blocks []blockAndHandle
350 : // Cumulative value block bytes written so far.
351 : totalBlockBytes uint64
352 : numValues uint64
353 : }
354 :
355 : type blockAndHandle struct {
356 : block *blockBuffer
357 : handle block.Handle
358 : compressed bool
359 : }
360 :
// blockBuffer wraps a byte slice so that it can be pooled and handed around
// by pointer.
type blockBuffer struct {
	// b is the underlying buffer.
	b []byte
}
364 :
365 : // Pool of block buffers that should be roughly the blockSize.
366 : var uncompressedValueBlockBufPool = sync.Pool{
367 1 : New: func() interface{} {
368 1 : return &blockBuffer{}
369 1 : },
370 : }
371 :
372 : // Pool of block buffers for compressed value blocks. These may widely vary in
373 : // size based on compression ratios.
374 : var compressedValueBlockBufPool = sync.Pool{
375 1 : New: func() interface{} {
376 1 : return &blockBuffer{}
377 1 : },
378 : }
379 :
380 1 : func releaseToValueBlockBufPool(pool *sync.Pool, b *blockBuffer) {
381 1 : // Don't pool buffers larger than 128KB, in case we had some rare large
382 1 : // values.
383 1 : if len(b.b) > 128*1024 {
384 0 : return
385 0 : }
386 1 : if invariants.Enabled {
387 1 : // Set the bytes to a random value. Cap the number of bytes being
388 1 : // randomized to prevent test timeouts.
389 1 : length := cap(b.b)
390 1 : if length > 1000 {
391 1 : length = 1000
392 1 : }
393 1 : b.b = b.b[:length:length]
394 1 : rand.Read(b.b)
395 : }
396 1 : pool.Put(b)
397 : }
398 :
399 : var valueBlockWriterPool = sync.Pool{
400 1 : New: func() interface{} {
401 1 : return &valueBlockWriter{}
402 1 : },
403 : }
404 :
405 : func newValueBlockWriter(
406 : blockSize int,
407 : blockSizeThreshold int,
408 : compression Compression,
409 : checksumType block.ChecksumType,
410 : // compressedSize should exclude the block trailer.
411 : blockFinishedFunc func(compressedSize int),
412 1 : ) *valueBlockWriter {
413 1 : w := valueBlockWriterPool.Get().(*valueBlockWriter)
414 1 : *w = valueBlockWriter{
415 1 : blockSize: blockSize,
416 1 : blockSizeThreshold: blockSizeThreshold,
417 1 : compression: compression,
418 1 : checksummer: block.Checksummer{
419 1 : Type: checksumType,
420 1 : },
421 1 : blockFinishedFunc: blockFinishedFunc,
422 1 : buf: uncompressedValueBlockBufPool.Get().(*blockBuffer),
423 1 : compressedBuf: compressedValueBlockBufPool.Get().(*blockBuffer),
424 1 : blocks: w.blocks[:0],
425 1 : }
426 1 : w.buf.b = w.buf.b[:0]
427 1 : w.compressedBuf.b = w.compressedBuf.b[:0]
428 1 : return w
429 1 : }
430 :
431 1 : func releaseValueBlockWriter(w *valueBlockWriter) {
432 1 : for i := range w.blocks {
433 1 : if w.blocks[i].compressed {
434 1 : releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.blocks[i].block)
435 1 : } else {
436 1 : releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.blocks[i].block)
437 1 : }
438 1 : w.blocks[i].block = nil
439 : }
440 1 : if w.buf != nil {
441 1 : releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.buf)
442 1 : }
443 1 : if w.compressedBuf != nil {
444 1 : releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.compressedBuf)
445 1 : }
446 1 : *w = valueBlockWriter{
447 1 : blocks: w.blocks[:0],
448 1 : }
449 1 : valueBlockWriterPool.Put(w)
450 : }
451 :
452 1 : func (w *valueBlockWriter) addValue(v []byte) (valueHandle, error) {
453 1 : if invariants.Enabled && len(v) == 0 {
454 0 : return valueHandle{}, errors.Errorf("cannot write empty value to value block")
455 0 : }
456 1 : w.numValues++
457 1 : blockLen := len(w.buf.b)
458 1 : valueLen := len(v)
459 1 : if blockLen >= w.blockSize ||
460 1 : (blockLen > w.blockSizeThreshold && blockLen+valueLen > w.blockSize) {
461 1 : // Block is not currently empty and adding this value will become too big,
462 1 : // so finish this block.
463 1 : w.compressAndFlush()
464 1 : blockLen = len(w.buf.b)
465 1 : if invariants.Enabled && blockLen != 0 {
466 0 : panic("blockLen of new block should be 0")
467 : }
468 : }
469 1 : vh := valueHandle{
470 1 : valueLen: uint32(valueLen),
471 1 : blockNum: uint32(len(w.blocks)),
472 1 : offsetInBlock: uint32(blockLen),
473 1 : }
474 1 : blockLen = int(vh.offsetInBlock + vh.valueLen)
475 1 : if cap(w.buf.b) < blockLen {
476 1 : size := 2 * cap(w.buf.b)
477 1 : if size < 1024 {
478 1 : size = 1024
479 1 : }
480 1 : for size < blockLen {
481 0 : size *= 2
482 0 : }
483 1 : buf := make([]byte, blockLen, size)
484 1 : _ = copy(buf, w.buf.b)
485 1 : w.buf.b = buf
486 1 : } else {
487 1 : w.buf.b = w.buf.b[:blockLen]
488 1 : }
489 1 : buf := w.buf.b[vh.offsetInBlock:]
490 1 : n := copy(buf, v)
491 1 : if n != len(buf) {
492 0 : panic("incorrect length computation")
493 : }
494 1 : return vh, nil
495 : }
496 :
497 1 : func (w *valueBlockWriter) compressAndFlush() {
498 1 : // Compress the buffer, discarding the result if the improvement isn't at
499 1 : // least 12.5%.
500 1 : blockType := noCompressionBlockType
501 1 : b := w.buf
502 1 : if w.compression != NoCompression {
503 1 : blockType, w.compressedBuf.b =
504 1 : compressBlock(w.compression, w.buf.b, w.compressedBuf.b[:cap(w.compressedBuf.b)])
505 1 : if len(w.compressedBuf.b) < len(w.buf.b)-len(w.buf.b)/8 {
506 1 : b = w.compressedBuf
507 1 : } else {
508 1 : blockType = noCompressionBlockType
509 1 : }
510 : }
511 1 : n := len(b.b)
512 1 : if n+block.TrailerLen > cap(b.b) {
513 1 : block := make([]byte, n+block.TrailerLen)
514 1 : copy(block, b.b)
515 1 : b.b = block
516 1 : } else {
517 1 : b.b = b.b[:n+block.TrailerLen]
518 1 : }
519 1 : b.b[n] = byte(blockType)
520 1 : w.computeChecksum(b.b)
521 1 : bh := block.Handle{Offset: w.totalBlockBytes, Length: uint64(n)}
522 1 : w.totalBlockBytes += uint64(len(b.b))
523 1 : // blockFinishedFunc length excludes the block trailer.
524 1 : w.blockFinishedFunc(n)
525 1 : compressed := blockType != noCompressionBlockType
526 1 : w.blocks = append(w.blocks, blockAndHandle{
527 1 : block: b,
528 1 : handle: bh,
529 1 : compressed: compressed,
530 1 : })
531 1 : // Handed off a buffer to w.blocks, so need get a new one.
532 1 : if compressed {
533 1 : w.compressedBuf = compressedValueBlockBufPool.Get().(*blockBuffer)
534 1 : } else {
535 1 : w.buf = uncompressedValueBlockBufPool.Get().(*blockBuffer)
536 1 : }
537 1 : w.buf.b = w.buf.b[:0]
538 : }
539 :
540 1 : func (w *valueBlockWriter) computeChecksum(blk []byte) {
541 1 : n := len(blk) - block.TrailerLen
542 1 : checksum := w.checksummer.Checksum(blk[:n], blk[n:n+1])
543 1 : binary.LittleEndian.PutUint32(blk[n+1:], checksum)
544 1 : }
545 :
546 : func (w *valueBlockWriter) finish(
547 : layout *layoutWriter, fileOffset uint64,
548 1 : ) (valueBlocksIndexHandle, valueBlocksAndIndexStats, error) {
549 1 : if len(w.buf.b) > 0 {
550 1 : w.compressAndFlush()
551 1 : }
552 1 : n := len(w.blocks)
553 1 : if n == 0 {
554 1 : return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, nil
555 1 : }
556 1 : largestOffset := uint64(0)
557 1 : largestLength := uint64(0)
558 1 : for i := range w.blocks {
559 1 : _, err := layout.WriteValueBlock(w.blocks[i].block.b)
560 1 : if err != nil {
561 0 : return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, err
562 0 : }
563 1 : w.blocks[i].handle.Offset += fileOffset
564 1 : largestOffset = w.blocks[i].handle.Offset
565 1 : if largestLength < w.blocks[i].handle.Length {
566 1 : largestLength = w.blocks[i].handle.Length
567 1 : }
568 : }
569 1 : vbihOffset := fileOffset + w.totalBlockBytes
570 1 :
571 1 : vbih := valueBlocksIndexHandle{
572 1 : h: block.Handle{
573 1 : Offset: vbihOffset,
574 1 : },
575 1 : blockNumByteLength: uint8(lenLittleEndian(uint64(n - 1))),
576 1 : blockOffsetByteLength: uint8(lenLittleEndian(largestOffset)),
577 1 : blockLengthByteLength: uint8(lenLittleEndian(largestLength)),
578 1 : }
579 1 : var err error
580 1 : if n > 0 {
581 1 : if vbih, err = w.writeValueBlocksIndex(layout, vbih); err != nil {
582 0 : return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, err
583 0 : }
584 : }
585 1 : stats := valueBlocksAndIndexStats{
586 1 : numValueBlocks: uint64(n),
587 1 : numValuesInValueBlocks: w.numValues,
588 1 : valueBlocksAndIndexSize: w.totalBlockBytes + vbih.h.Length + block.TrailerLen,
589 1 : }
590 1 : return vbih, stats, err
591 : }
592 :
593 : func (w *valueBlockWriter) writeValueBlocksIndex(
594 : layout *layoutWriter, h valueBlocksIndexHandle,
595 1 : ) (valueBlocksIndexHandle, error) {
596 1 : blockLen :=
597 1 : int(h.blockNumByteLength+h.blockOffsetByteLength+h.blockLengthByteLength) * len(w.blocks)
598 1 : h.h.Length = uint64(blockLen)
599 1 : blockLen += block.TrailerLen
600 1 : var buf []byte
601 1 : if cap(w.buf.b) < blockLen {
602 1 : buf = make([]byte, blockLen)
603 1 : w.buf.b = buf
604 1 : } else {
605 1 : buf = w.buf.b[:blockLen]
606 1 : }
607 1 : b := buf
608 1 : for i := range w.blocks {
609 1 : littleEndianPut(uint64(i), b, int(h.blockNumByteLength))
610 1 : b = b[int(h.blockNumByteLength):]
611 1 : littleEndianPut(w.blocks[i].handle.Offset, b, int(h.blockOffsetByteLength))
612 1 : b = b[int(h.blockOffsetByteLength):]
613 1 : littleEndianPut(w.blocks[i].handle.Length, b, int(h.blockLengthByteLength))
614 1 : b = b[int(h.blockLengthByteLength):]
615 1 : }
616 1 : if len(b) != block.TrailerLen {
617 0 : panic("incorrect length calculation")
618 : }
619 1 : b[0] = byte(noCompressionBlockType)
620 1 : w.computeChecksum(buf)
621 1 : if _, err := layout.WriteValueIndexBlock(buf, h); err != nil {
622 0 : return valueBlocksIndexHandle{}, err
623 0 : }
624 1 : return h, nil
625 : }
626 :
// littleEndianPut stores the low n bytes of v into b[:n], least significant
// byte first. It assumes that v can be represented using n bytes; any higher
// bytes of v are dropped. It panics if b is shorter than n bytes.
func littleEndianPut(v uint64, b []byte, n int) {
	_ = b[n-1] // bounds check
	for i := 0; i < n; i++ {
		b[i] = byte(v >> (8 * uint(i)))
	}
}
636 :
// lenLittleEndian returns the minimum number of bytes needed to encode v
// using little endian encoding. The result is between 1 (including for
// v == 0) and 8.
func lenLittleEndian(v uint64) int {
	n := 1
	for v >>= 8; v != 0; v >>= 8 {
		n++
	}
	return n
}
650 :
// littleEndianGet decodes the first n bytes of b as a little endian unsigned
// integer. It panics if b is shorter than n bytes.
func littleEndianGet(b []byte, n int) uint64 {
	_ = b[n-1] // bounds check
	var v uint64
	// Fold bytes in from most to least significant.
	for i := n - 1; i >= 0; i-- {
		v = v<<8 | uint64(b[i])
	}
	return v
}
659 :
// UserKeyPrefixBound represents a [Lower,Upper) bound of user key prefixes.
// If both are nil, there is no bound specified. Else, Compare(Lower,Upper)
// must be < 0.
type UserKeyPrefixBound struct {
	// Lower is the (inclusive) lower bound user key prefix.
	Lower []byte
	// Upper is the (exclusive) upper bound user key prefix.
	Upper []byte
}

// IsEmpty reports whether the bound is empty, i.e. both Lower and Upper have
// zero length.
func (ukb *UserKeyPrefixBound) IsEmpty() bool {
	return len(ukb.Lower)+len(ukb.Upper) == 0
}
674 :
675 : type blockProviderWhenOpen interface {
676 : readBlockForVBR(
677 : h block.Handle, stats *base.InternalIteratorStats,
678 : ) (block.BufferHandle, error)
679 : }
680 :
681 : type blockProviderWhenClosed struct {
682 : rp ReaderProvider
683 : r *Reader
684 : }
685 :
686 1 : func (bpwc *blockProviderWhenClosed) open(ctx context.Context) error {
687 1 : var err error
688 1 : bpwc.r, err = bpwc.rp.GetReader(ctx)
689 1 : return err
690 1 : }
691 :
692 1 : func (bpwc *blockProviderWhenClosed) close() {
693 1 : bpwc.rp.Close()
694 1 : bpwc.r = nil
695 1 : }
696 :
697 : func (bpwc blockProviderWhenClosed) readBlockForVBR(
698 : h block.Handle, stats *base.InternalIteratorStats,
699 1 : ) (block.BufferHandle, error) {
700 1 : // This is rare, since most block reads happen when the corresponding
701 1 : // sstable iterator is open. So we are willing to sacrifice a proper context
702 1 : // for tracing.
703 1 : //
704 1 : // TODO(sumeer): consider fixing this. See
705 1 : // https://github.com/cockroachdb/pebble/pull/3065#issue-1991175365 for an
706 1 : // alternative.
707 1 : ctx := objiotracing.WithBlockType(context.Background(), objiotracing.ValueBlock)
708 1 : // TODO(jackson,sumeer): Consider whether to use a buffer pool in this case.
709 1 : // The bpwc is not allowed to outlive the iterator tree, so it cannot
710 1 : // outlive the buffer pool.
711 1 : return bpwc.r.readBlock(
712 1 : ctx, h, nil, nil, stats, nil /* iterStats */, nil /* buffer pool */)
713 1 : }
714 :
715 : // ReaderProvider supports the implementation of blockProviderWhenClosed.
716 : // GetReader and Close can be called multiple times in pairs.
717 : type ReaderProvider interface {
718 : GetReader(ctx context.Context) (r *Reader, err error)
719 : Close()
720 : }
721 :
722 : // TrivialReaderProvider implements ReaderProvider for a Reader that will
723 : // outlive the top-level iterator in the iterator tree.
724 : type TrivialReaderProvider struct {
725 : *Reader
726 : }
727 :
728 : var _ ReaderProvider = TrivialReaderProvider{}
729 :
730 : // GetReader implements ReaderProvider.
731 0 : func (trp TrivialReaderProvider) GetReader(ctx context.Context) (*Reader, error) {
732 0 : return trp.Reader, nil
733 0 : }
734 :
735 : // Close implements ReaderProvider.
736 0 : func (trp TrivialReaderProvider) Close() {}
737 :
// valueBlockReader is used to retrieve values in value
// blocks. It is used when the sstable was written with
// Properties.ValueBlocksAreEnabled.
type valueBlockReader struct {
	// bpOpen is the block provider used to read the value blocks index and
	// the value blocks. It is set to nil by close, and is temporarily
	// re-populated by Fetch when a value is fetched after close.
	bpOpen blockProviderWhenOpen
	// rp supplies the Reader used by Fetch to read blocks once this
	// valueBlockReader has been closed.
	rp ReaderProvider
	// vbih holds the handle of the value blocks index block (vbih.h) and the
	// byte widths of the three fields that make up each index entry.
	vbih valueBlocksIndexHandle
	// stats, if non-nil, is updated with the count and byte sizes of
	// separated point values that are returned and fetched.
	stats *base.InternalIteratorStats

	// The value blocks index is lazily retrieved the first time the reader
	// needs to read a value that resides in a value block.
	vbiBlock []byte
	// vbiCache is the buffer handle that vbiBlock was obtained from; it is
	// released in close.
	vbiCache block.BufferHandle
	// When sequentially iterating through all key-value pairs, the cost of
	// repeatedly getting a block that is already in the cache and releasing the
	// bufferHandle can be ~40% of the cpu overhead. So the reader remembers the
	// last value block it retrieved, in case there is locality of access, and
	// this value block can be used for the next value retrieval.
	valueBlockNum uint32
	valueBlock    []byte
	// valueBlockPtr points at the first byte of valueBlock.
	valueBlockPtr unsafe.Pointer
	// valueCache is the buffer handle that valueBlock was obtained from; it
	// is released when a different value block is loaded and in close.
	valueCache block.BufferHandle
	// lazyFetcher is the storage for the LazyFetcher referenced by the
	// LazyValue returned from getLazyValueForPrefixAndValueHandle. It is
	// overwritten on every call to that method.
	lazyFetcher base.LazyFetcher
	// closed is set by close. Fetch consults it to decide whether blocks can
	// be read through bpOpen or whether a Reader must be obtained from rp.
	closed bool
	// bufToMangle is the copy of the value most recently returned by
	// doValueMangling, which is only called when invariants.Enabled.
	bufToMangle []byte
}
764 :
765 1 : func (r *valueBlockReader) getLazyValueForPrefixAndValueHandle(handle []byte) base.LazyValue {
766 1 : fetcher := &r.lazyFetcher
767 1 : valLen, h := decodeLenFromValueHandle(handle[1:])
768 1 : *fetcher = base.LazyFetcher{
769 1 : Fetcher: r,
770 1 : Attribute: base.AttributeAndLen{
771 1 : ValueLen: int32(valLen),
772 1 : ShortAttribute: block.ValuePrefix(handle[0]).ShortAttribute(),
773 1 : },
774 1 : }
775 1 : if r.stats != nil {
776 1 : r.stats.SeparatedPointValue.Count++
777 1 : r.stats.SeparatedPointValue.ValueBytes += uint64(valLen)
778 1 : }
779 1 : return base.LazyValue{
780 1 : ValueOrHandle: h,
781 1 : Fetcher: fetcher,
782 1 : }
783 : }
784 :
785 1 : func (r *valueBlockReader) close() {
786 1 : r.bpOpen = nil
787 1 : r.vbiBlock = nil
788 1 : r.vbiCache.Release()
789 1 : // Set the handle to empty since Release does not nil the Handle.value. If
790 1 : // we were to reopen this valueBlockReader and retrieve the same
791 1 : // Handle.value from the cache, we don't want to accidentally unref it when
792 1 : // attempting to unref the old handle.
793 1 : r.vbiCache = block.BufferHandle{}
794 1 : r.valueBlock = nil
795 1 : r.valueBlockPtr = nil
796 1 : r.valueCache.Release()
797 1 : // See comment above.
798 1 : r.valueCache = block.BufferHandle{}
799 1 : r.closed = true
800 1 : // rp, vbih, stats remain valid, so that LazyFetcher.ValueFetcher can be
801 1 : // implemented.
802 1 : }
803 :
804 : // Fetch implements base.ValueFetcher.
805 : func (r *valueBlockReader) Fetch(
806 : ctx context.Context, handle []byte, valLen int32, buf []byte,
807 1 : ) (val []byte, callerOwned bool, err error) {
808 1 : if !r.closed {
809 1 : val, err := r.getValueInternal(handle, valLen)
810 1 : if invariants.Enabled {
811 1 : val = r.doValueMangling(val)
812 1 : }
813 1 : return val, false, err
814 : }
815 :
816 1 : bp := blockProviderWhenClosed{rp: r.rp}
817 1 : err = bp.open(ctx)
818 1 : if err != nil {
819 0 : return nil, false, err
820 0 : }
821 1 : defer bp.close()
822 1 : defer r.close()
823 1 : r.bpOpen = bp
824 1 : var v []byte
825 1 : v, err = r.getValueInternal(handle, valLen)
826 1 : if err != nil {
827 0 : return nil, false, err
828 0 : }
829 1 : buf = append(buf[:0], v...)
830 1 : return buf, true, nil
831 : }
832 :
833 : // doValueMangling attempts to uncover violations of the contract listed in
834 : // the declaration comment of LazyValue. It is expensive, hence only called
835 : // when invariants.Enabled.
836 1 : func (r *valueBlockReader) doValueMangling(v []byte) []byte {
837 1 : // Randomly set the bytes in the previous retrieved value to 0, since
838 1 : // property P1 only requires the valueBlockReader to maintain the memory of
839 1 : // one fetched value.
840 1 : if rand.Intn(2) == 0 {
841 1 : clear(r.bufToMangle)
842 1 : }
843 : // Store the current value in a new buffer for future mangling.
844 1 : r.bufToMangle = append([]byte(nil), v...)
845 1 : return r.bufToMangle
846 : }
847 :
848 1 : func (r *valueBlockReader) getValueInternal(handle []byte, valLen int32) (val []byte, err error) {
849 1 : vh := decodeRemainingValueHandle(handle)
850 1 : vh.valueLen = uint32(valLen)
851 1 : if r.vbiBlock == nil {
852 1 : ch, err := r.bpOpen.readBlockForVBR(r.vbih.h, r.stats)
853 1 : if err != nil {
854 0 : return nil, err
855 0 : }
856 1 : r.vbiCache = ch
857 1 : r.vbiBlock = ch.Get()
858 : }
859 1 : if r.valueBlock == nil || r.valueBlockNum != vh.blockNum {
860 1 : vbh, err := r.getBlockHandle(vh.blockNum)
861 1 : if err != nil {
862 0 : return nil, err
863 0 : }
864 1 : vbCacheHandle, err := r.bpOpen.readBlockForVBR(vbh, r.stats)
865 1 : if err != nil {
866 0 : return nil, err
867 0 : }
868 1 : r.valueBlockNum = vh.blockNum
869 1 : r.valueCache.Release()
870 1 : r.valueCache = vbCacheHandle
871 1 : r.valueBlock = vbCacheHandle.Get()
872 1 : r.valueBlockPtr = unsafe.Pointer(&r.valueBlock[0])
873 : }
874 1 : if r.stats != nil {
875 1 : r.stats.SeparatedPointValue.ValueBytesFetched += uint64(valLen)
876 1 : }
877 1 : return r.valueBlock[vh.offsetInBlock : vh.offsetInBlock+vh.valueLen], nil
878 : }
879 :
880 1 : func (r *valueBlockReader) getBlockHandle(blockNum uint32) (block.Handle, error) {
881 1 : indexEntryLen :=
882 1 : int(r.vbih.blockNumByteLength + r.vbih.blockOffsetByteLength + r.vbih.blockLengthByteLength)
883 1 : offsetInIndex := indexEntryLen * int(blockNum)
884 1 : if len(r.vbiBlock) < offsetInIndex+indexEntryLen {
885 0 : return block.Handle{}, base.AssertionFailedf(
886 0 : "index entry out of bounds: offset %d length %d block length %d",
887 0 : offsetInIndex, indexEntryLen, len(r.vbiBlock))
888 0 : }
889 1 : b := r.vbiBlock[offsetInIndex : offsetInIndex+indexEntryLen]
890 1 : n := int(r.vbih.blockNumByteLength)
891 1 : bn := littleEndianGet(b, n)
892 1 : if uint32(bn) != blockNum {
893 0 : return block.Handle{},
894 0 : errors.Errorf("expected block num %d but found %d", blockNum, bn)
895 0 : }
896 1 : b = b[n:]
897 1 : n = int(r.vbih.blockOffsetByteLength)
898 1 : blockOffset := littleEndianGet(b, n)
899 1 : b = b[n:]
900 1 : n = int(r.vbih.blockLengthByteLength)
901 1 : blockLen := littleEndianGet(b, n)
902 1 : return block.Handle{Offset: blockOffset, Length: blockLen}, nil
903 : }
|