LCOV - code coverage report
Current view: top level - pebble/sstable/valblk - writer.go (source / functions) Hit Total Coverage
Test: 2025-01-07 08:17Z 28edac9f - tests only.lcov Lines: 190 205 92.7 %
Date: 2025-01-07 08:17:39 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package valblk
       6             : 
       7             : import (
       8             :         "encoding/binary"
       9             :         "math/rand/v2"
      10             :         "sync"
      11             : 
      12             :         "github.com/cockroachdb/errors"
      13             :         "github.com/cockroachdb/pebble/internal/invariants"
      14             :         "github.com/cockroachdb/pebble/sstable/block"
      15             : )
      16             : 
// Writer writes a sequence of value blocks, and the value blocks index, for a
// sstable.
//
// Values are appended with AddValue, which buffers them in the current
// uncompressed block and finishes that block when the FlushGovernor asks for
// it. Finish writes the buffered blocks and the index block to a LayoutWriter,
// and Release returns the Writer and its buffers to their pools.
type Writer struct {
	// flush is consulted by AddValue to decide whether the current block
	// should be finished before the next value is appended.
	flush block.FlushGovernor
	// Configured compression.
	compression block.Compression
	// checksummer with configured checksum type.
	checksummer block.Checksummer
	// Block finished callback. It is invoked once per finished value block
	// with that block's length excluding the block trailer.
	blockFinishedFunc func(compressedSize int)

	// buf is the current block being written to (uncompressed).
	buf *blockBuffer
	// compressedBuf is used for compressing the block.
	compressedBuf *blockBuffer
	// Sequence of blocks that are finished.
	blocks []bufferedValueBlock
	// totalBlockBytes is the cumulative size of the finished value blocks,
	// including their trailers. numValues counts the values accepted by
	// AddValue.
	totalBlockBytes uint64
	numValues       uint64
}
      38             : 
// bufferedValueBlock is a finished value block that is kept in memory until
// Finish writes it out.
//
// block is the physical block produced by block.CompressAndChecksum. buffer is
// the pooled buffer that was handed off to this block when it was finished
// (the compressed buffer if the block ended up compressed, otherwise the
// uncompressed one); Release returns it to the matching pool. handle records
// the block's offset and its length excluding the trailer; the offset is
// relative to the first value block until Finish adds the file offset.
type bufferedValueBlock struct {
	block  block.PhysicalBlock
	buffer *blockBuffer
	handle block.Handle
}
      44             : 
