LCOV - code coverage report
Current view: top level - pebble/sstable - value_block.go (source / functions) Hit Total Coverage
Test: 2024-06-28 08:15Z 3ef2e5b1 - tests + meta.lcov Lines: 436 484 90.1 %
Date: 2024-06-28 08:16:56 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "context"
       9             :         "encoding/binary"
      10             :         "sync"
      11             :         "unsafe"
      12             : 
      13             :         "github.com/cockroachdb/errors"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/internal/invariants"
      16             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      17             :         "github.com/cockroachdb/pebble/sstable/block"
      18             :         "golang.org/x/exp/rand"
      19             : )
      20             : 
      21             : // Value blocks are supported in TableFormatPebblev3.
      22             : //
      23             : // 1. Motivation and overview
      24             : //
      25             : // Value blocks are a mechanism designed for sstables storing MVCC data, where
      26             : // there can be many versions of a key that need to be kept, but only the
      27             : // latest value is typically read (see the documentation for Comparer.Split
      28             : // regarding MVCC keys). The goal is faster reads. Unlike Pebble versions,
      29             : // which can be eagerly thrown away (except when there are snapshots), MVCC
      30             : // versions are long-lived (e.g. default CockroachDB garbage collection
      31             : // threshold for older versions is 24 hours) and can significantly slow down
      32             : // reads. We have seen CockroachDB production workloads with very slow reads
      33             : // due to:
      34             : // - 100s of versions for each key in a table.
      35             : //
      36             : // - Tables with mostly MVCC garbage consisting of 2 versions per key -- a
      37             : //   real key-value pair, followed by a key-value pair whose value (usually
      38             : //   with zero byte length) indicates it is an MVCC tombstone.
      39             : //
      40             : // The value blocks mechanism attempts to improve read throughput in these
      41             : // cases when the key size is smaller than the value sizes of older versions.
      42             : // This is done by moving the value of an older version to a value block in a
      43             : // different part of the sstable. This improves spatial locality of the data
      44             : // being read by the workload, which increases caching effectiveness.
      45             : //
      46             : // Additionally, even when the key size is not smaller than the value of older
      47             : // versions (e.g. secondary indexes in CockroachDB), TableFormatPebblev3
      48             : // stores the result of key comparisons done at write time inside the sstable,
      49             : // which makes stepping from one key prefix to the next prefix (i.e., skipping
      50             : // over older versions of a MVCC key) more efficient by avoiding key
      51             : // comparisons and key decoding. See the results in
      52             : // https://github.com/cockroachdb/pebble/pull/2149 and more details in the
      53             : // comment inside BenchmarkIteratorScanNextPrefix. These improvements are also
      54             : // visible in end-to-end CockroachDB tests, as outlined in
      55             : // https://github.com/cockroachdb/cockroach/pull/96652.
      56             : //
      57             : // In TableFormatPebblev3, each SET has a one byte value prefix that tells us
      58             : // whether the value is in-place or in a value block. This 1 byte prefix
      59             : // encodes additional information:
      60             : //
      61             : // - ShortAttribute: This is an attribute of the value. Currently, CockroachDB
      62             : //   uses it to represent whether the value is a tombstone or not. This avoids
      63             : //   the need to fetch a value from the value block if the caller only wants
      64             : //   to figure out whether it is an MVCC tombstone. The length of the value is
      65             : //   another attribute that the caller can be interested in, and it is also
      66             : //   accessible without reading the value in the value block (see the value
      67             : //   handle in the details section).
      68             : //
      69             : // - SET-same-prefix: this enables the aforementioned optimization when
      70             : //   stepping from one key prefix to the next key prefix.
      71             : //
      72             : // We further optimize this iteration over prefixes by using the restart
      73             : // points in a block to encode whether the SET at a restart point has the same
      74             : // prefix since the last restart point. This allows us to skip over restart
      75             : // points within the same block. See the comment in blockWriter, and how both
      76             : // SET-same-prefix and the restart point information is used in
      77             : // blockIter.nextPrefixV3.
      78             : //
      79             : // This flexibility of values that are in-place or in value blocks requires
      80             : // flexibility in the iterator interface. The InternalIterator interface
      81             : // returns a LazyValue instead of a byte slice. Additionally, pebble.Iterator
      82             : // allows the caller to ask for a LazyValue. See lazy_value.go for details,
      83             : // including the memory lifetime management.
      84             : //
      85             : // For historical discussions about this feature, see the issue
      86             : // https://github.com/cockroachdb/pebble/issues/1170 and the prototype in
      87             : // https://github.com/cockroachdb/pebble/pull/1443.
      88             : //
      89             : // The code in this file mainly covers value block and related encodings. We
      90             : // discuss these in the next section.
      91             : //
      92             : // 2. Details
      93             : //
      94             : // Note that the notion of the latest value is local to the sstable. It is
      95             : // possible that that latest value has been deleted by a sstable in a higher
      96             : // level, and what is the latest value from the perspective of the whole LSM
      97             : // is an older MVCC version. This only affects performance and not
      98             : // correctness. This local knowledge is also why we continue to store these
      99             : // older versions in the same sstable -- we need to be able to conveniently
     100             : // read them. The code in this file is agnostic to the policy regarding what
     101             : // should be stored in value blocks -- it allows even the latest MVCC version
     102             : // to be stored in a value block. The policy decision in made in the
     103             : // sstable.Writer. See Writer.makeAddPointDecisionV3.
     104             : //
     105             : // Data blocks contain two kinds of SET keys: those with in-place values and
     106             : // those with a value handle. To distinguish these two cases we use a single
     107             : // byte prefix (ValuePrefix). This single byte prefix is split into multiple
     108             : // parts, where nb represents information that is encoded in n bits.
     109             : //
     110             : // +---------------+--------------------+-----------+--------------------+
     111             : // | value-kind 2b | SET-same-prefix 1b | unused 2b | short-attribute 3b |
     112             : // +---------------+--------------------+-----------+--------------------+
     113             : //
     114             : // The 2 bit value-kind specifies whether this is an in-place value or a value
     115             : // handle pointing to a value block. We use 2 bits here for future
     116             : // representation of values that are in separate files. The 1 bit
     117             : // SET-same-prefix is true if this key is a SET and is immediately preceded by
     118             : // a SET that shares the same prefix. The 3 bit short-attribute is described
     119             : // in base.ShortAttribute -- it stores user-defined attributes about the
     120             : // value. It is unused for in-place values.
     121             : //
     122             : // Value Handle and Value Blocks:
     123             : // valueHandles refer to values in value blocks. Value blocks are simpler than
     124             : // normal data blocks (that contain key-value pairs, and allow for binary
     125             : // search), which makes them cheap for value retrieval purposes. A valueHandle
     126             : // is a tuple (valueLen, blockNum, offsetInBlock), where blockNum is the 0
     127             : // indexed value block number and offsetInBlock is the byte offset in that
     128             : // block containing the value. The valueHandle.valueLen is included since
     129             : // there are multiple use cases in CockroachDB that need the value length but
     130             : // not the value, for which we can avoid reading the value in the value block
     131             : // (see
     132             : // https://github.com/cockroachdb/pebble/issues/1170#issuecomment-958203245).
     133             : //
     134             : // A value block has a checksum like other blocks, and is optionally
     135             : // compressed. An uncompressed value block is a sequence of values with no
     136             : // separator or length (we rely on the valueHandle to demarcate). The
     137             : // valueHandle.offsetInBlock points to the value, of length
     138             : // valueHandle.valueLen. While writing a sstable, all the (possibly
     139             : // compressed) value blocks need to be held in-memory until they can be
     140             : // written. Value blocks are placed after the "meta rangedel" and "meta range
     141             : // key" blocks since value blocks are considered less likely to be read.
     142             : //
     143             : // Meta Value Index Block:
     144             : // Since the (key, valueHandle) pair are written before there is any knowledge
     145             : // of the byte offset of the value block in the file, or its compressed
     146             : // length, we need another lookup to map the valueHandle.blockNum to the
     147             : // information needed to read it from the file. This information is provided
     148             : // by the "value index block". The "value index block" is referred to by the
     149             : // metaindex block. The design intentionally avoids making the "value index
     150             : // block" a general purpose key-value block, since each caller wants to lookup
     151             : // the information for a particular blockNum (there is no need for SeekGE
     152             : // etc.). Instead, this index block stores a sequence of (blockNum,
     153             : // blockOffset, blockLength) tuples, where the blockNums are consecutive
     154             : // integers, and the tuples are encoded with a fixed width encoding. This
     155             : // allows a reader to find the tuple for block K by looking at the offset
     156             : // K*fixed-width. The fixed width for each field is decided by looking at the
     157             : // maximum value of each of these fields. As a concrete example of a large
     158             : // sstable with many value blocks, we constructed a 100MB sstable with many
     159             : // versions and had 2475 value blocks (~32KB each). This sstable had this
     160             : // tuple encoded using 2+4+2=8 bytes, which means the uncompressed value index
     161             : // block was 2475*8=~19KB, which is modest. Therefore, we don't support more
     162             : // than one value index block. Consider the example of 2 byte blockNum, 4 byte
     163             : // blockOffset and 2 byte blockLen. The value index block will look like:
     164             : //
     165             : //   +---------------+------------------+---------------+
     166             : //   | blockNum (2B) | blockOffset (4B) | blockLen (2B) |
     167             : //   +---------------+------------------+---------------+
     168             : //   |       0       |    7,123,456     |  30,000       |
     169             : //   +---------------+------------------+---------------+
     170             : //   |       1       |    7,153,456     |  20,000       |
     171             : //   +---------------+------------------+---------------+
     172             : //   |       2       |    7,173,456     |  25,567       |
     173             : //   +---------------+------------------+---------------+
     174             : //   |     ....      |      ...         |    ...        |
     175             : //
     176             : //
     177             : // The metaindex block contains the valueBlocksIndexHandle which in addition
     178             : // to the BlockHandle also specifies the widths of these tuple fields. In the
     179             : // above example, the
     180             : // valueBlockIndexHandle.{blockNumByteLength,blockOffsetByteLength,blockLengthByteLength}
     181             : // will be (2,4,2).
     182             : 
     183             : // valueHandle is stored with a key when the value is in a value block. This
     184             : // handle is the pointer to that value.
     185             : type valueHandle struct {
     186             :         valueLen      uint32
     187             :         blockNum      uint32
     188             :         offsetInBlock uint32
     189             : }
     190             : 
     191             : // valueHandle fields are varint encoded, so maximum 5 bytes each, plus 1 byte
     192             : // for the valuePrefix. This could alternatively be group varint encoded, but
     193             : // experiments were inconclusive
     194             : // (https://github.com/cockroachdb/pebble/pull/1443#issuecomment-1270298802).
     195             : const valueHandleMaxLen = 5*3 + 1
     196             : 
     197             : // Assert blockHandleLikelyMaxLen >= valueHandleMaxLen.
     198             : const _ = uint(blockHandleLikelyMaxLen - valueHandleMaxLen)
     199             : 
     200           2 : func encodeValueHandle(dst []byte, v valueHandle) int {
     201           2 :         n := 0
     202           2 :         n += binary.PutUvarint(dst[n:], uint64(v.valueLen))
     203           2 :         n += binary.PutUvarint(dst[n:], uint64(v.blockNum))
     204           2 :         n += binary.PutUvarint(dst[n:], uint64(v.offsetInBlock))
     205           2 :         return n
     206           2 : }
     207             : 
     208           2 : func decodeLenFromValueHandle(src []byte) (uint32, []byte) {
     209           2 :         ptr := unsafe.Pointer(&src[0])
     210           2 :         var v uint32
     211           2 :         if a := *((*uint8)(ptr)); a < 128 {
     212           2 :                 v = uint32(a)
     213           2 :                 src = src[1:]
     214           2 :         } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
     215           0 :                 v = uint32(b)<<7 | uint32(a)
     216           0 :                 src = src[2:]
     217           1 :         } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
     218           0 :                 v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
     219           0 :                 src = src[3:]
     220           1 :         } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
     221           0 :                 v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
     222           0 :                 src = src[4:]
     223           1 :         } else {
     224           1 :                 d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
     225           1 :                 v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
     226           1 :                 src = src[5:]
     227           1 :         }
     228           2 :         return v, src
     229             : }
     230             : 
     231           2 : func decodeRemainingValueHandle(src []byte) valueHandle {
     232           2 :         var vh valueHandle
     233           2 :         ptr := unsafe.Pointer(&src[0])
     234           2 :         // Manually inlined uvarint decoding. Saves ~25% in benchmarks. Unrolling
     235           2 :         // a loop for i:=0; i<2; i++, saves ~6%.
     236           2 :         var v uint32
     237           2 :         if a := *((*uint8)(ptr)); a < 128 {
     238           2 :                 v = uint32(a)
     239           2 :                 ptr = unsafe.Pointer(uintptr(ptr) + 1)
     240           2 :         } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
     241           0 :                 v = uint32(b)<<7 | uint32(a)
     242           0 :                 ptr = unsafe.Pointer(uintptr(ptr) + 2)
     243           1 :         } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
     244           1 :                 v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
     245           1 :                 ptr = unsafe.Pointer(uintptr(ptr) + 3)
     246           1 :         } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
     247           0 :                 v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
     248           0 :                 ptr = unsafe.Pointer(uintptr(ptr) + 4)
     249           1 :         } else {
     250           1 :                 d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
     251           1 :                 v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
     252           1 :                 ptr = unsafe.Pointer(uintptr(ptr) + 5)
     253           1 :         }
     254           2 :         vh.blockNum = v
     255           2 : 
     256           2 :         if a := *((*uint8)(ptr)); a < 128 {
     257           2 :                 v = uint32(a)
     258           2 :         } else if a, b := a&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 1))); b < 128 {
     259           2 :                 v = uint32(b)<<7 | uint32(a)
     260           2 :         } else if b, c := b&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 2))); c < 128 {
     261           0 :                 v = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
     262           1 :         } else if c, d := c&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 3))); d < 128 {
     263           0 :                 v = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
     264           1 :         } else {
     265           1 :                 d, e := d&0x7f, *((*uint8)(unsafe.Pointer(uintptr(ptr) + 4)))
     266           1 :                 v = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
     267           1 :         }
     268           2 :         vh.offsetInBlock = v
     269           2 : 
     270           2 :         return vh
     271             : }
     272             : 
     273           1 : func decodeValueHandle(src []byte) valueHandle {
     274           1 :         valLen, src := decodeLenFromValueHandle(src)
     275           1 :         vh := decodeRemainingValueHandle(src)
     276           1 :         vh.valueLen = valLen
     277           1 :         return vh
     278           1 : }
     279             : 
     280             : // valueBlocksIndexHandle is placed in the metaindex if there are any value
     281             : // blocks. If there are no value blocks, there is no value blocks index, and
     282             : // no entry in the metaindex. Note that the lack of entry in the metaindex
     283             : // should not be used to ascertain whether the values are prefixed, since the
     284             : // former is an emergent property of the data that was written and not known
     285             : // until all the key-value pairs in the sstable are written.
     286             : type valueBlocksIndexHandle struct {
     287             :         h                     block.Handle
     288             :         blockNumByteLength    uint8
     289             :         blockOffsetByteLength uint8
     290             :         blockLengthByteLength uint8
     291             : }
     292             : 
     293             : const valueBlocksIndexHandleMaxLen = blockHandleMaxLenWithoutProperties + 3
     294             : 
     295             : // Assert blockHandleLikelyMaxLen >= valueBlocksIndexHandleMaxLen.
     296             : const _ = uint(blockHandleLikelyMaxLen - valueBlocksIndexHandleMaxLen)
     297             : 
     298           2 : func encodeValueBlocksIndexHandle(dst []byte, v valueBlocksIndexHandle) int {
     299           2 :         n := encodeBlockHandle(dst, v.h)
     300           2 :         dst[n] = v.blockNumByteLength
     301           2 :         n++
     302           2 :         dst[n] = v.blockOffsetByteLength
     303           2 :         n++
     304           2 :         dst[n] = v.blockLengthByteLength
     305           2 :         n++
     306           2 :         return n
     307           2 : }
     308             : 
     309           2 : func decodeValueBlocksIndexHandle(src []byte) (valueBlocksIndexHandle, int, error) {
     310           2 :         var vbih valueBlocksIndexHandle
     311           2 :         var n int
     312           2 :         vbih.h, n = decodeBlockHandle(src)
     313           2 :         if n <= 0 {
     314           0 :                 return vbih, 0, errors.Errorf("bad BlockHandle %x", src)
     315           0 :         }
     316           2 :         if len(src) != n+3 {
     317           0 :                 return vbih, 0, errors.Errorf("bad BlockHandle %x", src)
     318           0 :         }
     319           2 :         vbih.blockNumByteLength = src[n]
     320           2 :         vbih.blockOffsetByteLength = src[n+1]
     321           2 :         vbih.blockLengthByteLength = src[n+2]
     322           2 :         return vbih, n + 3, nil
     323             : }
     324             : 
     325             : type valueBlocksAndIndexStats struct {
     326             :         numValueBlocks         uint64
     327             :         numValuesInValueBlocks uint64
     328             :         // Includes both value blocks and value index block.
     329             :         valueBlocksAndIndexSize uint64
     330             : }
     331             : 
     332             : // valueBlockWriter writes a sequence of value blocks, and the value blocks
     333             : // index, for a sstable.
     334             : type valueBlockWriter struct {
     335             :         // The configured uncompressed block size and size threshold
     336             :         blockSize, blockSizeThreshold int
     337             :         // Configured compression.
     338             :         compression Compression
     339             :         // checksummer with configured checksum type.
     340             :         checksummer block.Checksummer
     341             :         // Block finished callback.
     342             :         blockFinishedFunc func(compressedSize int)
     343             : 
     344             :         // buf is the current block being written to (uncompressed).
     345             :         buf *blockBuffer
     346             :         // compressedBuf is used for compressing the block.
     347             :         compressedBuf *blockBuffer
     348             :         // Sequence of blocks that are finished.
     349             :         blocks []blockAndHandle
     350             :         // Cumulative value block bytes written so far.
     351             :         totalBlockBytes uint64
     352             :         numValues       uint64
     353             : }
     354             : 
     355             : type blockAndHandle struct {
     356             :         block      *blockBuffer
     357             :         handle     block.Handle
     358             :         compressed bool
     359             : }
     360             : 
     361             : type blockBuffer struct {
     362             :         b []byte
     363             : }
     364             : 
     365             : // Pool of block buffers that should be roughly the blockSize.
     366             : var uncompressedValueBlockBufPool = sync.Pool{
     367           2 :         New: func() interface{} {
     368           2 :                 return &blockBuffer{}
     369           2 :         },
     370             : }
     371             : 
     372             : // Pool of block buffers for compressed value blocks. These may widely vary in
     373             : // size based on compression ratios.
     374             : var compressedValueBlockBufPool = sync.Pool{
     375           2 :         New: func() interface{} {
     376           2 :                 return &blockBuffer{}
     377           2 :         },
     378             : }
     379             : 
     380           2 : func releaseToValueBlockBufPool(pool *sync.Pool, b *blockBuffer) {
     381           2 :         // Don't pool buffers larger than 128KB, in case we had some rare large
     382           2 :         // values.
     383           2 :         if len(b.b) > 128*1024 {
     384           0 :                 return
     385           0 :         }
     386           2 :         if invariants.Enabled {
     387           2 :                 // Set the bytes to a random value. Cap the number of bytes being
     388           2 :                 // randomized to prevent test timeouts.
     389           2 :                 length := cap(b.b)
     390           2 :                 if length > 1000 {
     391           2 :                         length = 1000
     392           2 :                 }
     393           2 :                 b.b = b.b[:length:length]
     394           2 :                 rand.Read(b.b)
     395             :         }
     396           2 :         pool.Put(b)
     397             : }
     398             : 
     399             : var valueBlockWriterPool = sync.Pool{
     400           2 :         New: func() interface{} {
     401           2 :                 return &valueBlockWriter{}
     402           2 :         },
     403             : }
     404             : 
     405             : func newValueBlockWriter(
     406             :         blockSize int,
     407             :         blockSizeThreshold int,
     408             :         compression Compression,
     409             :         checksumType block.ChecksumType,
     410             :         // compressedSize should exclude the block trailer.
     411             :         blockFinishedFunc func(compressedSize int),
     412           2 : ) *valueBlockWriter {
     413           2 :         w := valueBlockWriterPool.Get().(*valueBlockWriter)
     414           2 :         *w = valueBlockWriter{
     415           2 :                 blockSize:          blockSize,
     416           2 :                 blockSizeThreshold: blockSizeThreshold,
     417           2 :                 compression:        compression,
     418           2 :                 checksummer: block.Checksummer{
     419           2 :                         Type: checksumType,
     420           2 :                 },
     421           2 :                 blockFinishedFunc: blockFinishedFunc,
     422           2 :                 buf:               uncompressedValueBlockBufPool.Get().(*blockBuffer),
     423           2 :                 compressedBuf:     compressedValueBlockBufPool.Get().(*blockBuffer),
     424           2 :                 blocks:            w.blocks[:0],
     425           2 :         }
     426           2 :         w.buf.b = w.buf.b[:0]
     427           2 :         w.compressedBuf.b = w.compressedBuf.b[:0]
     428           2 :         return w
     429           2 : }
     430             : 
     431           2 : func releaseValueBlockWriter(w *valueBlockWriter) {
     432           2 :         for i := range w.blocks {
     433           2 :                 if w.blocks[i].compressed {
     434           2 :                         releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.blocks[i].block)
     435           2 :                 } else {
     436           2 :                         releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.blocks[i].block)
     437           2 :                 }
     438           2 :                 w.blocks[i].block = nil
     439             :         }
     440           2 :         if w.buf != nil {
     441           2 :                 releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.buf)
     442           2 :         }
     443           2 :         if w.compressedBuf != nil {
     444           2 :                 releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.compressedBuf)
     445           2 :         }
     446           2 :         *w = valueBlockWriter{
     447           2 :                 blocks: w.blocks[:0],
     448           2 :         }
     449           2 :         valueBlockWriterPool.Put(w)
     450             : }
     451             : 
     452           2 : func (w *valueBlockWriter) addValue(v []byte) (valueHandle, error) {
     453           2 :         if invariants.Enabled && len(v) == 0 {
     454           0 :                 return valueHandle{}, errors.Errorf("cannot write empty value to value block")
     455           0 :         }
     456           2 :         w.numValues++
     457           2 :         blockLen := len(w.buf.b)
     458           2 :         valueLen := len(v)
     459           2 :         if blockLen >= w.blockSize ||
     460           2 :                 (blockLen > w.blockSizeThreshold && blockLen+valueLen > w.blockSize) {
     461           2 :                 // Block is not currently empty and adding this value will become too big,
     462           2 :                 // so finish this block.
     463           2 :                 w.compressAndFlush()
     464           2 :                 blockLen = len(w.buf.b)
     465           2 :                 if invariants.Enabled && blockLen != 0 {
     466           0 :                         panic("blockLen of new block should be 0")
     467             :                 }
     468             :         }
     469           2 :         vh := valueHandle{
     470           2 :                 valueLen:      uint32(valueLen),
     471           2 :                 blockNum:      uint32(len(w.blocks)),
     472           2 :                 offsetInBlock: uint32(blockLen),
     473           2 :         }
     474           2 :         blockLen = int(vh.offsetInBlock + vh.valueLen)
     475           2 :         if cap(w.buf.b) < blockLen {
     476           2 :                 size := 2 * cap(w.buf.b)
     477           2 :                 if size < 1024 {
     478           2 :                         size = 1024
     479           2 :                 }
     480           2 :                 for size < blockLen {
     481           0 :                         size *= 2
     482           0 :                 }
     483           2 :                 buf := make([]byte, blockLen, size)
     484           2 :                 _ = copy(buf, w.buf.b)
     485           2 :                 w.buf.b = buf
     486           2 :         } else {
     487           2 :                 w.buf.b = w.buf.b[:blockLen]
     488           2 :         }
     489           2 :         buf := w.buf.b[vh.offsetInBlock:]
     490           2 :         n := copy(buf, v)
     491           2 :         if n != len(buf) {
     492           0 :                 panic("incorrect length computation")
     493             :         }
     494           2 :         return vh, nil
     495             : }
     496             : 
     497           2 : func (w *valueBlockWriter) compressAndFlush() {
     498           2 :         // Compress the buffer, discarding the result if the improvement isn't at
     499           2 :         // least 12.5%.
     500           2 :         blockType := noCompressionBlockType
     501           2 :         b := w.buf
     502           2 :         if w.compression != NoCompression {
     503           2 :                 blockType, w.compressedBuf.b =
     504           2 :                         compressBlock(w.compression, w.buf.b, w.compressedBuf.b[:cap(w.compressedBuf.b)])
     505           2 :                 if len(w.compressedBuf.b) < len(w.buf.b)-len(w.buf.b)/8 {
     506           2 :                         b = w.compressedBuf
     507           2 :                 } else {
     508           2 :                         blockType = noCompressionBlockType
     509           2 :                 }
     510             :         }
     511           2 :         n := len(b.b)
     512           2 :         if n+block.TrailerLen > cap(b.b) {
     513           2 :                 block := make([]byte, n+block.TrailerLen)
     514           2 :                 copy(block, b.b)
     515           2 :                 b.b = block
     516           2 :         } else {
     517           2 :                 b.b = b.b[:n+block.TrailerLen]
     518           2 :         }
     519           2 :         b.b[n] = byte(blockType)
     520           2 :         w.computeChecksum(b.b)
     521           2 :         bh := block.Handle{Offset: w.totalBlockBytes, Length: uint64(n)}
     522           2 :         w.totalBlockBytes += uint64(len(b.b))
     523           2 :         // blockFinishedFunc length excludes the block trailer.
     524           2 :         w.blockFinishedFunc(n)
     525           2 :         compressed := blockType != noCompressionBlockType
     526           2 :         w.blocks = append(w.blocks, blockAndHandle{
     527           2 :                 block:      b,
     528           2 :                 handle:     bh,
     529           2 :                 compressed: compressed,
     530           2 :         })
     531           2 :         // Handed off a buffer to w.blocks, so need get a new one.
     532           2 :         if compressed {
     533           2 :                 w.compressedBuf = compressedValueBlockBufPool.Get().(*blockBuffer)
     534           2 :         } else {
     535           2 :                 w.buf = uncompressedValueBlockBufPool.Get().(*blockBuffer)
     536           2 :         }
     537           2 :         w.buf.b = w.buf.b[:0]
     538             : }
     539             : 
     540           2 : func (w *valueBlockWriter) computeChecksum(blk []byte) {
     541           2 :         n := len(blk) - block.TrailerLen
     542           2 :         checksum := w.checksummer.Checksum(blk[:n], blk[n:n+1])
     543           2 :         binary.LittleEndian.PutUint32(blk[n+1:], checksum)
     544           2 : }
     545             : 
     546             : func (w *valueBlockWriter) finish(
     547             :         layout *layoutWriter, fileOffset uint64,
     548           2 : ) (valueBlocksIndexHandle, valueBlocksAndIndexStats, error) {
     549           2 :         if len(w.buf.b) > 0 {
     550           2 :                 w.compressAndFlush()
     551           2 :         }
     552           2 :         n := len(w.blocks)
     553           2 :         if n == 0 {
     554           2 :                 return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, nil
     555           2 :         }
     556           2 :         largestOffset := uint64(0)
     557           2 :         largestLength := uint64(0)
     558           2 :         for i := range w.blocks {
     559           2 :                 _, err := layout.WriteValueBlock(w.blocks[i].block.b)
     560           2 :                 if err != nil {
     561           0 :                         return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, err
     562           0 :                 }
     563           2 :                 w.blocks[i].handle.Offset += fileOffset
     564           2 :                 largestOffset = w.blocks[i].handle.Offset
     565           2 :                 if largestLength < w.blocks[i].handle.Length {
     566           2 :                         largestLength = w.blocks[i].handle.Length
     567           2 :                 }
     568             :         }
     569           2 :         vbihOffset := fileOffset + w.totalBlockBytes
     570           2 : 
     571           2 :         vbih := valueBlocksIndexHandle{
     572           2 :                 h: block.Handle{
     573           2 :                         Offset: vbihOffset,
     574           2 :                 },
     575           2 :                 blockNumByteLength:    uint8(lenLittleEndian(uint64(n - 1))),
     576           2 :                 blockOffsetByteLength: uint8(lenLittleEndian(largestOffset)),
     577           2 :                 blockLengthByteLength: uint8(lenLittleEndian(largestLength)),
     578           2 :         }
     579           2 :         var err error
     580           2 :         if n > 0 {
     581           2 :                 if vbih, err = w.writeValueBlocksIndex(layout, vbih); err != nil {
     582           0 :                         return valueBlocksIndexHandle{}, valueBlocksAndIndexStats{}, err
     583           0 :                 }
     584             :         }
     585           2 :         stats := valueBlocksAndIndexStats{
     586           2 :                 numValueBlocks:          uint64(n),
     587           2 :                 numValuesInValueBlocks:  w.numValues,
     588           2 :                 valueBlocksAndIndexSize: w.totalBlockBytes + vbih.h.Length + block.TrailerLen,
     589           2 :         }
     590           2 :         return vbih, stats, err
     591             : }
     592             : 
     593             : func (w *valueBlockWriter) writeValueBlocksIndex(
     594             :         layout *layoutWriter, h valueBlocksIndexHandle,
     595           2 : ) (valueBlocksIndexHandle, error) {
     596           2 :         blockLen :=
     597           2 :                 int(h.blockNumByteLength+h.blockOffsetByteLength+h.blockLengthByteLength) * len(w.blocks)
     598           2 :         h.h.Length = uint64(blockLen)
     599           2 :         blockLen += block.TrailerLen
     600           2 :         var buf []byte
     601           2 :         if cap(w.buf.b) < blockLen {
     602           2 :                 buf = make([]byte, blockLen)
     603           2 :                 w.buf.b = buf
     604           2 :         } else {
     605           2 :                 buf = w.buf.b[:blockLen]
     606           2 :         }
     607           2 :         b := buf
     608           2 :         for i := range w.blocks {
     609           2 :                 littleEndianPut(uint64(i), b, int(h.blockNumByteLength))
     610           2 :                 b = b[int(h.blockNumByteLength):]
     611           2 :                 littleEndianPut(w.blocks[i].handle.Offset, b, int(h.blockOffsetByteLength))
     612           2 :                 b = b[int(h.blockOffsetByteLength):]
     613           2 :                 littleEndianPut(w.blocks[i].handle.Length, b, int(h.blockLengthByteLength))
     614           2 :                 b = b[int(h.blockLengthByteLength):]
     615           2 :         }
     616           2 :         if len(b) != block.TrailerLen {
     617           0 :                 panic("incorrect length calculation")
     618             :         }
     619           2 :         b[0] = byte(noCompressionBlockType)
     620           2 :         w.computeChecksum(buf)
     621           2 :         if _, err := layout.WriteValueIndexBlock(buf, h); err != nil {
     622           0 :                 return valueBlocksIndexHandle{}, err
     623           0 :         }
     624           2 :         return h, nil
     625             : }
     626             : 
     627             : // littleEndianPut writes v to b using little endian encoding, under the
     628             : // assumption that v can be represented using n bytes.
     629           2 : func littleEndianPut(v uint64, b []byte, n int) {
     630           2 :         _ = b[n-1] // bounds check
     631           2 :         for i := 0; i < n; i++ {
     632           2 :                 b[i] = byte(v)
     633           2 :                 v = v >> 8
     634           2 :         }
     635             : }
     636             : 
     637             : // lenLittleEndian returns the minimum number of bytes needed to encode v
     638             : // using little endian encoding.
     639           2 : func lenLittleEndian(v uint64) int {
     640           2 :         n := 0
     641           2 :         for i := 0; i < 8; i++ {
     642           2 :                 n++
     643           2 :                 v = v >> 8
     644           2 :                 if v == 0 {
     645           2 :                         break
     646             :                 }
     647             :         }
     648           2 :         return n
     649             : }
     650             : 
     651           2 : func littleEndianGet(b []byte, n int) uint64 {
     652           2 :         _ = b[n-1] // bounds check
     653           2 :         v := uint64(b[0])
     654           2 :         for i := 1; i < n; i++ {
     655           2 :                 v |= uint64(b[i]) << (8 * i)
     656           2 :         }
     657           2 :         return v
     658             : }
     659             : 
     660             : // UserKeyPrefixBound represents a [Lower,Upper) bound of user key prefixes.
     661             : // If both are nil, there is no bound specified. Else, Compare(Lower,Upper)
     662             : // must be < 0.
     663             : type UserKeyPrefixBound struct {
     664             :         // Lower is a lower bound user key prefix.
     665             :         Lower []byte
     666             :         // Upper is an upper bound user key prefix.
     667             :         Upper []byte
     668             : }
     669             : 
     670             : // IsEmpty returns true iff the bound is empty.
     671           2 : func (ukb *UserKeyPrefixBound) IsEmpty() bool {
     672           2 :         return len(ukb.Lower) == 0 && len(ukb.Upper) == 0
     673           2 : }
     674             : 
     675             : type blockProviderWhenOpen interface {
     676             :         readBlockForVBR(
     677             :                 h block.Handle, stats *base.InternalIteratorStats,
     678             :         ) (block.BufferHandle, error)
     679             : }
     680             : 
     681             : type blockProviderWhenClosed struct {
     682             :         rp ReaderProvider
     683             :         r  *Reader
     684             : }
     685             : 
     686           2 : func (bpwc *blockProviderWhenClosed) open() error {
     687           2 :         var err error
     688           2 :         bpwc.r, err = bpwc.rp.GetReader()
     689           2 :         return err
     690           2 : }
     691             : 
     692           2 : func (bpwc *blockProviderWhenClosed) close() {
     693           2 :         bpwc.rp.Close()
     694           2 :         bpwc.r = nil
     695           2 : }
     696             : 
     697             : func (bpwc blockProviderWhenClosed) readBlockForVBR(
     698             :         h block.Handle, stats *base.InternalIteratorStats,
     699           2 : ) (block.BufferHandle, error) {
     700           2 :         // This is rare, since most block reads happen when the corresponding
     701           2 :         // sstable iterator is open. So we are willing to sacrifice a proper context
     702           2 :         // for tracing.
     703           2 :         //
     704           2 :         // TODO(sumeer): consider fixing this. See
     705           2 :         // https://github.com/cockroachdb/pebble/pull/3065#issue-1991175365 for an
     706           2 :         // alternative.
     707           2 :         ctx := objiotracing.WithBlockType(context.Background(), objiotracing.ValueBlock)
     708           2 :         // TODO(jackson,sumeer): Consider whether to use a buffer pool in this case.
     709           2 :         // The bpwc is not allowed to outlive the iterator tree, so it cannot
     710           2 :         // outlive the buffer pool.
     711           2 :         return bpwc.r.readBlock(
     712           2 :                 ctx, h, nil, nil, stats, nil /* iterStats */, nil /* buffer pool */)
     713           2 : }
     714             : 
     715             : // ReaderProvider supports the implementation of blockProviderWhenClosed.
     716             : // GetReader and Close can be called multiple times in pairs.
     717             : type ReaderProvider interface {
     718             :         GetReader() (r *Reader, err error)
     719             :         Close()
     720             : }
     721             : 
     722             : // TrivialReaderProvider implements ReaderProvider for a Reader that will
     723             : // outlive the top-level iterator in the iterator tree.
     724             : type TrivialReaderProvider struct {
     725             :         *Reader
     726             : }
     727             : 
     728             : var _ ReaderProvider = TrivialReaderProvider{}
     729             : 
     730             : // GetReader implements ReaderProvider.
     731           1 : func (trp TrivialReaderProvider) GetReader() (*Reader, error) {
     732           1 :         return trp.Reader, nil
     733           1 : }
     734             : 
     735             : // Close implements ReaderProvider.
     736           1 : func (trp TrivialReaderProvider) Close() {}
     737             : 
     738             : // valueBlockReader is used to retrieve values in value
     739             : // blocks. It is used when the sstable was written with
     740             : // Properties.ValueBlocksAreEnabled.
     741             : type valueBlockReader struct {
     742             :         bpOpen blockProviderWhenOpen
     743             :         rp     ReaderProvider
     744             :         vbih   valueBlocksIndexHandle
     745             :         stats  *base.InternalIteratorStats
     746             : 
     747             :         // The value blocks index is lazily retrieved the first time the reader
     748             :         // needs to read a value that resides in a value block.
     749             :         vbiBlock []byte
     750             :         vbiCache block.BufferHandle
     751             :         // When sequentially iterating through all key-value pairs, the cost of
     752             :         // repeatedly getting a block that is already in the cache and releasing the
     753             :         // bufferHandle can be ~40% of the cpu overhead. So the reader remembers the
     754             :         // last value block it retrieved, in case there is locality of access, and
     755             :         // this value block can be used for the next value retrieval.
     756             :         valueBlockNum uint32
     757             :         valueBlock    []byte
     758             :         valueBlockPtr unsafe.Pointer
     759             :         valueCache    block.BufferHandle
     760             :         lazyFetcher   base.LazyFetcher
     761             :         closed        bool
     762             :         bufToMangle   []byte
     763             : }
     764             : 
     765           2 : func (r *valueBlockReader) getLazyValueForPrefixAndValueHandle(handle []byte) base.LazyValue {
     766           2 :         fetcher := &r.lazyFetcher
     767           2 :         valLen, h := decodeLenFromValueHandle(handle[1:])
     768           2 :         *fetcher = base.LazyFetcher{
     769           2 :                 Fetcher: r,
     770           2 :                 Attribute: base.AttributeAndLen{
     771           2 :                         ValueLen:       int32(valLen),
     772           2 :                         ShortAttribute: block.ValuePrefix(handle[0]).ShortAttribute(),
     773           2 :                 },
     774           2 :         }
     775           2 :         if r.stats != nil {
     776           2 :                 r.stats.SeparatedPointValue.Count++
     777           2 :                 r.stats.SeparatedPointValue.ValueBytes += uint64(valLen)
     778           2 :         }
     779           2 :         return base.LazyValue{
     780           2 :                 ValueOrHandle: h,
     781           2 :                 Fetcher:       fetcher,
     782           2 :         }
     783             : }
     784             : 
     785           2 : func (r *valueBlockReader) close() {
     786           2 :         r.bpOpen = nil
     787           2 :         r.vbiBlock = nil
     788           2 :         r.vbiCache.Release()
     789           2 :         // Set the handle to empty since Release does not nil the Handle.value. If
     790           2 :         // we were to reopen this valueBlockReader and retrieve the same
     791           2 :         // Handle.value from the cache, we don't want to accidentally unref it when
     792           2 :         // attempting to unref the old handle.
     793           2 :         r.vbiCache = block.BufferHandle{}
     794           2 :         r.valueBlock = nil
     795           2 :         r.valueBlockPtr = nil
     796           2 :         r.valueCache.Release()
     797           2 :         // See comment above.
     798           2 :         r.valueCache = block.BufferHandle{}
     799           2 :         r.closed = true
     800           2 :         // rp, vbih, stats remain valid, so that LazyFetcher.ValueFetcher can be
     801           2 :         // implemented.
     802           2 : }
     803             : 
     804             : // Fetch implements base.ValueFetcher.
     805             : func (r *valueBlockReader) Fetch(
     806             :         handle []byte, valLen int32, buf []byte,
     807           2 : ) (val []byte, callerOwned bool, err error) {
     808           2 :         if !r.closed {
     809           2 :                 val, err := r.getValueInternal(handle, valLen)
     810           2 :                 if invariants.Enabled {
     811           2 :                         val = r.doValueMangling(val)
     812           2 :                 }
     813           2 :                 return val, false, err
     814             :         }
     815             : 
     816           2 :         bp := blockProviderWhenClosed{rp: r.rp}
     817           2 :         err = bp.open()
     818           2 :         if err != nil {
     819           0 :                 return nil, false, err
     820           0 :         }
     821           2 :         defer bp.close()
     822           2 :         defer r.close()
     823           2 :         r.bpOpen = bp
     824           2 :         var v []byte
     825           2 :         v, err = r.getValueInternal(handle, valLen)
     826           2 :         if err != nil {
     827           0 :                 return nil, false, err
     828           0 :         }
     829           2 :         buf = append(buf[:0], v...)
     830           2 :         return buf, true, nil
     831             : }
     832             : 
     833             : // doValueMangling attempts to uncover violations of the contract listed in
     834             : // the declaration comment of LazyValue. It is expensive, hence only called
     835             : // when invariants.Enabled.
     836           2 : func (r *valueBlockReader) doValueMangling(v []byte) []byte {
     837           2 :         // Randomly set the bytes in the previous retrieved value to 0, since
     838           2 :         // property P1 only requires the valueBlockReader to maintain the memory of
     839           2 :         // one fetched value.
     840           2 :         if rand.Intn(2) == 0 {
     841           2 :                 clear(r.bufToMangle)
     842           2 :         }
     843             :         // Store the current value in a new buffer for future mangling.
     844           2 :         r.bufToMangle = append([]byte(nil), v...)
     845           2 :         return r.bufToMangle
     846             : }
     847             : 
     848           2 : func (r *valueBlockReader) getValueInternal(handle []byte, valLen int32) (val []byte, err error) {
     849           2 :         vh := decodeRemainingValueHandle(handle)
     850           2 :         vh.valueLen = uint32(valLen)
     851           2 :         if r.vbiBlock == nil {
     852           2 :                 ch, err := r.bpOpen.readBlockForVBR(r.vbih.h, r.stats)
     853           2 :                 if err != nil {
     854           0 :                         return nil, err
     855           0 :                 }
     856           2 :                 r.vbiCache = ch
     857           2 :                 r.vbiBlock = ch.Get()
     858             :         }
     859           2 :         if r.valueBlock == nil || r.valueBlockNum != vh.blockNum {
     860           2 :                 vbh, err := r.getBlockHandle(vh.blockNum)
     861           2 :                 if err != nil {
     862           0 :                         return nil, err
     863           0 :                 }
     864           2 :                 vbCacheHandle, err := r.bpOpen.readBlockForVBR(vbh, r.stats)
     865           2 :                 if err != nil {
     866           0 :                         return nil, err
     867           0 :                 }
     868           2 :                 r.valueBlockNum = vh.blockNum
     869           2 :                 r.valueCache.Release()
     870           2 :                 r.valueCache = vbCacheHandle
     871           2 :                 r.valueBlock = vbCacheHandle.Get()
     872           2 :                 r.valueBlockPtr = unsafe.Pointer(&r.valueBlock[0])
     873             :         }
     874           2 :         if r.stats != nil {
     875           2 :                 r.stats.SeparatedPointValue.ValueBytesFetched += uint64(valLen)
     876           2 :         }
     877           2 :         return r.valueBlock[vh.offsetInBlock : vh.offsetInBlock+vh.valueLen], nil
     878             : }
     879             : 
     880           2 : func (r *valueBlockReader) getBlockHandle(blockNum uint32) (block.Handle, error) {
     881           2 :         indexEntryLen :=
     882           2 :                 int(r.vbih.blockNumByteLength + r.vbih.blockOffsetByteLength + r.vbih.blockLengthByteLength)
     883           2 :         offsetInIndex := indexEntryLen * int(blockNum)
     884           2 :         if len(r.vbiBlock) < offsetInIndex+indexEntryLen {
     885           0 :                 return block.Handle{}, base.AssertionFailedf(
     886           0 :                         "index entry out of bounds: offset %d length %d block length %d",
     887           0 :                         offsetInIndex, indexEntryLen, len(r.vbiBlock))
     888           0 :         }
     889           2 :         b := r.vbiBlock[offsetInIndex : offsetInIndex+indexEntryLen]
     890           2 :         n := int(r.vbih.blockNumByteLength)
     891           2 :         bn := littleEndianGet(b, n)
     892           2 :         if uint32(bn) != blockNum {
     893           0 :                 return block.Handle{},
     894           0 :                         errors.Errorf("expected block num %d but found %d", blockNum, bn)
     895           0 :         }
     896           2 :         b = b[n:]
     897           2 :         n = int(r.vbih.blockOffsetByteLength)
     898           2 :         blockOffset := littleEndianGet(b, n)
     899           2 :         b = b[n:]
     900           2 :         n = int(r.vbih.blockLengthByteLength)
     901           2 :         blockLen := littleEndianGet(b, n)
     902           2 :         return block.Handle{Offset: blockOffset, Length: blockLen}, nil
     903             : }

Generated by: LCOV version 1.14