LCOV - code coverage report
Current view: top level - pebble/sstable/colblk - block.go (source / functions) Hit Total Coverage
Test: 2024-10-11 08:16Z fb2c2ff4 - tests + meta.lcov Lines: 148 167 88.6 %
Date: 2024-10-11 08:17:43 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : // Package colblk implements a columnar block format.
       6             : //
       7             : // The columnar block format organizes data into columns. The number of columns
       8             : // and their data types are dependent on the use and are configurable.
       9             : //
      10             : // # Format
      11             : //
      12             : // Every columnar block begins with a header describing the structure and schema
      13             : // of the block. Then column values are encoded in sequence. The block ends
      14             : // with a single padding byte.
      15             : //
      16             : // The block header begins with:
      17             : // - Version number (1 byte)
      18             : // - The number of columns in the block (2 bytes)
      19             : // - The number of rows in the block (4 bytes)
      20             : //
      21             : // Then follows a column-header for each column. Each column header encodes the
      22             : // data type (1 byte) and the offset into the block where the column data begins
      23             : // (4 bytes).
      24             : //
      25             : //      +-----------+
      26             : //      | Vers (1B) |
      27             : //      +-------------------+--------------------------------+
      28             : //      | # columns (2B)    | # of rows (4B)                 |
      29             : //      +-----------+-------+---------------------+----------+
      30             : //      | Type (1B) | Page offset (4B)                | Col 0
      31             : //      +-----------+---------------------------------+
      32             : //      | Type (1B) | Page offset (4B)                | Col 1
      33             : //      +-----------+---------------------------------+
      34             : //      | ...       | ...                             |
      35             : //      +-----------+---------------------------------+
      36             : //      | Type (1B) | Page offset (4B)                | Col n-1
      37             : //      +-----------+----------------------------------
      38             : //      |  column 0 data                                ...
      39             : //      +----------------------------------------------
      40             : //      |  column 1 data                                ...
      41             : //      +----------------------------------------------
      42             : //      | ...
      43             : //      +----------------------------------------------
      44             : //      |  column n-1 data                              ...
      45             : //      +-------------+--------------------------------
      46             : //      | Unused (1B) |
      47             : //      +-------------+
      48             : //
      49             : // The encoding of column data itself is dependent on the data type.
      50             : //
      51             : // The trailing padding byte exists to allow columns to represent the end of
      52             : // column data using a pointer to the byte after the end of the column. The
      53             : // padding byte ensures that the pointer will always fall within the allocated
      54             : // memory. Without the padding byte, such a pointer would be invalid according
      55             : // to Go's pointer rules.
      56             : //
      57             : // # Alignment
      58             : //
      59             : // Block buffers are required to be word-aligned, during encoding and decoding.
      60             : // This ensures that if any individual column or piece of data requires
      61             : // word-alignment, the writer can align the offset into the block buffer
      62             : // appropriately to ensure that the data is word-aligned.
      63             : //
      64             : // # Keyspan blocks
      65             : //
      66             : // Blocks encoding key spans (range deletions, range keys) decompose the fields
      67             : // of keyspan.Key into columns. Key spans are always stored fragmented, such
      68             : // that all overlapping keys have identical bounds. When encoding key spans to a
      69             : // columnar block, we take advantage of this fragmentation to store the set of
      70             : // unique user key span boundaries in a separate column. This column does not
      71             : // have the same number of rows as the other columns. Each individual fragment
      72             : // stores the index of its start boundary user key within the user key column.
      73             : //
      74             : // For example, consider the three unfragmented spans:
      75             : //
      76             : //      a-e:{(#0,RANGEDEL)}
      77             : //      b-d:{(#100,RANGEDEL)}
      78             : //      f-g:{(#20,RANGEDEL)}
      79             : //
      80             : // Once fragmented, these spans become five keyspan.Keys:
      81             : //
      82             : //      a-b:{(#0,RANGEDEL)}
      83             : //      b-d:{(#100,RANGEDEL), (#0,RANGEDEL)}
      84             : //      d-e:{(#0,RANGEDEL)}
      85             : //      f-g:{(#20,RANGEDEL)}
      86             : //
      87             : // When encoded within a columnar keyspan block, the boundary columns (user key
      88             : // and start indices) would hold six rows:
      89             : //
      90             : //      +-----------------+-----------------+
      91             : //      | User key        | Start index     |
      92             : //      +-----------------+-----------------+
      93             : //      | a               | 0               |
      94             : //      +-----------------+-----------------+
      95             : //      | b               | 1               |
      96             : //      +-----------------+-----------------+
      97             : //      | d               | 3               |
      98             : //      +-----------------+-----------------+
      99             : //      | e               | 4               |
     100             : //      +-----------------+-----------------+
     101             : //      | f               | 4               |
     102             : //      +-----------------+-----------------+
     103             : //      | g               | 5               |
     104             : //      +-----------------+-----------------+
     105             : //
     106             : // The remaining keyspan.Key columns would look like:
     107             : //
     108             : //      +-----------------+-----------------+-----------------+
     109             : //      | Trailer         | Suffix          | Value           |
     110             : //      +-----------------+-----------------+-----------------+
     111             : //      | (#0,RANGEDEL)   | -               | -               | (0)
     112             : //      +-----------------+-----------------+-----------------+
     113             : //      | (#100,RANGEDEL) | -               | -               | (1)
     114             : //      +-----------------+-----------------+-----------------+
     115             : //      | (#0,RANGEDEL)   | -               | -               | (2)
     116             : //      +-----------------+-----------------+-----------------+
     117             : //      | (#0,RANGEDEL)   | -               | -               | (3)
     118             : //      +-----------------+-----------------+-----------------+
     119             : //      | (#20,RANGEDEL)  | -               | -               | (4)
     120             : //      +-----------------+-----------------+-----------------+
     121             : //
     122             : // This encoding does not explicitly encode the mapping of keyspan.Key to
     123             : // boundary keys. Rather, each boundary key encodes the index at which keys
     124             : // with a start boundary >= that boundary key begin. Readers look up the start
     125             : // index for the start boundary (s) and the end boundary (e). Any keys within
     126             : // indexes [s,e) have the corresponding bounds.
     127             : //
     128             : // Both range deletions and range keys are encoded with the same schema. Range
     129             : // deletion keyspan.Keys never contain suffixes or values. When one of these
     130             : // columns is encoded, the RawBytes encoding uses uintEncodingAllZero to avoid
     131             : // encoding N offsets. Each of these empty columns is encoded in just 1 byte of
     132             : // column data.
     133             : package colblk
     134             : 
     135             : import (
     136             :         "cmp"
     137             :         "encoding/binary"
     138             :         "fmt"
     139             :         "unsafe"
     140             : 
     141             :         "github.com/cockroachdb/errors"
     142             :         "github.com/cockroachdb/pebble/internal/aligned"
     143             :         "github.com/cockroachdb/pebble/internal/binfmt"
     144             : )
     145             : 
     146             : // Version indicates the version of the columnar block format encoded. The
     147             : // version byte is always the first byte within the block. This ensures that
     148             : // readers can switch on the version before reading the rest of the block.
     149             : type Version uint8
     150             : 
     151             : const (
     152             :         // Version1 is the first version of the columnar block format.
     153             :         Version1 Version = 0x01
     154             : )
     155             : 
     156             : const blockHeaderBaseSize = 7
     157             : const columnHeaderSize = 5
     158             : const maxBlockRetainedSize = 256 << 10
     159             : 
     160             : // Header holds the metadata extracted from a columnar block's header.
     161             : type Header struct {
     162             :         Version Version
     163             :         // Columns holds the number of columns encoded within the block.
     164             :         Columns uint16
     165             :         // Rows holds the number of rows encoded within the block.
     166             :         Rows uint32
     167             : }
     168             : 
     169             : // String implements the fmt.Stringer interface, returning a human-readable
     170             : // representation of the block header.
     171           0 : func (h Header) String() string {
     172           0 :         return fmt.Sprintf("Version=%v; Columns=%d; Rows=%d", h.Version, h.Columns, h.Rows)
     173           0 : }
     174             : 
     175             : // Encode encodes the header to the provided buf. The length of buf must be at
     176             : // least 7 bytes.
     177           1 : func (h Header) Encode(buf []byte) {
     178           1 :         buf[0] = byte(h.Version)
     179           1 :         binary.LittleEndian.PutUint16(buf[1:], h.Columns)
     180           1 :         binary.LittleEndian.PutUint32(buf[1+align16:], h.Rows)
     181           1 : }
     182             : 
     183             : // blockHeaderSize returns the size of the block header, including column
     184             : // headers, for a block with the specified number of columns and optionally a
     185             : // custom header size.
     186           1 : func blockHeaderSize(cols int, customHeaderSize int) uint32 {
     187           1 :         // Each column has a 1-byte DataType and a 4-byte offset into the block.
     188           1 :         return uint32(blockHeaderBaseSize + cols*columnHeaderSize + customHeaderSize)
     189           1 : }
     190             : 
     191             : // DecodeHeader reads the block header from the provided serialized columnar
     192             : // block.
     193           1 : func DecodeHeader(data []byte) Header {
     194           1 :         return Header{
     195           1 :                 Version: Version(data[0]),
     196           1 :                 Columns: uint16(binary.LittleEndian.Uint16(data[1:])),
     197           1 :                 Rows:    uint32(binary.LittleEndian.Uint32(data[1+align16:])),
     198           1 :         }
     199           1 : }
     200             : 
     201             : // A blockEncoder encodes a columnar block and handles encoding the block's
     202             : // header, including individual column headers.
     203             : type blockEncoder struct {
     204             :         buf          []byte
     205             :         headerOffset uint32
     206             :         pageOffset   uint32
     207             : }
     208             : 
     209           1 : func (e *blockEncoder) reset() {
     210           1 :         if cap(e.buf) > maxBlockRetainedSize {
     211           0 :                 e.buf = nil
     212           0 :         }
     213           1 :         e.headerOffset = 0
     214           1 :         e.pageOffset = 0
     215             : }
     216             : 
     217             : // init initializes the block encoder with a buffer of the specified size and
     218             : // header.
     219           1 : func (e *blockEncoder) init(size int, h Header, customHeaderSize int) {
     220           1 :         if cap(e.buf) < size {
     221           1 :                 e.buf = aligned.ByteSlice(size)
     222           1 :         } else {
     223           1 :                 e.buf = e.buf[:size]
     224           1 :         }
     225           1 :         e.headerOffset = uint32(customHeaderSize) + blockHeaderBaseSize
     226           1 :         e.pageOffset = blockHeaderSize(int(h.Columns), customHeaderSize)
     227           1 :         h.Encode(e.buf[customHeaderSize:])
     228             : }
     229             : 
     230             : // data returns the underlying buffer.
     231           1 : func (e *blockEncoder) data() []byte {
     232           1 :         return e.buf
     233           1 : }
     234             : 
     235             : // encode writes w's columns to the block.
     236           1 : func (e *blockEncoder) encode(rows int, w ColumnWriter) {
     237           1 :         for i := 0; i < w.NumColumns(); i++ {
     238           1 :                 e.buf[e.headerOffset] = byte(w.DataType(i))
     239           1 :                 binary.LittleEndian.PutUint32(e.buf[e.headerOffset+1:], e.pageOffset)
     240           1 :                 e.headerOffset += columnHeaderSize
     241           1 :                 e.pageOffset = w.Finish(i, rows, e.pageOffset, e.buf)
     242           1 :         }
     243             : }
     244             : 
     245             : // finish finalizes the block encoding, returning the encoded block. The
     246             : // returned byte slice points to the encoder's buffer, so if the encoder is
     247             : // reused the returned slice will be invalidated.
     248           1 : func (e *blockEncoder) finish() []byte {
     249           1 :         e.buf[e.pageOffset] = 0x00 // Padding byte
     250           1 :         e.pageOffset++
     251           1 :         if e.pageOffset != uint32(len(e.buf)) {
     252           0 :                 panic(errors.AssertionFailedf("expected pageOffset=%d to equal size=%d", e.pageOffset, len(e.buf)))
     253             :         }
     254           1 :         return e.buf
     255             : }
     256             : 
     257             : // FinishBlock writes the columnar block to a heap-allocated byte slice.
     258             : // FinishBlock assumes all columns have the same number of rows. If that's not
     259             : // the case, the caller should manually construct their own block.
     260           1 : func FinishBlock(rows int, writers []ColumnWriter) []byte {
     261           1 :         size := blockHeaderSize(len(writers), 0)
     262           1 :         nCols := uint16(0)
     263           1 :         for _, cw := range writers {
     264           1 :                 size = cw.Size(rows, size)
     265           1 :                 nCols += uint16(cw.NumColumns())
     266           1 :         }
     267           1 :         size++ // +1 for the trailing padding byte.
     268           1 : 
     269           1 :         var enc blockEncoder
     270           1 :         enc.init(int(size), Header{
     271           1 :                 Version: Version1,
     272           1 :                 Columns: nCols,
     273           1 :                 Rows:    uint32(rows),
     274           1 :         }, 0)
     275           1 :         for _, cw := range writers {
     276           1 :                 enc.encode(rows, cw)
     277           1 :         }
     278           1 :         return enc.finish()
     279             : }
     280             : 
     281             : // DecodeColumn decodes the col'th column of the provided reader's block as a
     282             : // column of dataType using decodeFunc.
     283             : func DecodeColumn[V any](
     284             :         r *BlockDecoder, col int, rows int, dataType DataType, decodeFunc DecodeFunc[V],
     285           1 : ) V {
     286           1 :         if uint16(col) >= r.header.Columns {
     287           0 :                 panic(errors.AssertionFailedf("column %d is out of range [0, %d)", col, r.header.Columns))
     288             :         }
     289           1 :         if dt := r.dataType(col); dt != dataType {
     290           0 :                 panic(errors.AssertionFailedf("column %d is type %s; not %s", col, dt, dataType))
     291             :         }
     292           1 :         v, endOffset := decodeFunc(r.data, r.pageStart(col), rows)
     293           1 :         if nextColumnOff := r.pageStart(col + 1); endOffset != nextColumnOff {
     294           0 :                 panic(errors.AssertionFailedf("column %d decoded to offset %d; expected %d", col, endOffset, nextColumnOff))
     295             :         }
     296           1 :         return v
     297             : }
     298             : 
     299             : // A BlockDecoder holds metadata for accessing the columns of a columnar block.
     300             : type BlockDecoder struct {
     301             :         data             []byte
     302             :         header           Header
     303             :         customHeaderSize uint32
     304             : }
     305             : 
     306             : // DecodeBlock decodes the header of the provided columnar block and returns a
     307             : // new BlockDecoder configured to read from the block. The caller must ensure
     308             : // that the data is formatted according to the block layout specification.
     309           1 : func DecodeBlock(data []byte, customHeaderSize uint32) BlockDecoder {
     310           1 :         r := BlockDecoder{}
     311           1 :         r.Init(data, customHeaderSize)
     312           1 :         return r
     313           1 : }
     314             : 
     315             : // Init initializes a BlockDecoder with the data contained in the provided block.
     316           1 : func (d *BlockDecoder) Init(data []byte, customHeaderSize uint32) {
     317           1 :         *d = BlockDecoder{
     318           1 :                 data:             data,
     319           1 :                 header:           DecodeHeader(data[customHeaderSize:]),
     320           1 :                 customHeaderSize: customHeaderSize,
     321           1 :         }
     322           1 : }
     323             : 
     324             : // Rows returns the number of rows in the block, as indicated by the block header.
     325           0 : func (d *BlockDecoder) Rows() int {
     326           0 :         return int(d.header.Rows)
     327           0 : }
     328             : 
     329             : // DataType returns the data type of the col'th column. Every column's data type
     330             : // is encoded within the block header.
     331           1 : func (d *BlockDecoder) DataType(col int) DataType {
     332           1 :         if uint16(col) >= d.header.Columns {
     333           0 :                 panic(errors.AssertionFailedf("column %d is out of range [0, %d)", col, d.header.Columns))
     334             :         }
     335           1 :         return d.dataType(col)
     336             : }
     337             : 
     338           1 : func (d *BlockDecoder) dataType(col int) DataType {
     339           1 :         return DataType(*(*uint8)(d.pointer(d.customHeaderSize + blockHeaderBaseSize + columnHeaderSize*uint32(col))))
     340           1 : }
     341             : 
     342             : // Bitmap retrieves the col'th column as a bitmap. The column must be of type
     343             : // DataTypeBool.
     344           1 : func (d *BlockDecoder) Bitmap(col int) Bitmap {
     345           1 :         return DecodeColumn(d, col, int(d.header.Rows), DataTypeBool, DecodeBitmap)
     346           1 : }
     347             : 
     348             : // RawBytes retrieves the col'th column as a column of byte slices. The column
     349             : // must be of type DataTypeBytes.
     350           1 : func (d *BlockDecoder) RawBytes(col int) RawBytes {
     351           1 :         return DecodeColumn(d, col, int(d.header.Rows), DataTypeBytes, DecodeRawBytes)
     352           1 : }
     353             : 
     354             : // PrefixBytes retrieves the col'th column as a prefix-compressed byte slice column. The column
     355             : // must be of type DataTypePrefixBytes.
     356           1 : func (d *BlockDecoder) PrefixBytes(col int) PrefixBytes {
     357           1 :         return DecodeColumn(d, col, int(d.header.Rows), DataTypePrefixBytes, DecodePrefixBytes)
     358           1 : }
     359             : 
     360             : // Uints retrieves the col'th column as a column of uints. The column must be
     361             : // of type DataTypeUint.
     362           1 : func (d *BlockDecoder) Uints(col int) UnsafeUints {
     363           1 :         return DecodeColumn(d, col, int(d.header.Rows), DataTypeUint, DecodeUnsafeUints)
     364           1 : }
     365             : 
     366           1 : func (d *BlockDecoder) pageStart(col int) uint32 {
     367           1 :         if uint16(col) >= d.header.Columns {
     368           1 :                 // -1 for the trailing padding byte
     369           1 :                 return uint32(len(d.data) - 1)
     370           1 :         }
     371           1 :         return binary.LittleEndian.Uint32(
     372           1 :                 unsafe.Slice((*byte)(d.pointer(d.customHeaderSize+uint32(blockHeaderBaseSize+columnHeaderSize*col+1))), 4))
     373             : }
     374             : 
     375           1 : func (d *BlockDecoder) pointer(offset uint32) unsafe.Pointer {
     376           1 :         return unsafe.Pointer(uintptr(unsafe.Pointer(&d.data[0])) + uintptr(offset))
     377           1 : }
     378             : 
     379             : // FormattedString returns a formatted representation of the block's binary
     380             : // data.
     381           1 : func (d *BlockDecoder) FormattedString() string {
     382           1 :         f := binfmt.New(d.data)
     383           1 :         d.headerToBinFormatter(f)
     384           1 :         for i := 0; i < int(d.header.Columns); i++ {
     385           1 :                 d.columnToBinFormatter(f, i, int(d.header.Rows))
     386           1 :         }
     387           1 :         f.HexBytesln(1, "block trailer padding")
     388           1 :         return f.String()
     389             : }
     390             : 
     391           1 : func (d *BlockDecoder) headerToBinFormatter(f *binfmt.Formatter) {
     392           1 :         f.CommentLine("columnar block header")
     393           1 :         f.HexBytesln(1, "version %v", Version(f.PeekUint(1)))
     394           1 :         f.HexBytesln(2, "%d columns", d.header.Columns)
     395           1 :         f.HexBytesln(4, "%d rows", d.header.Rows)
     396           1 :         for i := 0; i < int(d.header.Columns); i++ {
     397           1 :                 f.Byte("col %d: %s", i, d.DataType(i))
     398           1 :                 f.HexBytesln(4, "col %d: page start %d", i, d.pageStart(i))
     399           1 :         }
     400             : }
     401             : 
     402             : func (d *BlockDecoder) formatColumn(
     403             :         f *binfmt.Formatter, col int, fn func(*binfmt.Formatter, DataType),
     404           1 : ) {
     405           1 :         f.CommentLine("data for column %d", col)
     406           1 :         dataType := d.DataType(col)
     407           1 :         colSize := d.pageStart(col+1) - d.pageStart(col)
     408           1 :         endOff := f.Offset() + int(colSize)
     409           1 :         fn(f, dataType)
     410           1 : 
     411           1 :         // We expect formatting the column data to have consumed all the bytes
     412           1 :         // between the column's pageOffset and the next column's pageOffset.
     413           1 :         switch v := endOff - f.Offset(); cmp.Compare[int](v, 0) {
     414           0 :         case +1:
     415           0 :                 panic(fmt.Sprintf("expected f.Offset() = %d, but found %d; did column %s format too few bytes?", endOff, f.Offset(), dataType))
     416           1 :         case 0:
     417           0 :         case -1:
     418           0 :                 panic(fmt.Sprintf("expected f.Offset() = %d, but found %d; did column %s format too many bytes?", endOff, f.Offset(), dataType))
     419             :         }
     420             : }
     421             : 
     422           1 : func (d *BlockDecoder) columnToBinFormatter(f *binfmt.Formatter, col, rows int) {
     423           1 :         d.formatColumn(f, col, func(f *binfmt.Formatter, dataType DataType) {
     424           1 :                 switch dataType {
     425           1 :                 case DataTypeBool:
     426           1 :                         bitmapToBinFormatter(f, rows)
     427           1 :                 case DataTypeUint:
     428           1 :                         uintsToBinFormatter(f, rows, nil)
     429           1 :                 case DataTypePrefixBytes:
     430           1 :                         prefixBytesToBinFormatter(f, rows, nil)
     431           1 :                 case DataTypeBytes:
     432           1 :                         rawBytesToBinFormatter(f, rows, nil)
     433           0 :                 default:
     434           0 :                         panic("unimplemented")
     435             :                 }
     436             :         })
     437             : 
     438             : }

Generated by: LCOV version 1.14