// blockBuffer wraps the byte slice backing a value block. Pointers to
// blockBuffer are what the value block buffer pools store and hand out.
type blockBuffer struct {
	b []byte
}
      48             : 
      49             : // Pool of block buffers that should be roughly the blockSize.
      50             : var uncompressedValueBlockBufPool = sync.Pool{
      51           1 :         New: func() interface{} {
      52           1 :                 return &blockBuffer{}
      53           1 :         },
      54             : }
      55             : 
      56             : // Pool of block buffers for compressed value blocks. These may widely vary in
      57             : // size based on compression ratios.
      58             : var compressedValueBlockBufPool = sync.Pool{
      59           1 :         New: func() interface{} {
      60           1 :                 return &blockBuffer{}
      61           1 :         },
      62             : }
      63             : 
      64           1 : func releaseToValueBlockBufPool(pool *sync.Pool, b *blockBuffer) {
      65           1 :         // Don't pool buffers larger than 128KB, in case we had some rare large
      66           1 :         // values.
      67           1 :         if len(b.b) > 128*1024 {
      68           0 :                 return
      69           0 :         }
      70           1 :         if invariants.Enabled {
      71           1 :                 // Set the bytes to a random value. Cap the number of bytes being
      72           1 :                 // randomized to prevent test timeouts.
      73           1 :                 length := cap(b.b)
      74           1 :                 if length > 1000 {
      75           1 :                         length = 1000
      76           1 :                 }
      77           1 :                 b.b = b.b[:length:length]
      78           1 :                 for j := range b.b {
      79           1 :                         b.b[j] = byte(rand.Uint32())
      80           1 :                 }
      81             :         }
      82           1 :         pool.Put(b)
      83             : }
      84             : 
      85             : var valueBlockWriterPool = sync.Pool{
      86           1 :         New: func() interface{} {
      87           1 :                 return &Writer{}
      88           1 :         },
      89             : }
      90             : 
      91             : // NewWriter creates a new Writer of value blocks and value index blocks.
      92             : func NewWriter(
      93             :         flushGovernor block.FlushGovernor,
      94             :         compression block.Compression,
      95             :         checksumType block.ChecksumType,
      96             :         // compressedSize should exclude the block trailer.
      97             :         blockFinishedFunc func(compressedSize int),
      98           1 : ) *Writer {
      99           1 :         w := valueBlockWriterPool.Get().(*Writer)
     100           1 :         *w = Writer{
     101           1 :                 flush:       flushGovernor,
     102           1 :                 compression: compression,
     103           1 :                 checksummer: block.Checksummer{
     104           1 :                         Type: checksumType,
     105           1 :                 },
     106           1 :                 blockFinishedFunc: blockFinishedFunc,
     107           1 :                 buf:               uncompressedValueBlockBufPool.Get().(*blockBuffer),
     108           1 :                 compressedBuf:     compressedValueBlockBufPool.Get().(*blockBuffer),
     109           1 :                 blocks:            w.blocks[:0],
     110           1 :         }
     111           1 :         w.buf.b = w.buf.b[:0]
     112           1 :         w.compressedBuf.b = w.compressedBuf.b[:0]
     113           1 :         return w
     114           1 : }
     115             : 
     116             : // AddValue adds a value to the writer, returning a Handle referring to the
     117             : // stored value.
     118           1 : func (w *Writer) AddValue(v []byte) (Handle, error) {
     119           1 :         if invariants.Enabled && len(v) == 0 {
     120           0 :                 return Handle{}, errors.Errorf("cannot write empty value to value block")
     121           0 :         }
     122           1 :         w.numValues++
     123           1 :         blockLen := len(w.buf.b)
     124           1 :         valueLen := len(v)
     125           1 :         if w.flush.ShouldFlush(blockLen, blockLen+valueLen) {
     126           1 :                 // Block is not currently empty and adding this value will become too big,
     127           1 :                 // so finish this block.
     128           1 :                 w.compressAndFlush()
     129           1 :                 blockLen = len(w.buf.b)
     130           1 :                 if invariants.Enabled && blockLen != 0 {
     131           0 :                         panic("blockLen of new block should be 0")
     132             :                 }
     133             :         }
     134           1 :         vh := Handle{
     135           1 :                 ValueLen:      uint32(valueLen),
     136           1 :                 BlockNum:      uint32(len(w.blocks)),
     137           1 :                 OffsetInBlock: uint32(blockLen),
     138           1 :         }
     139           1 :         blockLen = int(vh.OffsetInBlock + vh.ValueLen)
     140           1 :         if cap(w.buf.b) < blockLen {
     141           1 :                 size := 2 * cap(w.buf.b)
     142           1 :                 if size < 1024 {
     143           1 :                         size = 1024
     144           1 :                 }
     145           1 :                 for size < blockLen {
     146           0 :                         size *= 2
     147           0 :                 }
     148           1 :                 buf := make([]byte, blockLen, size)
     149           1 :                 _ = copy(buf, w.buf.b)
     150           1 :                 w.buf.b = buf
     151           1 :         } else {
     152           1 :                 w.buf.b = w.buf.b[:blockLen]
     153           1 :         }
     154           1 :         buf := w.buf.b[vh.OffsetInBlock:]
     155           1 :         n := copy(buf, v)
     156           1 :         if n != len(buf) {
     157           0 :                 panic("incorrect length computation")
     158             :         }
     159           1 :         return vh, nil
     160             : }
     161             : 
     162             : // Size returns the total size of currently buffered value blocks.
     163           1 : func (w *Writer) Size() uint64 {
     164           1 :         sz := w.totalBlockBytes
     165           1 :         if w.buf != nil {
     166           1 :                 sz += uint64(len(w.buf.b))
     167           1 :         }
     168           1 :         return sz
     169             : }
     170             : 
     171           1 : func (w *Writer) compressAndFlush() {
     172           1 :         w.compressedBuf.b = w.compressedBuf.b[:cap(w.compressedBuf.b)]
     173           1 :         physicalBlock := block.CompressAndChecksum(&w.compressedBuf.b, w.buf.b, w.compression, &w.checksummer)
     174           1 :         bh := block.Handle{Offset: w.totalBlockBytes, Length: uint64(physicalBlock.LengthWithoutTrailer())}
     175           1 :         w.totalBlockBytes += uint64(physicalBlock.LengthWithTrailer())
     176           1 :         // blockFinishedFunc length excludes the block trailer.
     177           1 :         w.blockFinishedFunc(physicalBlock.LengthWithoutTrailer())
     178           1 :         b := bufferedValueBlock{
     179           1 :                 block:  physicalBlock,
     180           1 :                 handle: bh,
     181           1 :         }
     182           1 : 
     183           1 :         // We'll hand off a buffer to w.blocks and get a new one. Which buffer we're
     184           1 :         // handing off depends on the outcome of compression.
     185           1 :         if physicalBlock.IsCompressed() {
     186           1 :                 b.buffer = w.compressedBuf
     187           1 :                 w.compressedBuf = compressedValueBlockBufPool.Get().(*blockBuffer)
     188           1 :         } else {
     189           1 :                 b.buffer = w.buf
     190           1 :                 w.buf = uncompressedValueBlockBufPool.Get().(*blockBuffer)
     191           1 :         }
     192           1 :         w.blocks = append(w.blocks, b)
     193           1 :         w.buf.b = w.buf.b[:0]
     194             : }
     195             : 
     196           1 : func (w *Writer) computeChecksum(blk []byte) {
     197           1 :         n := len(blk) - block.TrailerLen
     198           1 :         checksum := w.checksummer.Checksum(blk[:n], blk[n])
     199           1 :         binary.LittleEndian.PutUint32(blk[n+1:], checksum)
     200           1 : }
     201             : 
     202             : // Finish finishes writing the value blocks and value blocks index, writing the
     203             : // buffered blocks out to the provider layout writer.
     204           1 : func (w *Writer) Finish(layout LayoutWriter, fileOffset uint64) (IndexHandle, WriterStats, error) {
     205           1 :         if len(w.buf.b) > 0 {
     206           1 :                 w.compressAndFlush()
     207           1 :         }
     208           1 :         n := len(w.blocks)
     209           1 :         if n == 0 {
     210           1 :                 return IndexHandle{}, WriterStats{}, nil
     211           1 :         }
     212           1 :         largestOffset := uint64(0)
     213           1 :         largestLength := uint64(0)
     214           1 :         for i := range w.blocks {
     215           1 :                 _, err := layout.WriteValueBlock(w.blocks[i].block)
     216           1 :                 if err != nil {
     217           0 :                         return IndexHandle{}, WriterStats{}, err
     218           0 :                 }
     219           1 :                 w.blocks[i].handle.Offset += fileOffset
     220           1 :                 largestOffset = w.blocks[i].handle.Offset
     221           1 :                 if largestLength < w.blocks[i].handle.Length {
     222           1 :                         largestLength = w.blocks[i].handle.Length
     223           1 :                 }
     224             :         }
     225           1 :         vbihOffset := fileOffset + w.totalBlockBytes
     226           1 : 
     227           1 :         vbih := IndexHandle{
     228           1 :                 Handle: block.Handle{
     229           1 :                         Offset: vbihOffset,
     230           1 :                 },
     231           1 :                 BlockNumByteLength:    uint8(lenLittleEndian(uint64(n - 1))),
     232           1 :                 BlockOffsetByteLength: uint8(lenLittleEndian(largestOffset)),
     233           1 :                 BlockLengthByteLength: uint8(lenLittleEndian(largestLength)),
     234           1 :         }
     235           1 :         var err error
     236           1 :         if n > 0 {
     237           1 :                 if vbih, err = w.writeValueBlocksIndex(layout, vbih); err != nil {
     238           0 :                         return IndexHandle{}, WriterStats{}, err
     239           0 :                 }
     240             :         }
     241           1 :         stats := WriterStats{
     242           1 :                 NumValueBlocks:          uint64(n),
     243           1 :                 NumValuesInValueBlocks:  w.numValues,
     244           1 :                 ValueBlocksAndIndexSize: w.totalBlockBytes + vbih.Handle.Length + block.TrailerLen,
     245           1 :         }
     246           1 :         return vbih, stats, err
     247             : }
     248             : 
     249           1 : func (w *Writer) writeValueBlocksIndex(layout LayoutWriter, h IndexHandle) (IndexHandle, error) {
     250           1 :         blockLen :=
     251           1 :                 int(h.BlockNumByteLength+h.BlockOffsetByteLength+h.BlockLengthByteLength) * len(w.blocks)
     252           1 :         h.Handle.Length = uint64(blockLen)
     253           1 :         blockLen += block.TrailerLen
     254           1 :         var buf []byte
     255           1 :         if cap(w.buf.b) < blockLen {
     256           1 :                 buf = make([]byte, blockLen)
     257           1 :                 w.buf.b = buf
     258           1 :         } else {
     259           1 :                 buf = w.buf.b[:blockLen]
     260           1 :         }
     261           1 :         b := buf
     262           1 :         for i := range w.blocks {
     263           1 :                 littleEndianPut(uint64(i), b, int(h.BlockNumByteLength))
     264           1 :                 b = b[int(h.BlockNumByteLength):]
     265           1 :                 littleEndianPut(w.blocks[i].handle.Offset, b, int(h.BlockOffsetByteLength))
     266           1 :                 b = b[int(h.BlockOffsetByteLength):]
     267           1 :                 littleEndianPut(w.blocks[i].handle.Length, b, int(h.BlockLengthByteLength))
     268           1 :                 b = b[int(h.BlockLengthByteLength):]
     269           1 :         }
     270           1 :         if len(b) != block.TrailerLen {
     271           0 :                 panic("incorrect length calculation")
     272             :         }
     273           1 :         b[0] = byte(block.NoCompressionIndicator)
     274           1 :         w.computeChecksum(buf)
     275           1 :         if _, err := layout.WriteValueIndexBlock(buf, h); err != nil {
     276           0 :                 return IndexHandle{}, err
     277           0 :         }
     278           1 :         return h, nil
     279             : }
     280             : 
     281             : // Release relinquishes the resources held by the writer and returns the Writer
     282             : // to a pool.
     283           1 : func (w *Writer) Release() {
     284           1 :         for i := range w.blocks {
     285           1 :                 if w.blocks[i].block.IsCompressed() {
     286           1 :                         releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.blocks[i].buffer)
     287           1 :                 } else {
     288           1 :                         releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.blocks[i].buffer)
     289           1 :                 }
     290           1 :                 w.blocks[i].buffer = nil
     291             :         }
     292           1 :         if w.buf != nil {
     293           1 :                 releaseToValueBlockBufPool(&uncompressedValueBlockBufPool, w.buf)
     294           1 :         }
     295           1 :         if w.compressedBuf != nil {
     296           1 :                 releaseToValueBlockBufPool(&compressedValueBlockBufPool, w.compressedBuf)
     297           1 :         }
     298           1 :         *w = Writer{
     299           1 :                 blocks: w.blocks[:0],
     300           1 :         }
     301           1 :         valueBlockWriterPool.Put(w)
     302             : }
     303             : 
