Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "context"
9 : "encoding/binary"
10 : "math/rand/v2"
11 : "sync"
12 : "unsafe"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
18 : "github.com/cockroachdb/pebble/sstable/block"
19 : )
20 :
21 : // Value blocks are supported in TableFormatPebblev3.
22 : //
23 : // 1. Motivation and overview
24 : //
25 : // Value blocks are a mechanism designed for sstables storing MVCC data, where
26 : // there can be many versions of a key that need to be kept, but only the
27 : // latest value is typically read (see the documentation for Comparer.Split
28 : // regarding MVCC keys). The goal is faster reads. Unlike Pebble versions,
29 : // which can be eagerly thrown away (except when there are snapshots), MVCC
30 : // versions are long-lived (e.g. default CockroachDB garbage collection
31 : // threshold for older versions is 24 hours) and can significantly slow down
32 : // reads. We have seen CockroachDB production workloads with very slow reads
33 : // due to:
34 : // - 100s of versions for each key in a table.
35 : //
36 : // - Tables with mostly MVCC garbage consisting of 2 versions per key -- a
37 : // real key-value pair, followed by a key-value pair whose value (usually
38 : // with zero byte length) indicates it is an MVCC tombstone.
39 : //
40 : // The value blocks mechanism attempts to improve read throughput in these
41 : // cases when the key size is smaller than the value sizes of older versions.
42 : // This is done by moving the value of an older version to a value block in a
43 : // different part of the sstable. This improves spatial locality of the data
44 : // being read by the workload, which increases caching effectiveness.
45 : //
46 : // Additionally, even when the key size is not smaller than the value sizes of older
47 : // versions (e.g. secondary indexes in CockroachDB), TableFormatPebblev3
48 : // stores the result of key comparisons done at write time inside the sstable,
49 : // which makes stepping from one key prefix to the next prefix (i.e., skipping
50 : // over older versions of a MVCC key) more efficient by avoiding key
51 : // comparisons and key decoding. See the results in
52 : // https://github.com/cockroachdb/pebble/pull/2149 and more details in the
53 : // comment inside BenchmarkIteratorScanNextPrefix. These improvements are also
54 : // visible in end-to-end CockroachDB tests, as outlined in
55 : // https://github.com/cockroachdb/cockroach/pull/96652.
56 : //
57 : // In TableFormatPebblev3, each SET has a one-byte value prefix that tells us
58 : // whether the value is in-place or in a value block. This one-byte prefix
59 : // also encodes additional information:
60 : //
61 : // - ShortAttribute: This is an attribute of the value. Currently, CockroachDB
62 : // uses it to represent whether the value is a tombstone or not. This avoids
63 : // the need to fetch a value from the value block if the caller only wants
64 : // to figure out whether it is an MVCC tombstone. The length of the value is
65 : // another attribute that the caller can be interested in, and it is also
66 : // accessible without reading the value in the value block (see the value
67 : // handle in the details section).
68 : //
69 : // - SET-same-prefix: this enables the aforementioned optimization when
70 : // stepping from one key prefix to the next key prefix.
71 : //
72 : // We further optimize this iteration over prefixes by using the restart
73 : // points in a block to encode whether the SET at a restart point has the same
74 : // prefix since the last restart point. This allows us to skip over restart
75 : // points within the same block. See the comment in blockWriter, and how both
76 : // SET-same-prefix and the restart point information is used in
77 : // blockIter.nextPrefixV3.
78 : //
79 : // This flexibility of values that are in-place or in value blocks requires
80 : // flexibility in the iterator interface. The InternalIterator interface
81 : // returns a LazyValue instead of a byte slice. Additionally, pebble.Iterator
82 : // allows the caller to ask for a LazyValue. See lazy_value.go for details,
83 : // including the memory lifetime management.
84 : //
85 : // For historical discussions about this feature, see the issue
86 : // https://github.com/cockroachdb/pebble/issues/1170 and the prototype in
87 : // https://github.com/cockroachdb/pebble/pull/1443.
88 : //
89 : // The code in this file mainly covers value blocks and the related encodings. We
90 : // discuss these in the next section.
91 : //
92 : // 2. Details
93 : //
94 : // Note that the notion of the latest value is local to the sstable. It is
95 : // possible that that latest value has been deleted by a sstable in a higher
96 : // level, and what is the latest value from the perspective of the whole LSM
97 : // is an older MVCC version. This only affects performance and not
98 : // correctness. This local knowledge is also why we continue to store these
99 : // older versions in the same sstable -- we need to be able to conveniently
100 : // read them. The code in this file is agnostic to the policy regarding what
101 : // should be stored in value blocks -- it allows even the latest MVCC version
102 : // to be stored in a value block. The policy decision in made in the
103 : // sstable.Writer. See Writer.makeAddPointDecisionV3.
104 : //
105 : // Data blocks contain two kinds of SET keys: those with in-place values and
106 : // those with a value handle. To distinguish these two cases we use a single
107 : // byte prefix (ValuePrefix). This single byte prefix is split into multiple
108 : // parts, where nb represents information that is encoded in n bits.
109 : //
110 : // +---------------+--------------------+-----------+--------------------+
111 : // | value-kind 2b | SET-same-prefix 1b | unused 2b | short-attribute 3b |
112 : // +---------------+--------------------+-----------+--------------------+
113 : //
114 : // The 2 bit value-kind specifies whether this is an in-place value or a value
115 : // handle pointing to a value block. We use 2 bits here for future
116 : // representation of values that are in separate files. The 1 bit
117 : // SET-same-prefix is true if this key is a SET and is immediately preceded by
118 : // a SET that shares the same prefix. The 3 bit short-attribute is described
119 : // in base.ShortAttribute -- it stores user-defined attributes about the
120 : // value. It is unused for in-place values.
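: //
: // As an illustrative sketch (the exact bit assignments here are an
: // assumption; the authoritative masks live in block.ValuePrefix), packing
: // such a byte could look like:
: //
: //	const (
: //		kindInPlace byte = 0b00 << 6 // 2-bit value-kind in the high bits
: //		kindHandle  byte = 0b01 << 6
: //		samePrefix  byte = 1 << 5 // SET-same-prefix bit
: //	)
: //
: //	func makePrefix(kind byte, setSamePrefix bool, attr uint8) byte {
: //		p := kind | (attr & 0x07) // short-attribute in the low 3 bits
: //		if setSamePrefix {
: //			p |= samePrefix
: //		}
: //		return p
: //	}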
121 : //
122 : // Value Handle and Value Blocks:
123 : // valueHandles refer to values in value blocks. Value blocks are simpler than
124 : // normal data blocks (which contain key-value pairs and allow for binary
125 : // search), which makes them cheap for value retrieval purposes. A valueHandle
126 : // is a tuple (valueLen, blockNum, offsetInBlock), where blockNum is the
127 : // 0-indexed value block number and offsetInBlock is the byte offset in that
128 : // block containing the value. The valueHandle.valueLen is included since
129 : // there are multiple use cases in CockroachDB that need the value length but
130 : // not the value, for which we can avoid reading the value in the value block
131 : // (see
132 : // https://github.com/cockroachdb/pebble/issues/1170#issuecomment-958203245).
133 : //
134 : // A value block has a checksum like other blocks, and is optionally
135 : // compressed. An uncompressed value block is a sequence of values with no
136 : // separator or length (we rely on the valueHandle to demarcate). The
137 : // valueHandle.offsetInBlock points to the value, of length
138 : // valueHandle.valueLen. While writing a sstable, all the (possibly
139 : // compressed) value blocks need to be held in memory until they can be
140 : // written. Value blocks are placed after the "meta rangedel" and "meta range
141 : // key" blocks since value blocks are considered less likely to be read.
142 : //
143 : // Meta Value Index Block:
144 : // Since the (key, valueHandle) pairs are written before there is any knowledge
145 : // of the byte offset of the value block in the file, or its compressed
146 : // length, we need another lookup to map the valueHandle.blockNum to the
147 : // information needed to read it from the file. This information is provided
148 : // by the "value index block". The "value index block" is referred to by the
149 : // metaindex block. The design intentionally avoids making the "value index
150 : // block" a general purpose key-value block, since each caller wants to lookup
151 : // the information for a particular blockNum (there is no need for SeekGE
152 : // etc.). Instead, this index block stores a sequence of (blockNum,
153 : // blockOffset, blockLength) tuples, where the blockNums are consecutive
154 : // integers, and the tuples are encoded with a fixed-width encoding. This
155 : // allows a reader to find the tuple for block K by looking at the offset
156 : // K*fixed-width. The fixed width for each field is decided by looking at the
157 : // maximum value of each of these fields. As a concrete example of a large
158 : // sstable with many value blocks, we constructed a 100MB sstable with many
159 : // versions, which produced 2475 value blocks (~32KB each). Each tuple was
160 : // encoded using 2+4+2=8 bytes, which means the uncompressed value index
161 : // block was 2475*8=~19KB, which is modest. Therefore, we don't support more
162 : // than one value index block. Consider the example of 2 byte blockNum, 4 byte
163 : // blockOffset and 2 byte blockLen. The value index block will look like:
164 : //
165 : // +---------------+------------------+---------------+
166 : // | blockNum (2B) | blockOffset (4B) | blockLen (2B) |
167 : // +---------------+------------------+---------------+
168 : // | 0 | 7,123,456 | 30,000 |
169 : // +---------------+------------------+---------------+
170 : // | 1 | 7,153,456 | 20,000 |
171 : // +---------------+------------------+---------------+
172 : // | 2 | 7,173,456 | 25,567 |
173 : // +---------------+------------------+---------------+
174 : // | .... | ... | ... |
175 : //
176 : //
177 : // The metaindex block contains the valueBlocksIndexHandle which in addition
178 : // to the BlockHandle also specifies the widths of these tuple fields. In the
179 : // above example, the
180 : // valueBlocksIndexHandle.{blockNumByteLength,blockOffsetByteLength,blockLengthByteLength}
181 : // will be (2,4,2).
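: //
: // Given those widths, a reader recovers the handle for block K with plain
: // fixed-width arithmetic. A minimal sketch, mirroring what
: // valueBlockFetcher.getBlockHandle below does (minus its error checks):
: //
: //	w1, w2, w3 := int(vbih.blockNumByteLength), int(vbih.blockOffsetByteLength), int(vbih.blockLengthByteLength)
: //	entry := vbiBlock[int(blockNum)*(w1+w2+w3):]
: //	blockOffset := littleEndianGet(entry[w1:], w2)
: //	blockLen := littleEndianGet(entry[w1+w2:], w3)
: //	h := block.Handle{Offset: blockOffset, Length: blockLen}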
182 :
183 : // valueHandle is stored with a key when the value is in a value block. This
184 : // handle is the pointer to that value.
185 : type valueHandle struct {
186 : valueLen uint32
187 : blockNum uint32
188 : offsetInBlock uint32
189 : }
190 :
191 : // valueHandle fields are varint encoded, so each takes at most 5 bytes, plus 1 byte
192 : // for the valuePrefix. This could alternatively be group varint encoded, but
193 : // experiments were inconclusive
194 : // (https://github.com/cockroachdb/pebble/pull/1443#issuecomment-1270298802).
195 : const valueHandleMaxLen = 5*3 + 1
196 :
197 : // Assert blockHandleLikelyMaxLen >= valueHandleMaxLen (the uint conversion fails to compile if the difference is negative).
198 : const _ = uint(blockHandleLikelyMaxLen - valueHandleMaxLen)
199 :
200 1 : func encodeValueHandle(dst []byte, v valueHandle) int {
201 1 : n := 0
202 1 : n += binary.PutUvarint(dst[n:], uint64(v.valueLen))
203 1 : n += binary.PutUvarint(dst[n:], uint64(v.blockNum))
204 1 : n += binary.PutUvarint(dst[n:], uint64(v.offsetInBlock))
205 1 : return n
206 1 : }
207 :
208 1 : func decodeLenFromValueHandle(src []byte) (uint32, []byte) {
209 1 : ptr := unsafe.Pointer(&src[0])
210 1 : var v uint32
211 1 : if a := *((*uint8)(ptr)); a < 128 {
212 1 : v = uint32(a)
213 1 : src = src[1:]
214 1 : } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
215 0 : v = uint32(b)<<7 | uint32(a)
216 0 : src = src[2:]
217 1 : } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
218 0 : v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
219 0 : src = src[3:]
220 1 : } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
221 0 : v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
222 0 : src = src[4:]
223 1 : } else {
224 1 : d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
225 1 : v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
226 1 : src = src[5:]
227 1 : }
228 1 : return v, src
229 : }
230 :
231 1 : func decodeRemainingValueHandle(src []byte) valueHandle {
232 1 : var vh valueHandle
233 1 : ptr := unsafe.Pointer(&src[0])
234 1 : // Manually inlined uvarint decoding saves ~25% in benchmarks. Unrolling
235 1 : // the loop (for i := 0; i < 2; i++) saves another ~6%.
236 1 : var v uint32
237 1 : if a := *((*uint8)(ptr)); a < 128 {
238 1 : v = uint32(a)
239 1 : ptr = unsafe.Pointer(uintptr(ptr) + 1)
240 1 : } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
241 0 : v = uint32(b)<<7 | uint32(a)
242 0 : ptr = unsafe.Pointer(uintptr(ptr) + 2)
243 1 : } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
244 1 : v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
245 1 : ptr = unsafe.Pointer(uintptr(ptr) + 3)
246 1 : } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
247 0 : v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
248 0 : ptr = unsafe.Pointer(uintptr(ptr) + 4)
249 1 : } else {
250 1 : d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
251 1 : v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
252 1 : ptr = unsafe.Pointer(uintptr(ptr) + 5)
253 1 : }
254 1 : vh.blockNum = v
255 1 :
256 1 : if a := *((*uint8)(ptr)); a < 128 {
257 1 : v = uint32(a)
258 1 : } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
259 1 : v = uint32(b)<<7 | uint32(a)
260 1 : } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
261 0 : v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
262 1 : } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
263 0 : v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
264 1 : } else {
265 1 : d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
266 1 : v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
267 1 : }
268 1 : vh.offsetInBlock = v
269 1 :
270 1 : return vh
271 : }
272 :
273 1 : func decodeValueHandle(src []byte) valueHandle {
274 1 : valLen, src := decodeLenFromValueHandle(src)
275 1 : vh := decodeRemainingValueHandle(src)
276 1 : vh.valueLen = valLen
277 1 : return vh
278 1 : }
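: 
: // Round-trip example for the helpers above: encodeValueHandle followed by
: // decodeValueHandle recovers the original handle.
: //
: //	var buf [valueHandleMaxLen]byte
: //	n := encodeValueHandle(buf[:], valueHandle{valueLen: 10, blockNum: 2, offsetInBlock: 100})
: //	vh := decodeValueHandle(buf[:n]) // vh == valueHandle{10, 2, 100}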
279 :
280 : // valueBlocksIndexHandle is placed in the metaindex if there are any value
281 : // blocks. If there are no value blocks, there is no value blocks index, and
282 : // no entry in the metaindex. Note that the lack of entry in the metaindex
283 : // should not be used to ascertain whether the values are prefixed, since the
284 : // presence of value blocks is an emergent property of the data that was
285 : // written and is not known until all key-value pairs in the sstable are written.
286 : type valueBlocksIndexHandle struct {
287 : h block.Handle
288 : blockNumByteLength uint8
289 : blockOffsetByteLength uint8
290 : blockLengthByteLength uint8
291 : }
292 :
293 : const valueBlocksIndexHandleMaxLen = blockHandleMaxLenWithoutProperties + 3
294 :
295 : // Assert blockHandleLikelyMaxLen >= valueBlocksIndexHandleMaxLen.
296 : const _ = uint(blockHandleLikelyMaxLen - valueBlocksIndexHandleMaxLen)
297 :
298 1 : func encodeValueBlocksIndexHandle(dst []byte, v valueBlocksIndexHandle) int {
299 1 : n := v.h.EncodeVarints(dst)
300 1 : dst[n] = v.blockNumByteLength
301 1 : n++
302 1 : dst[n] = v.blockOffsetByteLength
303 1 : n++
304 1 : dst[n] = v.blockLengthByteLength
305 1 : n++
306 1 : return n
307 1 : }
308 :
309 1 : func decodeValueBlocksIndexHandle(src []byte) (valueBlocksIndexHandle, int, error) {
310 1 : var vbih valueBlocksIndexHandle
311 1 : var n int
312 1 : vbih.h, n = block.DecodeHandle(src)
313 1 : if n <= 0 {
314 0 : return vbih, 0, errors.Errorf("bad BlockHandle %x", src)
315 0 : }
316 1 : if len(src) != n+3 {
317 0 : return vbih, 0, errors.Errorf("bad BlockHandle %x", src)
318 0 : }
319 1 : vbih.blockNumByteLength = src[n]
320 1 : vbih.blockOffsetByteLength = src[n+1]
321 1 : vbih.blockLengthByteLength = src[n+2]
322 1 : return vbih, n + 3, nil
323 : }
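: 
: // Round-trip sketch: decodeValueBlocksIndexHandle expects exactly the bytes
: // produced by encodeValueBlocksIndexHandle (it rejects trailing data), so a
: // round trip looks like:
: //
: //	var buf [valueBlocksIndexHandleMaxLen]byte
: //	n := encodeValueBlocksIndexHandle(buf[:], vbih)
: //	decoded, m, err := decodeValueBlocksIndexHandle(buf[:n]) // m == n, decoded == vbih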
324 :
325 : type valueBlocksAndIndexStats struct {
326 : numValueBlocks uint64
327 : numValuesInValueBlocks uint64
328 : // Includes both the value blocks and the value index block.
329 : valueBlocksAndIndexSize uint64
330 : }
331 :
332 : // valueBlockWriter writes a sequence of value blocks, and the value blocks
333 : // index, for a sstable.
334 : type valueBlockWriter struct {
335 : flush block.FlushGovernor
336 : // Configured compression.
337 : compression block.Compression
338 : // checksummer with configured checksum type.
339 : checksummer block.Checksummer
340 : // Block finished callback.
341 : blockFinishedFunc func(compressedSize int)
342 :
343 : // buf is the current block being written to (uncompressed).
344 : buf *blockBuffer
345 : // compressedBuf is used for compressing the block.
346 : compressedBuf *blockBuffer
347 : // Sequence of blocks that are finished.
348 : blocks []bufferedValueBlock
349 : // Cumulative value block bytes written so far.
350 : totalBlockBytes uint64
351 : numValues uint64
352 : }
353 :
354 : type bufferedValueBlock struct {
355 : block block.PhysicalBlock
356 : buffer *blockBuffer
357 : handle block.Handle
358 : }
359 :
360 : type blockBuffer struct {
361 : b []byte
362 : }
363 :
364 : // Pool of block buffers that should be roughly the blockSize.
365 : var uncompressedValueBlockBufPool = sync.Pool{
366 1 : New: func() interface{} {
367 1 : return &blockBuffer{}
368 1 : },
369 : }
370 :
371 : // Pool of block buffers for compressed value blocks. These may widely vary in
372 : // size based on compression ratios.
373 : var compressedValueBlockBufPool = sync.Pool{
374 1 : New: func() interface{} {
375 1 : return &blockBuffer{}
376 1 : },
377 : }
378 :
379 1 : func releaseToValueBlockBufPool(pool *sync.Pool, b *blockBuffer) {
380 1 : // Don't pool buffers larger than 128KB, in case we had some rare large
381 1 : // values.
382 1 : if len(b.b) > 128*1024 {
383 0 : return
384 0 : }
385 1 : if invariants.Enabled {
386 1 : // Set the bytes to a random value. Cap the number of bytes being
387 1 : // randomized to prevent test timeouts.
388 1 : length := cap(b.b)
389 1 : if length > 1000 {
390 1 : length = 1000
391 1 : }
392 1 : b.b = b.b[:length:length]
393 1 : for j := range b.b {
394 1 : b.b[j] = byte(rand.Uint32())
395 1 : }
396 : }
397 1 : pool.Put(b)
398 : }
399 :
400 : var valueBlockWriterPool = sync.Pool{
401 1 : New: func() interface{} {
402 1 : return &valueBlockWriter{}
403 1 : },
404 : }
405 :
406 : func newValueBlockWriter(
407 : flushGovernor block.FlushGovernor,
408 : compression block.Compression,
409 : checksumType block.ChecksumType,
410 : // compressedSize should exclude the block trailer.
411 : blockFinishedFunc func(compressedSize int),
412 1 : ) *valueBlockWriter {
413 1 : w := valueBlockWriterPool.Get().(*valueBlockWriter)
414 1 : *w = valueBlockWriter{
415 1 : flush: flushGovernor,
416 1 : compression: compression,
417 1 : checksummer: block.Checksummer{
418 1 : Type: checksumType,
419 1 : },
420 1 : blockFinishedFunc: blockFinishedFunc,
421 1 : buf: uncompressedValueBlockBufPool.Get().(*blockBuffer),
422 1 : compressedBuf: compressedValueBlockBufPool.Get().(*blockBuffer),
423 1 : blocks: w.blocks[:0],
424 1 : }
425 1 : w.buf.b = w.buf.b[:0]
426 1 : w.compressedBuf.b = w.compressedBuf.b[:0]
427 1 : return w
428 1 : }
429 :
430 1 : func releaseValueBlockWriter(w *valueBlockWriter) {
431 1 : for i := range w.blocks {
432 1 : if w.blocks[i].block.IsCompressed() {
433 1 : releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.blocks[i].buffer)
434 1 : } else {
435 1 : releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.blocks[i].buffer)
436 1 : }
437 1 : w.blocks[i].buffer = nil
438 : }
439 1 : if w.buf != nil {
440 1 : releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.buf)
441 1 : }
442 1 : if w.compressedBuf != nil {
443 1 : releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.compressedBuf)
444 1 : }
445 1 : *w = valueBlockWriter{
446 1 : blocks: w.blocks[:0],
447 1 : }
448 1 : valueBlockWriterPool.Put(w)
449 : }
450 :
451 1 : func (w *valueBlockWriter) addValue(v []byte) (valueHandle, error) {
452 1 : if invariants.Enabled && len(v) == 0 {
453 0 : return valueHandle{}, errors.Errorf("cannot write empty value to value block")
454 0 : }
455 1 : w.numValues++
456 1 : blockLen := len(w.buf.b)
457 1 : valueLen := len(v)
458 1 : if w.flush.ShouldFlush(blockLen, blockLen+valueLen) {
459 1 : // The block is not empty, and adding this value would make it too big,
460 1 : // so finish the current block.
461 1 : w.compressAndFlush()
462 1 : blockLen = len(w.buf.b)
463 1 : if invariants.Enabled && blockLen != 0 {
464 0 : panic("blockLen of new block should be 0")
465 : }
466 : }
467 1 : vh := valueHandle{
468 1 : valueLen: uint32(valueLen),
469 1 : blockNum: uint32(len(w.blocks)),
470 1 : offsetInBlock: uint32(blockLen),
471 1 : }
472 1 : blockLen = int(vh.offsetInBlock + vh.valueLen)
473 1 : if cap(w.buf.b) < blockLen {
474 1 : size := 2 * cap(w.buf.b)
475 1 : if size < 1024 {
476 1 : size = 1024
477 1 : }
478 1 : for size < blockLen {
479 0 : size *= 2
480 0 : }
481 1 : buf := make([]byte, blockLen, size)
482 1 : _ = copy(buf, w.buf.b)
483 1 : w.buf.b = buf
484 1 : } else {
485 1 : w.buf.b = w.buf.b[:blockLen]
486 1 : }
487 1 : buf := w.buf.b[vh.offsetInBlock:]
488 1 : n := copy(buf, v)
489 1 : if n != len(buf) {
490 0 : panic("incorrect length computation")
491 : }
492 1 : return vh, nil
493 : }
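: 
: // Write-side usage sketch (the surrounding plumbing is an assumption; the
: // real caller is the sstable Writer): the returned handle, preceded by a
: // ValuePrefix byte, is stored in the data block in place of the value.
: //
: //	vh, err := w.addValue(value)
: //	if err != nil {
: //		return err
: //	}
: //	var buf [valueHandleMaxLen]byte
: //	buf[0] = prefix // a block.ValuePrefix byte: value-kind + attributes
: //	n := 1 + encodeValueHandle(buf[1:], vh)
: //	// buf[:n] is written as the key's "value" in the data block.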
494 :
495 1 : func (w *valueBlockWriter) compressAndFlush() {
496 1 : w.compressedBuf.b = w.compressedBuf.b[:cap(w.compressedBuf.b)]
497 1 : physicalBlock := block.CompressAndChecksum(&w.compressedBuf.b, w.buf.b, w.compression, &w.checksummer)
498 1 : bh := block.Handle{Offset: w.totalBlockBytes, Length: uint64(physicalBlock.LengthWithoutTrailer())}
499 1 : w.totalBlockBytes += uint64(physicalBlock.LengthWithTrailer())
500 1 : // blockFinishedFunc length excludes the block trailer.
501 1 : w.blockFinishedFunc(physicalBlock.LengthWithoutTrailer())
502 1 : b := bufferedValueBlock{
503 1 : block: physicalBlock,
504 1 : handle: bh,
505 1 : }
506 1 :
507 1 : // We'll hand off a buffer to w.blocks and get a new one. Which buffer we're
508 1 : // handing off depends on the outcome of compression.
509 1 : if physicalBlock.IsCompressed() {
510 1 : b.buffer = w.compressedBuf
511 1 : w.compressedBuf = compressedValueBlockBufPool.Get().(*blockBuffer)
512 1 : } else {
513 1 : b.buffer = w.buf
514 1 : w.buf = uncompressedValueBlockBufPool.Get().(*blockBuffer)
515 1 : }
516 1 : w.blocks = append(w.blocks, b)
517 1 : w.buf.b = w.buf.b[:0]
518 : }
519 :
520 1 : func (w *valueBlockWriter) computeChecksum(blk []byte) {
521 1 : n := len(blk) - block.TrailerLen
522 1 : checksum := w.checksummer.Checksum(blk[:n], blk[n])
523 1 : binary.LittleEndian.PutUint32(blk[n+1:], checksum)
524 1 : }
525 :
526 : func (w *valueBlockWriter) finish(
527 : layout *layoutWriter, fileOffset uint64,
528 1 : ) (valueBlocksIndexHandle, valueBlocksAndIndexStats, error) {
529 1 : if len(w.buf.b) > 0 {
530 1 : w.compressAndFlush()
531 1 : }
532 1 : n := len(w.blocks)
533 1 : if n == 0 {
534 1 : return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, nil
535 1 : }
536 1 : largestOffset := uint64(0)
537 1 : largestLength := uint64(0)
538 1 : for i := range w.blocks {
539 1 : _, err := layout.WriteValueBlock(w.blocks[i].block)
540 1 : if err != nil {
541 0 : return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, err
542 0 : }
543 1 : w.blocks[i].handle.Offset += fileOffset
544 1 : largestOffset = w.blocks[i].handle.Offset
545 1 : if largestLength < w.blocks[i].handle.Length {
546 1 : largestLength = w.blocks[i].handle.Length
547 1 : }
548 : }
549 1 : vbihOffset := fileOffset + w.totalBlockBytes
550 1 :
551 1 : vbih := valueBlocksIndexHandle{
552 1 : h: block.Handle{
553 1 : Offset: vbihOffset,
554 1 : },
555 1 : blockNumByteLength: uint8(lenLittleEndian(uint64(n - 1))),
556 1 : blockOffsetByteLength: uint8(lenLittleEndian(largestOffset)),
557 1 : blockLengthByteLength: uint8(lenLittleEndian(largestLength)),
558 1 : }
559 1 : var err error
560 1 : if n > 0 {
561 1 : if vbih, err = w.writeValueBlocksIndex(layout, vbih); err != nil {
562 0 : return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, err
563 0 : }
564 : }
565 1 : stats := valueBlocksAndIndexStats{
566 1 : numValueBlocks: uint64(n),
567 1 : numValuesInValueBlocks: w.numValues,
568 1 : valueBlocksAndIndexSize: w.totalBlockBytes + vbih.h.Length + block.TrailerLen,
569 1 : }
570 1 : return vbih, stats, err
571 : }
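: 
: // As a worked example of the width computation in finish: the 100MB sstable
: // described in the top-level comment had 2475 value blocks of ~32KB each.
: // lenLittleEndian(2474) is 2, the largest block offset (close to 100MB,
: // which exceeds 2^24) needs 4 bytes, and a ~32KB block length needs 2
: // bytes, giving the (2,4,2) widths and 8-byte index entries mentioned there.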
572 :
573 : func (w *valueBlockWriter) writeValueBlocksIndex(
574 : layout *layoutWriter, h valueBlocksIndexHandle,
575 1 : ) (valueBlocksIndexHandle, error) {
576 1 : blockLen :=
577 1 : int(h.blockNumByteLength+h.blockOffsetByteLength+h.blockLengthByteLength) * len(w.blocks)
578 1 : h.h.Length = uint64(blockLen)
579 1 : blockLen += block.TrailerLen
580 1 : var buf []byte
581 1 : if cap(w.buf.b) < blockLen {
582 1 : buf = make([]byte, blockLen)
583 1 : w.buf.b = buf
584 1 : } else {
585 1 : buf = w.buf.b[:blockLen]
586 1 : }
587 1 : b := buf
588 1 : for i := range w.blocks {
589 1 : littleEndianPut(uint64(i), b, int(h.blockNumByteLength))
590 1 : b = b[int(h.blockNumByteLength):]
591 1 : littleEndianPut(w.blocks[i].handle.Offset, b, int(h.blockOffsetByteLength))
592 1 : b = b[int(h.blockOffsetByteLength):]
593 1 : littleEndianPut(w.blocks[i].handle.Length, b, int(h.blockLengthByteLength))
594 1 : b = b[int(h.blockLengthByteLength):]
595 1 : }
596 1 : if len(b) != block.TrailerLen {
597 0 : panic("incorrect length calculation")
598 : }
599 1 : b[0] = byte(block.NoCompressionIndicator)
600 1 : w.computeChecksum(buf)
601 1 : if _, err := layout.WriteValueIndexBlock(buf, h); err != nil {
602 0 : return valueBlocksIndexHandle{}, err
603 0 : }
604 1 : return h, nil
605 : }
606 :
607 : // littleEndianPut writes v to b using little endian encoding, under the
608 : // assumption that v can be represented using n bytes.
609 1 : func littleEndianPut(v uint64, b []byte, n int) {
610 1 : _ = b[n-1] // bounds check
611 1 : for i := 0; i < n; i++ {
612 1 : b[i] = byte(v)
613 1 : v = v >> 8
614 1 : }
615 : }
616 :
617 : // lenLittleEndian returns the minimum number of bytes needed to encode v
618 : // using little endian encoding.
619 1 : func lenLittleEndian(v uint64) int {
620 1 : n := 0
621 1 : for i := 0; i < 8; i++ {
622 1 : n++
623 1 : v = v >> 8
624 1 : if v == 0 {
625 1 : break
626 : }
627 : }
628 1 : return n
629 : }
630 :
631 1 : func littleEndianGet(b []byte, n int) uint64 {
632 1 : _ = b[n-1] // bounds check
633 1 : v := uint64(b[0])
634 1 : for i := 1; i < n; i++ {
635 1 : v |= uint64(b[i]) << (8 * i)
636 1 : }
637 1 : return v
638 : }
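: 
: // Round-trip example for the three helpers above:
: //
: //	var b [3]byte
: //	littleEndianPut(0x030201, b[:], 3) // b = [0x01, 0x02, 0x03]
: //	n := lenLittleEndian(0x030201)     // n == 3
: //	v := littleEndianGet(b[:], n)      // v == 0x030201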
639 :
640 : // UserKeyPrefixBound represents a [Lower,Upper) bound of user key prefixes.
641 : // If both are nil, there is no bound specified. Else, Compare(Lower,Upper)
642 : // must be < 0.
643 : type UserKeyPrefixBound struct {
644 : // Lower is a lower bound user key prefix.
645 : Lower []byte
646 : // Upper is an upper bound user key prefix.
647 : Upper []byte
648 : }
649 :
650 : // IsEmpty returns true iff the bound is empty.
651 1 : func (ukb *UserKeyPrefixBound) IsEmpty() bool {
652 1 : return len(ukb.Lower) == 0 && len(ukb.Upper) == 0
653 1 : }
654 :
655 : type blockProviderWhenOpen interface {
656 : readBlockForVBR(
657 : h block.Handle, stats *base.InternalIteratorStats,
658 : ) (block.BufferHandle, error)
659 : }
660 :
661 : type blockProviderWhenClosed struct {
662 : rp ReaderProvider
663 : r *Reader
664 : }
665 :
666 1 : func (bpwc *blockProviderWhenClosed) open(ctx context.Context) error {
667 1 : var err error
668 1 : bpwc.r, err = bpwc.rp.GetReader(ctx)
669 1 : return err
670 1 : }
671 :
672 1 : func (bpwc *blockProviderWhenClosed) close() {
673 1 : bpwc.rp.Close()
674 1 : bpwc.r = nil
675 1 : }
676 :
677 : func (bpwc blockProviderWhenClosed) readBlockForVBR(
678 : h block.Handle, stats *base.InternalIteratorStats,
679 1 : ) (block.BufferHandle, error) {
680 1 : // This is rare, since most block reads happen when the corresponding
681 1 : // sstable iterator is open. So we are willing to sacrifice a proper context
682 1 : // for tracing.
683 1 : //
684 1 : // TODO(sumeer): consider fixing this. See
685 1 : // https://github.com/cockroachdb/pebble/pull/3065#issue-1991175365 for an
686 1 : // alternative.
687 1 : ctx := objiotracing.WithBlockType(context.Background(), objiotracing.ValueBlock)
688 1 : // TODO(jackson,sumeer): Consider whether to use a buffer pool in this case.
689 1 : // The bpwc is not allowed to outlive the iterator tree, so it cannot
690 1 : // outlive the buffer pool.
691 1 : return bpwc.r.readValueBlock(ctx, noEnv, noReadHandle, h)
692 1 : }
693 :
694 : // ReaderProvider supports the implementation of blockProviderWhenClosed.
695 : // GetReader and Close can be called multiple times in pairs.
696 : type ReaderProvider interface {
697 : GetReader(ctx context.Context) (r *Reader, err error)
698 : Close()
699 : }
700 :
701 : // MakeTrivialReaderProvider creates a ReaderProvider which always returns the
702 : // given reader. It should be used when the Reader will outlive the iterator
703 : // tree.
704 1 : func MakeTrivialReaderProvider(r *Reader) ReaderProvider {
705 1 : return (*trivialReaderProvider)(r)
706 1 : }
707 :
708 : // trivialReaderProvider implements ReaderProvider for a Reader that will
709 : // outlive the top-level iterator in the iterator tree.
710 : //
711 : // Defining the type in this manner (as opposed to a struct) avoids allocation.
712 : type trivialReaderProvider Reader
713 :
714 : var _ ReaderProvider = (*trivialReaderProvider)(nil)
715 :
716 : // GetReader implements ReaderProvider.
717 1 : func (trp *trivialReaderProvider) GetReader(ctx context.Context) (*Reader, error) {
718 1 : return (*Reader)(trp), nil
719 1 : }
720 :
721 : // Close implements ReaderProvider.
722 1 : func (trp *trivialReaderProvider) Close() {}
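: 
: // Usage sketch: when a Reader r is known to outlive the iterator tree, the
: // trivial provider hands it back with no allocation or synchronization:
: //
: //	rp := MakeTrivialReaderProvider(r)
: //	reader, err := rp.GetReader(ctx) // reader == r, err is always nil
: //	defer rp.Close()                 // no-op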
723 :
724 : // valueBlockReader implements GetLazyValueForPrefixAndValueHandler; it is used
725 : // to create LazyValues (each of which can be used to retrieve a value in a
726 : // value block). It is used when the sstable was written with
727 : // Properties.ValueBlocksAreEnabled. The lifetime of this object is tied to the
728 : // lifetime of the sstable iterator.
729 : type valueBlockReader struct {
730 : bpOpen blockProviderWhenOpen
731 : rp ReaderProvider
732 : vbih valueBlocksIndexHandle
733 : stats *base.InternalIteratorStats
734 : // fetcher is allocated lazily the first time we create a LazyValue, in order
735 : // to avoid the allocation if we never read a lazy value (which should be the
736 : // case when we're reading the latest value of a key).
737 : fetcher *valueBlockFetcher
738 :
739 : // lazyFetcher is the LazyFetcher value embedded in any LazyValue that was returned.
740 : lazyFetcher base.LazyFetcher
741 : }
742 :
743 : var _ block.GetLazyValueForPrefixAndValueHandler = (*valueBlockReader)(nil)
744 :
745 1 : func (r *valueBlockReader) GetLazyValueForPrefixAndValueHandle(handle []byte) base.LazyValue {
746 1 : if r.fetcher == nil {
747 1 : // NB: we cannot avoid this allocation, since the valueBlockFetcher
748 1 : // can outlive the singleLevelIterator due to being embedded in a
749 1 : // LazyValue.
750 1 : //
751 1 : // TODO(radu): since it is a relatively small object, we could allocate
752 1 : // multiple instances together, using a sync.Pool (each pool object would
753 1 : // contain an array of instances, a subset of which have been given out).
754 1 : r.fetcher = newValueBlockFetcher(r.bpOpen, r.rp, r.vbih, r.stats)
755 1 : }
756 1 : fetcher := &r.lazyFetcher
757 1 : valLen, h := decodeLenFromValueHandle(handle[1:])
758 1 : *fetcher = base.LazyFetcher{
759 1 : Fetcher: r.fetcher,
760 1 : Attribute: base.AttributeAndLen{
761 1 : ValueLen: int32(valLen),
762 1 : ShortAttribute: block.ValuePrefix(handle[0]).ShortAttribute(),
763 1 : },
764 1 : }
765 1 : if r.stats != nil {
766 1 : r.stats.SeparatedPointValue.Count++
767 1 : r.stats.SeparatedPointValue.ValueBytes += uint64(valLen)
768 1 : }
769 1 : return base.LazyValue{
770 1 : ValueOrHandle: h,
771 1 : Fetcher: fetcher,
772 1 : }
773 : }
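: 
: // The handle passed to GetLazyValueForPrefixAndValueHandle is laid out as
: // one ValuePrefix byte followed by the three uvarint-encoded valueHandle
: // fields, i.e. the bytes produced by encodeValueHandle shifted by one:
: //
: //	handle[0]  // block.ValuePrefix: value-kind, SET-same-prefix, short-attribute
: //	handle[1:] // valueLen, blockNum, offsetInBlock uvarints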
774 :
775 1 : func (r *valueBlockReader) close() {
776 1 : r.bpOpen = nil
777 1 : if r.fetcher != nil {
778 1 : r.fetcher.close()
779 1 : r.fetcher = nil
780 1 : }
781 : }
782 :
783 : // valueBlockFetcher implements base.ValueFetcher and is used through LazyValue
784 : // to fetch a value from a value block. The lifetime of this object is not tied
785 : // to the lifetime of the iterator - a LazyValue can be accessed later.
786 : type valueBlockFetcher struct {
787 : bpOpen blockProviderWhenOpen
788 : rp ReaderProvider
789 : vbih valueBlocksIndexHandle
790 : stats *base.InternalIteratorStats
791 : // The value blocks index is lazily retrieved the first time the reader
792 : // needs to read a value that resides in a value block.
793 : vbiBlock []byte
794 : vbiCache block.BufferHandle
795 : // When sequentially iterating through all key-value pairs, the cost of
796 : // repeatedly getting a block that is already in the cache and releasing the
797 : // bufferHandle can be ~40% of the CPU overhead. So the reader remembers the
798 : // last value block it retrieved, in case there is locality of access, and
799 : // this value block can be used for the next value retrieval.
800 : valueBlockNum uint32
801 : valueBlock []byte
802 : valueBlockPtr unsafe.Pointer
803 : valueCache block.BufferHandle
804 : closed bool
805 : bufToMangle []byte
806 : }
807 :
808 : var _ base.ValueFetcher = (*valueBlockFetcher)(nil)
809 :
810 : func newValueBlockFetcher(
811 : bpOpen blockProviderWhenOpen,
812 : rp ReaderProvider,
813 : vbih valueBlocksIndexHandle,
814 : stats *base.InternalIteratorStats,
815 1 : ) *valueBlockFetcher {
816 1 : return &valueBlockFetcher{
817 1 : bpOpen: bpOpen,
818 1 : rp: rp,
819 1 : vbih: vbih,
820 1 : stats: stats,
821 1 : }
822 1 : }
823 :
824 : // Fetch implements base.ValueFetcher.
825 : func (f *valueBlockFetcher) Fetch(
826 : ctx context.Context, handle []byte, valLen int32, buf []byte,
827 1 : ) (val []byte, callerOwned bool, err error) {
828 1 : if !f.closed {
829 1 : val, err := f.getValueInternal(handle, valLen)
830 1 : if invariants.Enabled {
831 1 : val = f.doValueMangling(val)
832 1 : }
833 1 : return val, false, err
834 : }
835 :
836 1 : bp := blockProviderWhenClosed{rp: f.rp}
837 1 : err = bp.open(ctx)
838 1 : if err != nil {
839 0 : return nil, false, err
840 0 : }
841 1 : defer bp.close()
842 1 : defer f.close()
843 1 : f.bpOpen = bp
844 1 : var v []byte
845 1 : v, err = f.getValueInternal(handle, valLen)
846 1 : if err != nil {
847 0 : return nil, false, err
848 0 : }
849 1 : buf = append(buf[:0], v...)
850 1 : return buf, true, nil
851 : }
852 :
853 1 : func (f *valueBlockFetcher) close() {
854 1 : f.vbiBlock = nil
855 1 : f.vbiCache.Release()
856 1 : // Set the handle to empty since Release does not nil the Handle.value. If
857 1 : // we were to reopen this valueBlockFetcher and retrieve the same
858 1 : // Handle.value from the cache, we don't want to accidentally unref it when
859 1 : // attempting to unref the old handle.
860 1 : f.vbiCache = block.BufferHandle{}
861 1 : f.valueBlock = nil
862 1 : f.valueBlockPtr = nil
863 1 : f.valueCache.Release()
864 1 : // See comment above.
865 1 : f.valueCache = block.BufferHandle{}
866 1 : f.closed = true
867 1 : // rp, vbih, stats remain valid, so that LazyFetcher.ValueFetcher can be
868 1 : // implemented.
869 1 : }
870 :
871 : // doValueMangling attempts to uncover violations of the contract listed in
872 : // the declaration comment of LazyValue. It is expensive, hence only called
873 : // when invariants.Enabled.
874 1 : func (f *valueBlockFetcher) doValueMangling(v []byte) []byte {
875 1 : // Randomly set the bytes in the previous retrieved value to 0, since
876 1 : // property P1 only requires the valueBlockReader to maintain the memory of
877 1 : // one fetched value.
878 1 : if rand.IntN(2) == 0 {
879 1 : clear(f.bufToMangle)
880 1 : }
881 : // Store the current value in a new buffer for future mangling.
882 1 : f.bufToMangle = append([]byte(nil), v...)
883 1 : return f.bufToMangle
884 : }
885 :
886 1 : func (f *valueBlockFetcher) getValueInternal(handle []byte, valLen int32) (val []byte, err error) {
887 1 : vh := decodeRemainingValueHandle(handle)
888 1 : vh.valueLen = uint32(valLen)
889 1 : if f.vbiBlock == nil {
890 1 : ch, err := f.bpOpen.readBlockForVBR(f.vbih.h, f.stats)
891 1 : if err != nil {
892 0 : return nil, err
893 0 : }
894 1 : f.vbiCache = ch
895 1 : f.vbiBlock = ch.BlockData()
896 : }
897 1 : if f.valueBlock == nil || f.valueBlockNum != vh.blockNum {
898 1 : vbh, err := f.getBlockHandle(vh.blockNum)
899 1 : if err != nil {
900 0 : return nil, err
901 0 : }
902 1 : vbCacheHandle, err := f.bpOpen.readBlockForVBR(vbh, f.stats)
903 1 : if err != nil {
904 0 : return nil, err
905 0 : }
906 1 : f.valueBlockNum = vh.blockNum
907 1 : f.valueCache.Release()
908 1 : f.valueCache = vbCacheHandle
909 1 : f.valueBlock = vbCacheHandle.BlockData()
910 1 : f.valueBlockPtr = unsafe.Pointer(&f.valueBlock[0])
911 : }
912 1 : if f.stats != nil {
913 1 : f.stats.SeparatedPointValue.ValueBytesFetched += uint64(valLen)
914 1 : }
915 1 : return f.valueBlock[vh.offsetInBlock : vh.offsetInBlock+vh.valueLen], nil
916 : }
917 :
918 1 : func (f *valueBlockFetcher) getBlockHandle(blockNum uint32) (block.Handle, error) {
919 1 : indexEntryLen :=
920 1 : int(f.vbih.blockNumByteLength + f.vbih.blockOffsetByteLength + f.vbih.blockLengthByteLength)
921 1 : offsetInIndex := indexEntryLen * int(blockNum)
922 1 : if len(f.vbiBlock) < offsetInIndex+indexEntryLen {
923 0 : return block.Handle{}, base.AssertionFailedf(
924 0 : "index entry out of bounds: offset %d length %d block length %d",
925 0 : offsetInIndex, indexEntryLen, len(f.vbiBlock))
926 0 : }
927 1 : b := f.vbiBlock[offsetInIndex : offsetInIndex+indexEntryLen]
928 1 : n := int(f.vbih.blockNumByteLength)
929 1 : bn := littleEndianGet(b, n)
930 1 : if uint32(bn) != blockNum {
931 0 : return block.Handle{},
932 0 : errors.Errorf("expected block num %d but found %d", blockNum, bn)
933 0 : }
934 1 : b = b[n:]
935 1 : n = int(f.vbih.blockOffsetByteLength)
936 1 : blockOffset := littleEndianGet(b, n)
937 1 : b = b[n:]
938 1 : n = int(f.vbih.blockLengthByteLength)
939 1 : blockLen := littleEndianGet(b, n)
940 1 : return block.Handle{Offset: blockOffset, Length: blockLen}, nil
941 : }
|