// WriterStats contains statistics about the value blocks and value index block
// written by a Writer. It is returned by Writer.Finish.
//
// NumValueBlocks is the number of value blocks written, and
// NumValuesInValueBlocks is the number of values stored across them.
type WriterStats struct {
	NumValueBlocks         uint64
	NumValuesInValueBlocks uint64
	// ValueBlocksAndIndexSize is the total size in bytes of all value blocks
	// plus the value index block, with block trailers included.
	ValueBlocksAndIndexSize uint64
}
     312             : 
     313             : // TODO(jackson): Refactor the Writer into an Encoder and move the onus of
     314             : // calling into the sstable.layoutWriter onto the sstable.Writer.
     315             : 
// LayoutWriter defines the interface for a writer that writes out serialized
// value and value index blocks. Writer.Finish calls WriteValueBlock once per
// value block, in order, and then WriteValueIndexBlock once.
type LayoutWriter interface {
	// WriteValueBlock writes a pre-finished value block (with the trailer) to
	// the writer.
	WriteValueBlock(blk block.PhysicalBlock) (block.Handle, error)
	// WriteValueIndexBlock writes a pre-finished value block index to the
	// writer. blk includes the block trailer, whereas vbih.Handle.Length
	// excludes it.
	WriteValueIndexBlock(blk []byte, vbih IndexHandle) (block.Handle, error)
}

Generated by: LCOV version 1.14