LCOV - code coverage report
Current view: top level - pebble/sstable - layout.go (source / functions) Hit Total Coverage
Test: 2024-11-25 08:17Z 3ec779d3 - tests only.lcov Lines: 457 559 81.8 %
Date: 2024-11-25 08:17:50 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "context"
      11             :         "encoding/binary"
      12             :         "fmt"
      13             :         "io"
      14             :         "slices"
      15             :         "unsafe"
      16             : 
      17             :         "github.com/cockroachdb/errors"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/binfmt"
      20             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      21             :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      22             :         "github.com/cockroachdb/pebble/internal/treeprinter"
      23             :         "github.com/cockroachdb/pebble/objstorage"
      24             :         "github.com/cockroachdb/pebble/sstable/block"
      25             :         "github.com/cockroachdb/pebble/sstable/colblk"
      26             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      27             :         "github.com/cockroachdb/pebble/sstable/valblk"
      28             : )
      29             : 
      30             : // Layout describes the block organization of an sstable.
      31             : type Layout struct {
      32             :         // NOTE: changes to fields in this struct should also be reflected in
      33             :         // ValidateBlockChecksums, which validates a static list of BlockHandles
      34             :         // referenced in this struct.
      35             : 
      36             :         Data       []block.HandleWithProperties // data blocks
      37             :         Index      []block.Handle // index blocks
      38             :         TopIndex   block.Handle // top-level index block; not listed when Length is 0
      39             :         Filter     []NamedBlockHandle // filter blocks, each carrying its own name
      40             :         RangeDel   block.Handle // range deletion block; not listed when Length is 0
      41             :         RangeKey   block.Handle // range key block; not listed when Length is 0
      42             :         ValueBlock []block.Handle // value blocks
      43             :         ValueIndex block.Handle // index over the value blocks; not listed when Length is 0
      44             :         Properties block.Handle // properties block; not listed when Length is 0
      45             :         MetaIndex  block.Handle // metaindex block; not listed when Length is 0
      46             :         Footer     block.Handle // table footer; not listed when Length is 0
      47             :         Format     TableFormat // table format, as recorded in the footer
      48             : }
      49             : 
      50             : // NamedBlockHandle holds a block.Handle and corresponding name.
      51             : type NamedBlockHandle struct {
      52             :         block.Handle
      53             :         Name string // label for the block, e.g. "data", "index" or "footer"
      54             : }
      55             : 
      56             : // FilterByName retrieves the block handle of the named filter, if it exists.
      57             : // The provided name should be the name as it appears in the metaindex block.
      58             : // If no filter matches, a zero block.Handle and false are returned.
      59           1 : func (l *Layout) FilterByName(name string) (block.Handle, bool) {
      60           1 :         for i := range l.Filter {
      61           1 :                 if l.Filter[i].Name == name {
      62           1 :                         return l.Filter[i].Handle, true
      63           1 :                 }
      64             :         }
      65           1 :         return block.Handle{}, false
      66             : }
      67             : 
      68           1 : func (l *Layout) orderedBlocks() []NamedBlockHandle { // returns every listed block, labelled by kind and sorted by file offset
      69           1 :         var blocks []NamedBlockHandle
      70           1 :         for i := range l.Data {
      71           1 :                 blocks = append(blocks, NamedBlockHandle{l.Data[i].Handle, "data"})
      72           1 :         }
      73           1 :         for i := range l.Index {
      74           1 :                 blocks = append(blocks, NamedBlockHandle{l.Index[i], "index"})
      75           1 :         }
      76           1 :         if l.TopIndex.Length != 0 { // single-handle fields are skipped when their Length is 0
      77           1 :                 blocks = append(blocks, NamedBlockHandle{l.TopIndex, "top-index"})
      78           1 :         }
      79           1 :         blocks = append(blocks, l.Filter...) // filters already carry their own names
      80           1 :         if l.RangeDel.Length != 0 {
      81           1 :                 blocks = append(blocks, NamedBlockHandle{l.RangeDel, "range-del"})
      82           1 :         }
      83           1 :         if l.RangeKey.Length != 0 {
      84           1 :                 blocks = append(blocks, NamedBlockHandle{l.RangeKey, "range-key"})
      85           1 :         }
      86           1 :         for i := range l.ValueBlock {
      87           1 :                 blocks = append(blocks, NamedBlockHandle{l.ValueBlock[i], "value-block"})
      88           1 :         }
      89           1 :         if l.ValueIndex.Length != 0 {
      90           1 :                 blocks = append(blocks, NamedBlockHandle{l.ValueIndex, "value-index"})
      91           1 :         }
      92           1 :         if l.Properties.Length != 0 {
      93           1 :                 blocks = append(blocks, NamedBlockHandle{l.Properties, "properties"})
      94           1 :         }
      95           1 :         if l.MetaIndex.Length != 0 {
      96           1 :                 blocks = append(blocks, NamedBlockHandle{l.MetaIndex, "meta-index"})
      97           1 :         }
      98           1 :         if l.Footer.Length != 0 {
      99           1 :                 if l.Footer.Length == levelDBFooterLen { // the footer's length selects which label it gets
     100           1 :                         blocks = append(blocks, NamedBlockHandle{l.Footer, "leveldb-footer"})
     101           1 :                 } else {
     102           1 :                         blocks = append(blocks, NamedBlockHandle{l.Footer, "footer"})
     103           1 :                 }
     104             :         }
     105           1 :         slices.SortFunc(blocks, func(a, b NamedBlockHandle) int { // order by ascending Offset
     106           1 :                 return cmp.Compare(a.Offset, b.Offset)
     107           1 :         })
     108           1 :         return blocks
     109             : }
     110             : 
     111             : // Describe returns a description of the layout. If the verbose parameter is
     112             : // true, details of the structure of each block are returned as well.
     113             : // If verbose is true and fmtKV is non-nil, the output includes the KVs as formatted by fmtKV. Blocks are read through r.
     114             : func (l *Layout) Describe(
     115             :         verbose bool, r *Reader, fmtKV func(key *base.InternalKey, value []byte) string,
     116           1 : ) string {
     117           1 :         ctx := context.TODO()
     118           1 : 
     119           1 :         blocks := l.orderedBlocks()
     120           1 :         formatting := rowblkFormatting // row-oriented formatters unless the table format is columnar
     121           1 :         if l.Format.BlockColumnar() {
     122           1 :                 formatting = colblkFormatting
     123           1 :         }
     124             : 
     125           1 :         tp := treeprinter.New()
     126           1 :         root := tp.Child("sstable")
     127           1 : 
     128           1 :         for i := range blocks {
     129           1 :                 b := &blocks[i]
     130           1 :                 tpNode := root.Childf("%s  offset: %d  length: %d", b.Name, b.Offset, b.Length)
     131           1 : 
     132           1 :                 if !verbose {
     133           1 :                         continue
     134             :                 }
     135           1 :                 if b.Name == "filter" {
     136           0 :                         continue
     137             :                 }
     138             : 
     139           1 :                 if b.Name == "footer" || b.Name == "leveldb-footer" { // footers are decoded by hand from the raw bytes
     140           1 :                         trailer, offset := make([]byte, b.Length), 0
     141           1 :                         _ = r.readable.ReadAt(ctx, trailer, int64(b.Offset)) // NOTE(review): the read error is discarded
     142           1 : 
     143           1 :                         if b.Name == "footer" {
     144           1 :                                 checksumType := block.ChecksumType(trailer[0])
     145           1 :                                 tpNode.Childf("%03d  checksum type: %s", offset, checksumType)
     146           1 :                                 trailer, offset = trailer[1:], offset+1
     147           1 :                         }
     148             : 
     149           1 :                         metaHandle, n := binary.Uvarint(trailer)
     150           1 :                         metaLen, m := binary.Uvarint(trailer[n:])
     151           1 :                         tpNode.Childf("%03d  meta: offset=%d, length=%d", offset, metaHandle, metaLen)
     152           1 :                         trailer, offset = trailer[n+m:], offset+n+m
     153           1 : 
     154           1 :                         indexHandle, n := binary.Uvarint(trailer)
     155           1 :                         indexLen, m := binary.Uvarint(trailer[n:])
     156           1 :                         tpNode.Childf("%03d  index: offset=%d, length=%d", offset, indexHandle, indexLen)
     157           1 :                         trailer, offset = trailer[n+m:], offset+n+m
     158           1 : 
     159           1 :                         trailing := 12 // 4-byte version followed by the 8-byte magic number
     160           1 :                         if b.Name == "leveldb-footer" {
     161           0 :                                 trailing = 8 // magic number only
     162           0 :                         }
     163             : 
     164           1 :                         offset += len(trailer) - trailing // skip the bytes between the handles and the fixed-size tail
     165           1 :                         trailer = trailer[len(trailer)-trailing:]
     166           1 : 
     167           1 :                         if b.Name == "footer" {
     168           1 :                                 version := trailer[:4]
     169           1 :                                 tpNode.Childf("%03d  version: %d", offset, binary.LittleEndian.Uint32(version))
     170           1 :                                 trailer, offset = trailer[4:], offset+4
     171           1 :                         }
     172             : 
     173           1 :                         magicNumber := trailer
     174           1 :                         tpNode.Childf("%03d  magic number: 0x%x", offset, magicNumber)
     175           1 : 
     176           1 :                         continue
     177             :                 }
     178             : 
     179             :                 // Read the block and format it. Returns an error if we couldn't read the
     180             :                 // block.
     181           1 :                 err := func() error {
     182           1 :                         var err error
     183           1 :                         var h block.BufferHandle
     184           1 :                         // Defer release of any block handle that will have been read.
     185           1 :                         defer func() { h.Release() }()
     186             : 
     187           1 :                         switch b.Name {
     188           1 :                         case "data":
     189           1 :                                 h, err = r.readDataBlock(ctx, noEnv, noReadHandle, b.Handle)
     190           1 :                                 if err != nil {
     191           0 :                                         return err
     192           0 :                                 }
     193           1 :                                 if fmtKV == nil {
     194           1 :                                         formatting.formatDataBlock(tpNode, r, *b, h.BlockData(), nil) // NOTE(review): the formatter's error return is ignored
     195           1 :                                 } else {
     196           1 :                                         var lastKey InternalKey // copy of the previous key, used to flag ordering violations
     197           1 :                                         formatting.formatDataBlock(tpNode, r, *b, h.BlockData(), func(key *base.InternalKey, value []byte) string {
     198           1 :                                                 v := fmtKV(key, value)
     199           1 :                                                 if base.InternalCompare(r.Compare, lastKey, *key) >= 0 {
     200           1 :                                                         v += " WARNING: OUT OF ORDER KEYS!"
     201           1 :                                                 }
     202           1 :                                                 lastKey.Trailer = key.Trailer
     203           1 :                                                 lastKey.UserKey = append(lastKey.UserKey[:0], key.UserKey...)
     204           1 :                                                 return v
     205             :                                         })
     206             :                                 }
     207             : 
     208           1 :                         case "range-del":
     209           1 :                                 h, err = r.readRangeDelBlock(ctx, noEnv, noReadHandle, b.Handle)
     210           1 :                                 if err != nil {
     211           0 :                                         return err
     212           0 :                                 }
     213             :                                 // TODO(jackson): colblk ignores fmtKV, because it doesn't
     214             :                                 // make sense in the context.
     215           1 :                                 formatting.formatKeyspanBlock(tpNode, r, *b, h.BlockData(), fmtKV)
     216             : 
     217           1 :                         case "range-key":
     218           1 :                                 h, err = r.readRangeKeyBlock(ctx, noEnv, noReadHandle, b.Handle)
     219           1 :                                 if err != nil {
     220           0 :                                         return err
     221           0 :                                 }
     222             :                                 // TODO(jackson): colblk ignores fmtKV, because it doesn't
     223             :                                 // make sense in the context.
     224           1 :                                 formatting.formatKeyspanBlock(tpNode, r, *b, h.BlockData(), fmtKV)
     225             : 
     226           1 :                         case "index", "top-index":
     227           1 :                                 h, err = r.readIndexBlock(ctx, noEnv, noReadHandle, b.Handle)
     228           1 :                                 if err != nil {
     229           0 :                                         return err
     230           0 :                                 }
     231           1 :                                 formatting.formatIndexBlock(tpNode, r, *b, h.BlockData())
     232             : 
     233           1 :                         case "properties":
     234           1 :                                 h, err = r.readBlockInternal(ctx, noEnv, noReadHandle, b.Handle, noInitBlockMetadataFn)
     235           1 :                                 if err != nil {
     236           0 :                                         return err
     237           0 :                                 }
     238           1 :                                 iter, _ := rowblk.NewRawIter(r.Compare, h.BlockData()) // NOTE(review): the constructor's error is discarded
     239           1 :                                 iter.Describe(tpNode, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
     240           1 :                                         fmt.Fprintf(w, "%05d    %s (%d)", enc.Offset, key.UserKey, enc.Length)
     241           1 :                                 })
     242             : 
     243           1 :                         case "meta-index":
     244           1 :                                 if b.Handle != r.metaindexBH {
     245           0 :                                         return base.AssertionFailedf("meta-index block handle does not match metaindexBH")
     246           0 :                                 }
     247           1 :                                 h, err = r.readMetaindexBlock(ctx, noEnv, noReadHandle)
     248           1 :                                 if err != nil {
     249           0 :                                         return err
     250           0 :                                 }
     251           1 :                                 iter, _ := rowblk.NewRawIter(r.Compare, h.BlockData()) // NOTE(review): the constructor's error is discarded
     252           1 :                                 iter.Describe(tpNode, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
     253           1 :                                         var bh block.Handle
     254           1 :                                         var n int
     255           1 :                                         var vbih valblk.IndexHandle
     256           1 :                                         isValueBlocksIndexHandle := false
     257           1 :                                         if bytes.Equal(iter.Key().UserKey, []byte(metaValueIndexName)) { // the value-index entry uses its own handle encoding
     258           1 :                                                 vbih, n, err = valblk.DecodeIndexHandle(value)
     259           1 :                                                 bh = vbih.Handle
     260           1 :                                                 isValueBlocksIndexHandle = true
     261           1 :                                         } else {
     262           1 :                                                 bh, n = block.DecodeHandle(value)
     263           1 :                                         }
     264           1 :                                         if n == 0 || n != len(value) { // nothing decoded, or the handle does not span the whole value
     265           0 :                                                 fmt.Fprintf(w, "%04d    [err: %s]\n", enc.Offset, err)
     266           0 :                                                 return
     267           0 :                                         }
     268           1 :                                         var vbihStr string
     269           1 :                                         if isValueBlocksIndexHandle {
     270           1 :                                                 vbihStr = fmt.Sprintf(" value-blocks-index-lengths: %d(num), %d(offset), %d(length)",
     271           1 :                                                         vbih.BlockNumByteLength, vbih.BlockOffsetByteLength, vbih.BlockLengthByteLength)
     272           1 :                                         }
     273           1 :                                         fmt.Fprintf(w, "%04d    %s block:%d/%d%s",
     274           1 :                                                 uint64(enc.Offset), iter.Key().UserKey, bh.Offset, bh.Length, vbihStr)
     275             :                                 })
     276             : 
     277           1 :                         case "value-block":
     278             :                                 // We don't peer into the value-block since it can't be interpreted
     279             :                                 // without the valueHandles.
     280           1 :                         case "value-index":
     281             :                                 // We have already read the value-index to construct the list of
     282             :                                 // value-blocks, so no need to do it again.
     283             :                         }
     284             : 
     285             :                         // Format the trailer.
     286           1 :                         trailer := make([]byte, block.TrailerLen)
     287           1 :                         _ = r.readable.ReadAt(ctx, trailer, int64(b.Offset+b.Length)) // NOTE(review): the read error is discarded
     288           1 :                         algo := block.CompressionIndicator(trailer[0])
     289           1 :                         checksum := binary.LittleEndian.Uint32(trailer[1:])
     290           1 :                         tpNode.Childf("trailer [compression=%s checksum=0x%04x]", algo, checksum)
     291           1 :                         return nil
     292             :                 }()
     293           1 :                 if err != nil {
     294           0 :                         tpNode.Childf("error reading block: %v", err)
     295           0 :                 }
     296             :         }
     297           1 :         return tp.String()
     298             : }
     299             : 
     300             : type blockFormatting struct { // set of block formatters that Describe picks per table format
     301             :         formatIndexBlock   formatBlockFunc // used for "index" and "top-index" blocks
     302             :         formatDataBlock    formatBlockFuncKV // used for "data" blocks
     303             :         formatKeyspanBlock formatBlockFuncKV // used for "range-del" and "range-key" blocks
     304             : }
     305             : 
     306             : type ( // formatter signatures: tree node to add to, reader, block handle, block bytes
     307             :         formatBlockFunc   func(treeprinter.Node, *Reader, NamedBlockHandle, []byte) error
     308             :         formatBlockFuncKV func(treeprinter.Node, *Reader, NamedBlockHandle, []byte, func(*base.InternalKey, []byte) string) error // same, plus an optional KV formatter
     309             : )
     310             : 
     311             : var (
     312             :         rowblkFormatting = blockFormatting{ // Describe's default when the table format is not columnar
     313             :                 formatIndexBlock:   formatRowblkIndexBlock,
     314             :                 formatDataBlock:    formatRowblkDataBlock,
     315             :                 formatKeyspanBlock: formatRowblkDataBlock, // keyspan blocks reuse the data-block formatter
     316             :         }
     317             :         colblkFormatting = blockFormatting{ // selected by Describe when Format.BlockColumnar() is true
     318             :                 formatIndexBlock:   formatColblkIndexBlock,
     319             :                 formatDataBlock:    formatColblkDataBlock,
     320             :                 formatKeyspanBlock: formatColblkKeyspanBlock,
     321             :         }
     322             : )
     323             : 
     324           1 : func formatColblkIndexBlock(tp treeprinter.Node, r *Reader, b NamedBlockHandle, data []byte) error { // adds one child to tp per entry of a columnar index block
     325           1 :         var iter colblk.IndexIter
     326           1 :         if err := iter.Init(r.Compare, r.Split, data, NoTransforms); err != nil {
     327           0 :                 return err
     328           0 :         }
     329           1 :         defer iter.Close()
     330           1 :         i := 0 // ordinal of the entry, printed as the line prefix
     331           1 :         for v := iter.First(); v; v = iter.Next() {
     332           1 :                 bh, err := iter.BlockHandleWithProperties()
     333           1 :                 if err != nil {
     334           0 :                         return err // entries already added to tp are kept
     335           0 :                 }
     336           1 :                 tp.Childf("%05d    block:%d/%d\n", i, bh.Offset, bh.Length)
     337           1 :                 i++
     338             :         }
     339           1 :         return nil
     340             : }
     341             : 
     342             : func formatColblkDataBlock(
     343             :         tp treeprinter.Node,
     344             :         r *Reader,
     345             :         b NamedBlockHandle,
     346             :         data []byte,
     347             :         fmtKV func(key *base.InternalKey, value []byte) string,
     348           1 : ) error { // describes a columnar data block under tp; with a non-nil fmtKV it also lists every KV
     349           1 :         var decoder colblk.DataBlockDecoder
     350           1 :         decoder.Init(r.keySchema, data)
     351           1 :         f := binfmt.New(data)
     352           1 :         decoder.Describe(f, tp)
     353           1 : 
     354           1 :         if fmtKV != nil {
     355           1 :                 var iter colblk.DataBlockIter
     356           1 :                 iter.InitOnce(r.keySchema, r.Compare, r.Split, describingLazyValueHandler{}) // value handles are rendered as descriptions by this handler
     357           1 :                 if err := iter.Init(&decoder, block.IterTransforms{}); err != nil {
     358           0 :                         return err
     359           0 :                 }
     360           1 :                 defer iter.Close()
     361           1 :                 for kv := iter.First(); kv != nil; kv = iter.Next() {
     362           1 :                         tp.Child(fmtKV(&kv.K, kv.V.ValueOrHandle)) // one child per KV, after the structural description
     363           1 :                 }
     364             :         }
     365           1 :         return nil
     366             : }
     367             : 
     368             : // describingLazyValueHandler is a block.GetLazyValueForPrefixAndValueHandler
     369             : // that replaces a value handle with an in-place value describing the handle.
     370             : type describingLazyValueHandler struct{}
     371             : 
     372             : // Assert that describingLazyValueHandler implements the
     373             : // block.GetLazyValueForPrefixAndValueHandler interface.
     374             : var _ block.GetLazyValueForPrefixAndValueHandler = describingLazyValueHandler{}
     375             : 
     376             : func (describingLazyValueHandler) GetLazyValueForPrefixAndValueHandle(
     377             :         handle []byte,
     378           1 : ) base.LazyValue {
     379           1 :         vh := valblk.DecodeHandle(handle[1:]) // skips the first byte (presumably the value prefix — TODO confirm) before decoding
     380           1 :         return base.LazyValue{ValueOrHandle: []byte(fmt.Sprintf("value handle %+v", vh))} // the returned value is a textual description of the handle
     381           1 : }
     382             : 
     383             : func formatColblkKeyspanBlock(
     384             :         tp treeprinter.Node,
     385             :         r *Reader,
     386             :         b NamedBlockHandle,
     387             :         data []byte,
     388             :         _ func(*base.InternalKey, []byte) string,
     389           1 : ) error { // describes a columnar keyspan block under tp; the KV formatter argument is ignored
     390           1 :         var decoder colblk.KeyspanDecoder
     391           1 :         decoder.Init(data)
     392           1 :         f := binfmt.New(data)
     393           1 :         decoder.Describe(f, tp)
     394           1 :         return nil // this function never returns a non-nil error
     395           1 : }
     396             : 
     397           1 : func formatRowblkIndexBlock(tp treeprinter.Node, r *Reader, b NamedBlockHandle, data []byte) error { // describes a row-oriented index block under tp, one line per entry
     398           1 :         iter, err := rowblk.NewIter(r.Compare, r.Split, data, NoTransforms)
     399           1 :         if err != nil {
     400           0 :                 return err
     401           0 :         }
     402           1 :         iter.Describe(tp, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
     403           1 :                 bh, err := block.DecodeHandleWithProperties(value) // each entry's value is decoded as a block handle with properties
     404           1 :                 if err != nil {
     405           0 :                         fmt.Fprintf(w, "%05d    [err: %s]\n", enc.Offset, err) // a decode failure is printed inline rather than returned
     406           0 :                         return
     407           0 :                 }
     408           1 :                 fmt.Fprintf(w, "%05d    block:%d/%d", enc.Offset, bh.Offset, bh.Length)
     409           1 :                 if enc.IsRestart {
     410           1 :                         fmt.Fprintf(w, " [restart]")
     411           1 :                 }
     412             :         })
     413           1 :         return nil
     414             : }
     415             : 
     416             : func formatRowblkDataBlock(
     417             :         tp treeprinter.Node,
     418             :         r *Reader,
     419             :         b NamedBlockHandle,
     420             :         data []byte,
     421             :         fmtRecord func(key *base.InternalKey, value []byte) string,
     422           1 : ) error { // describes a row-oriented block under tp, one record line per entry; fmtRecord may be nil
     423           1 :         iter, err := rowblk.NewIter(r.Compare, r.Split, data, NoTransforms)
     424           1 :         if err != nil {
     425           0 :                 return err
     426           0 :         }
     427           1 :         iter.Describe(tp, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
     428           1 :                 // The format of the numbers in the record line is:
     429           1 :                 //
     430           1 :                 //   (<total> = <length> [<shared>] + <unshared> + <value>)
     431           1 :                 //
     432           1 :                 // <total>    is the total number of bytes for the record.
     433           1 :                 // <length>   is the size of the 3 varint encoded integers for <shared>,
     434           1 :                 //            <unshared>, and <value>.
     435           1 :                 // <shared>   is the number of key bytes shared with the previous key.
     436           1 :                 // <unshared> is the number of unshared key bytes.
     437           1 :                 // <value>    is the number of value bytes.
     438           1 :                 fmt.Fprintf(w, "%05d    record (%d = %d [%d] + %d + %d)",
     439           1 :                         uint64(enc.Offset), enc.Length,
     440           1 :                         enc.Length-int32(enc.KeyUnshared+enc.ValueLen), enc.KeyShared, enc.KeyUnshared, enc.ValueLen)
     441           1 :                 if enc.IsRestart {
     442           1 :                         fmt.Fprint(w, " [restart]")
     443           1 :                 }
     444           1 :                 if fmtRecord != nil {
     445           1 :                         if r.tableFormat < TableFormatPebblev3 || key.Kind() != InternalKeyKindSet { // before Pebblev3, or for non-SET keys: pass the value through unchanged
     446           1 :                                 fmt.Fprintf(w, "\n         %s", fmtRecord(key, value))
     447           1 :                         } else if !block.ValuePrefix(value[0]).IsValueHandle() { // in-place value: drop the one-byte prefix
     448           1 :                                 fmt.Fprintf(w, "\n         %s", fmtRecord(key, value[1:]))
     449           1 :                         } else { // value handle: show a description of the handle in place of the value
     450           1 :                                 vh := valblk.DecodeHandle(value[1:])
     451           1 :                                 fmt.Fprintf(w, "\n         %s", fmtRecord(key, []byte(fmt.Sprintf("value handle %+v", vh))))
     452           1 :                         }
     453             :                 }
     454             :         })
     455           1 :         return nil
     456             : }
     457             : 
     458           1 : func decodeLayout(comparer *base.Comparer, data []byte) (Layout, error) {
     459           1 :         foot, err := parseFooter(data, 0, int64(len(data)))
     460           1 :         if err != nil {
     461           0 :                 return Layout{}, err
     462           0 :         }
     463           1 :         decompressedMeta, err := decompressInMemory(data, foot.metaindexBH)
     464           1 :         if err != nil {
     465           0 :                 return Layout{}, errors.Wrap(err, "decompressing metaindex")
     466           0 :         }
     467           1 :         meta, vbih, err := decodeMetaindex(decompressedMeta)
     468           1 :         if err != nil {
     469           0 :                 return Layout{}, err
     470           0 :         }
     471           1 :         layout := Layout{
     472           1 :                 MetaIndex:  foot.metaindexBH,
     473           1 :                 Properties: meta[metaPropertiesName],
     474           1 :                 RangeDel:   meta[metaRangeDelV2Name],
     475           1 :                 RangeKey:   meta[metaRangeKeyName],
     476           1 :                 ValueIndex: vbih.Handle,
     477           1 :                 Footer:     foot.footerBH,
     478           1 :                 Format:     foot.format,
     479           1 :         }
     480           1 :         var props Properties
     481           1 :         decompressedProps, err := decompressInMemory(data, layout.Properties)
     482           1 :         if err != nil {
     483           0 :                 return Layout{}, errors.Wrap(err, "decompressing properties")
     484           0 :         }
     485           1 :         if err := props.load(decompressedProps, map[string]struct{}{}); err != nil {
     486           0 :                 return Layout{}, err
     487           0 :         }
     488             : 
     489           1 :         if props.IndexType == twoLevelIndex {
     490           1 :                 decompressed, err := decompressInMemory(data, foot.indexBH)
     491           1 :                 if err != nil {
     492           0 :                         return Layout{}, errors.Wrap(err, "decompressing two-level index")
     493           0 :                 }
     494           1 :                 layout.TopIndex = foot.indexBH
     495           1 :                 topLevelIter, err := newIndexIter(foot.format, comparer, decompressed)
     496           1 :                 if err != nil {
     497           0 :                         return Layout{}, err
     498           0 :                 }
     499           1 :                 err = forEachIndexEntry(topLevelIter, func(bhp block.HandleWithProperties) {
     500           1 :                         layout.Index = append(layout.Index, bhp.Handle)
     501           1 :                 })
     502           1 :                 if err != nil {
     503           0 :                         return Layout{}, err
     504           0 :                 }
     505           0 :         } else {
     506           0 :                 layout.Index = append(layout.Index, foot.indexBH)
     507           0 :         }
     508           1 :         for _, indexBH := range layout.Index {
     509           1 :                 decompressed, err := decompressInMemory(data, indexBH)
     510           1 :                 if err != nil {
     511           0 :                         return Layout{}, errors.Wrap(err, "decompressing index block")
     512           0 :                 }
     513           1 :                 indexIter, err := newIndexIter(foot.format, comparer, decompressed)
     514           1 :                 if err != nil {
     515           0 :                         return Layout{}, err
     516           0 :                 }
     517           1 :                 err = forEachIndexEntry(indexIter, func(bhp block.HandleWithProperties) {
     518           1 :                         layout.Data = append(layout.Data, bhp)
     519           1 :                 })
     520           1 :                 if err != nil {
     521           0 :                         return Layout{}, err
     522           0 :                 }
     523             :         }
     524             : 
     525           1 :         if layout.ValueIndex.Length > 0 {
     526           0 :                 vbiBlock, err := decompressInMemory(data, layout.ValueIndex)
     527           0 :                 if err != nil {
     528           0 :                         return Layout{}, errors.Wrap(err, "decompressing value index")
     529           0 :                 }
     530           0 :                 layout.ValueBlock, err = valblk.DecodeIndex(vbiBlock, vbih)
     531           0 :                 if err != nil {
     532           0 :                         return Layout{}, err
     533           0 :                 }
     534             :         }
     535             : 
     536           1 :         return layout, nil
     537             : }
     538             : 
     539           1 : func decompressInMemory(data []byte, bh block.Handle) ([]byte, error) {
     540           1 :         typ := block.CompressionIndicator(data[bh.Offset+bh.Length])
     541           1 :         var decompressed []byte
     542           1 :         if typ == block.NoCompressionIndicator {
     543           1 :                 return data[bh.Offset : bh.Offset+bh.Length], nil
     544           1 :         }
     545             :         // Decode the length of the decompressed value.
     546           1 :         decodedLen, prefixLen, err := block.DecompressedLen(typ, data[bh.Offset:bh.Offset+bh.Length])
     547           1 :         if err != nil {
     548           0 :                 return nil, err
     549           0 :         }
     550           1 :         decompressed = make([]byte, decodedLen)
     551           1 :         if err := block.DecompressInto(typ, data[int(bh.Offset)+prefixLen:bh.Offset+bh.Length], decompressed); err != nil {
     552           0 :                 return nil, err
     553           0 :         }
     554           1 :         return decompressed, nil
     555             : }
     556             : 
     557             : func newIndexIter(
     558             :         tableFormat TableFormat, comparer *base.Comparer, data []byte,
     559           1 : ) (block.IndexBlockIterator, error) {
     560           1 :         var iter block.IndexBlockIterator
     561           1 :         var err error
     562           1 :         if tableFormat <= TableFormatPebblev4 {
     563           1 :                 iter = new(rowblk.IndexIter)
     564           1 :                 err = iter.Init(comparer.Compare, comparer.Split, data, block.NoTransforms)
     565           1 :         } else {
     566           1 :                 iter = new(colblk.IndexIter)
     567           1 :                 err = iter.Init(comparer.Compare, comparer.Split, data, block.NoTransforms)
     568           1 :         }
     569           1 :         if err != nil {
     570           0 :                 return nil, err
     571           0 :         }
     572           1 :         return iter, nil
     573             : }
     574             : 
     575             : func forEachIndexEntry(
     576             :         indexIter block.IndexBlockIterator, fn func(block.HandleWithProperties),
     577           1 : ) error {
     578           1 :         for v := indexIter.First(); v; v = indexIter.Next() {
     579           1 :                 bhp, err := indexIter.BlockHandleWithProperties()
     580           1 :                 if err != nil {
     581           0 :                         return err
     582           0 :                 }
     583           1 :                 fn(bhp)
     584             :         }
     585           1 :         return indexIter.Close()
     586             : }
     587             : 
     588             : func decodeMetaindex(
     589             :         data []byte,
     590           1 : ) (meta map[string]block.Handle, vbih valblk.IndexHandle, err error) {
     591           1 :         i, err := rowblk.NewRawIter(bytes.Compare, data)
     592           1 :         if err != nil {
     593           0 :                 return nil, valblk.IndexHandle{}, err
     594           0 :         }
     595           1 :         defer func() { err = firstError(err, i.Close()) }()
     596             : 
     597           1 :         meta = map[string]block.Handle{}
     598           1 :         for valid := i.First(); valid; valid = i.Next() {
     599           1 :                 value := i.Value()
     600           1 :                 if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
     601           1 :                         var n int
     602           1 :                         vbih, n, err = valblk.DecodeIndexHandle(i.Value())
     603           1 :                         if err != nil {
     604           0 :                                 return nil, vbih, err
     605           0 :                         }
     606           1 :                         if n == 0 || n != len(value) {
     607           0 :                                 return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
     608           0 :                         }
     609           1 :                 } else {
     610           1 :                         bh, n := block.DecodeHandle(value)
     611           1 :                         if n == 0 || n != len(value) {
     612           0 :                                 return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
     613           0 :                         }
     614           1 :                         meta[string(i.Key().UserKey)] = bh
     615             :                 }
     616             :         }
     617           1 :         return meta, vbih, nil
     618             : }
     619             : 
// layoutWriter writes the structure of an sstable to durable storage. It
// accepts serialized blocks, writes them to storage and returns a block handle
// describing the offset and length of the block.
type layoutWriter struct {
	// writable is the destination of all writes. It is set to nil by Abort and
	// by a completed Finish.
	writable objstorage.Writable

	// cacheOpts are used to remove blocks written to the sstable from the cache,
	// providing a defense in depth against bugs which cause cache collisions.
	cacheOpts sstableinternal.CacheOptions

	// options copied from WriterOptions
	tableFormat  TableFormat
	compression  block.Compression
	checksumType block.ChecksumType

	// offset tracks the current write offset within the writable.
	offset uint64
	// lastIndexBlockHandle holds the handle to the most recently-written index
	// block.  It's updated by WriteIndexBlock. When writing sstables with a
	// single-level index, this field will be updated once. When writing
	// sstables with a two-level index, the last update will set the two-level
	// index. Finish stores it in the footer.
	lastIndexBlockHandle block.Handle
	handles              []metaIndexHandle             // metaindex entries recorded so far; sorted and written by Finish
	handlesBuf           bytealloc.A                   // allocator backing the encoded handles held in handles
	tmp                  [blockHandleLikelyMaxLen]byte // scratch space for encoding handles and the footer
	buf                  blockBuf                      // compression/checksum state for blocks written without a caller-supplied blockBuf
}
     648             : 
     649           1 : func makeLayoutWriter(w objstorage.Writable, opts WriterOptions) layoutWriter {
     650           1 :         return layoutWriter{
     651           1 :                 writable:     w,
     652           1 :                 cacheOpts:    opts.internal.CacheOpts,
     653           1 :                 tableFormat:  opts.TableFormat,
     654           1 :                 compression:  opts.Compression,
     655           1 :                 checksumType: opts.Checksum,
     656           1 :                 buf: blockBuf{
     657           1 :                         checksummer: block.Checksummer{Type: opts.Checksum},
     658           1 :                 },
     659           1 :         }
     660           1 : }
     661             : 
// metaIndexHandle is a pending metaindex entry: key is the name the entry is
// written under and encodedBlockHandle is the already-encoded value stored
// for it. layoutWriter accumulates these and writes them out in Finish.
type metaIndexHandle struct {
	key                string
	encodedBlockHandle []byte
}
     666             : 
     667             : // Abort aborts writing the table, aborting the underlying writable too. Abort
     668             : // is idempotent.
     669           1 : func (w *layoutWriter) Abort() {
     670           1 :         if w.writable != nil {
     671           1 :                 w.writable.Abort()
     672           1 :                 w.writable = nil
     673           1 :         }
     674             : }
     675             : 
     676             : // WriteDataBlock constructs a trailer for the provided data block and writes
     677             : // the block and trailer to the writer. It returns the block's handle.
     678           1 : func (w *layoutWriter) WriteDataBlock(b []byte, buf *blockBuf) (block.Handle, error) {
     679           1 :         return w.writeBlock(b, w.compression, buf)
     680           1 : }
     681             : 
     682             : // WritePrecompressedDataBlock writes a pre-compressed data block and its
     683             : // pre-computed trailer to the writer, returning it's block handle.
     684           1 : func (w *layoutWriter) WritePrecompressedDataBlock(blk block.PhysicalBlock) (block.Handle, error) {
     685           1 :         return w.writePrecompressedBlock(blk)
     686           1 : }
     687             : 
     688             : // WriteIndexBlock constructs a trailer for the provided index (first or
     689             : // second-level) and writes the block and trailer to the writer. It remembers
     690             : // the last-written index block's handle and adds it to the file's meta index
     691             : // when the writer is finished.
     692           1 : func (w *layoutWriter) WriteIndexBlock(b []byte) (block.Handle, error) {
     693           1 :         h, err := w.writeBlock(b, w.compression, &w.buf)
     694           1 :         if err == nil {
     695           1 :                 w.lastIndexBlockHandle = h
     696           1 :         }
     697           1 :         return h, err
     698             : }
     699             : 
     700             : // WriteFilterBlock finishes the provided filter, constructs a trailer and
     701             : // writes the block and trailer to the writer. It automatically adds the filter
     702             : // block to the file's meta index when the writer is finished.
     703           1 : func (w *layoutWriter) WriteFilterBlock(f filterWriter) (bh block.Handle, err error) {
     704           1 :         b, err := f.finish()
     705           1 :         if err != nil {
     706           0 :                 return block.Handle{}, err
     707           0 :         }
     708           1 :         return w.writeNamedBlock(b, f.metaName())
     709             : }
     710             : 
     711             : // WritePropertiesBlock constructs a trailer for the provided properties block
     712             : // and writes the block and trailer to the writer. It automatically adds the
     713             : // properties block to the file's meta index when the writer is finished.
     714           1 : func (w *layoutWriter) WritePropertiesBlock(b []byte) (block.Handle, error) {
     715           1 :         return w.writeNamedBlock(b, metaPropertiesName)
     716           1 : }
     717             : 
     718             : // WriteRangeKeyBlock constructs a trailer for the provided range key block and
     719             : // writes the block and trailer to the writer. It automatically adds the range
     720             : // key block to the file's meta index when the writer is finished.
     721           1 : func (w *layoutWriter) WriteRangeKeyBlock(b []byte) (block.Handle, error) {
     722           1 :         return w.writeNamedBlock(b, metaRangeKeyName)
     723           1 : }
     724             : 
     725             : // WriteRangeDeletionBlock constructs a trailer for the provided range deletion
     726             : // block and writes the block and trailer to the writer. It automatically adds
     727             : // the range deletion block to the file's meta index when the writer is
     728             : // finished.
     729           1 : func (w *layoutWriter) WriteRangeDeletionBlock(b []byte) (block.Handle, error) {
     730           1 :         return w.writeNamedBlock(b, metaRangeDelV2Name)
     731           1 : }
     732             : 
     733           1 : func (w *layoutWriter) writeNamedBlock(b []byte, name string) (bh block.Handle, err error) {
     734           1 :         bh, err = w.writeBlock(b, block.NoCompression, &w.buf)
     735           1 :         if err == nil {
     736           1 :                 w.recordToMetaindex(name, bh)
     737           1 :         }
     738           1 :         return bh, err
     739             : }
     740             : 
     741             : // WriteValueBlock writes a pre-finished value block (with the trailer) to the
     742             : // writer.
     743           1 : func (w *layoutWriter) WriteValueBlock(blk block.PhysicalBlock) (block.Handle, error) {
     744           1 :         return w.writePrecompressedBlock(blk)
     745           1 : }
     746             : 
     747             : func (w *layoutWriter) WriteValueIndexBlock(
     748             :         blk []byte, vbih valblk.IndexHandle,
     749           1 : ) (block.Handle, error) {
     750           1 :         // NB: value index blocks are already finished and contain the block
     751           1 :         // trailer.
     752           1 :         // TODO(jackson): can this be refactored to make value blocks less
     753           1 :         // of a snowflake?
     754           1 :         off := w.offset
     755           1 :         w.clearFromCache(off)
     756           1 :         // Write the bytes to the file.
     757           1 :         if err := w.writable.Write(blk); err != nil {
     758           0 :                 return block.Handle{}, err
     759           0 :         }
     760           1 :         l := uint64(len(blk))
     761           1 :         w.offset += l
     762           1 : 
     763           1 :         n := valblk.EncodeIndexHandle(w.tmp[:], vbih)
     764           1 :         w.recordToMetaindexRaw(metaValueIndexName, w.tmp[:n])
     765           1 : 
     766           1 :         return block.Handle{Offset: off, Length: l}, nil
     767             : }
     768             : 
     769             : func (w *layoutWriter) writeBlock(
     770             :         b []byte, compression block.Compression, buf *blockBuf,
     771           1 : ) (block.Handle, error) {
     772           1 :         return w.writePrecompressedBlock(block.CompressAndChecksum(
     773           1 :                 &buf.compressedBuf, b, compression, &buf.checksummer))
     774           1 : }
     775             : 
     776             : // writePrecompressedBlock writes a pre-compressed block and its
     777             : // pre-computed trailer to the writer, returning it's block handle.
     778           1 : func (w *layoutWriter) writePrecompressedBlock(blk block.PhysicalBlock) (block.Handle, error) {
     779           1 :         w.clearFromCache(w.offset)
     780           1 :         // Write the bytes to the file.
     781           1 :         n, err := blk.WriteTo(w.writable)
     782           1 :         if err != nil {
     783           0 :                 return block.Handle{}, err
     784           0 :         }
     785           1 :         bh := block.Handle{Offset: w.offset, Length: uint64(blk.LengthWithoutTrailer())}
     786           1 :         w.offset += uint64(n)
     787           1 :         return bh, nil
     788             : }
     789             : 
     790             : // Write implements io.Writer. This is analogous to writePrecompressedBlock for
     791             : // blocks that already incorporate the trailer, and don't need the callee to
     792             : // return a BlockHandle.
     793           0 : func (w *layoutWriter) Write(blockWithTrailer []byte) (n int, err error) {
     794           0 :         offset := w.offset
     795           0 :         w.clearFromCache(offset)
     796           0 :         w.offset += uint64(len(blockWithTrailer))
     797           0 :         if err := w.writable.Write(blockWithTrailer); err != nil {
     798           0 :                 return 0, err
     799           0 :         }
     800           0 :         return len(blockWithTrailer), nil
     801             : }
     802             : 
     803             : // clearFromCache removes the block at the provided offset from the cache. This provides defense in
     804             : // depth against bugs which cause cache collisions.
     805           1 : func (w *layoutWriter) clearFromCache(offset uint64) {
     806           1 :         if w.cacheOpts.Cache != nil {
     807           1 :                 // TODO(peter): Alternatively, we could add the uncompressed value to the
     808           1 :                 // cache.
     809           1 :                 w.cacheOpts.Cache.Delete(w.cacheOpts.CacheID, w.cacheOpts.FileNum, offset)
     810           1 :         }
     811             : }
     812             : 
     813           1 : func (w *layoutWriter) recordToMetaindex(key string, h block.Handle) {
     814           1 :         n := h.EncodeVarints(w.tmp[:])
     815           1 :         w.recordToMetaindexRaw(key, w.tmp[:n])
     816           1 : }
     817             : 
     818           1 : func (w *layoutWriter) recordToMetaindexRaw(key string, h []byte) {
     819           1 :         var encodedHandle []byte
     820           1 :         w.handlesBuf, encodedHandle = w.handlesBuf.Alloc(len(h))
     821           1 :         copy(encodedHandle, h)
     822           1 :         w.handles = append(w.handles, metaIndexHandle{key: key, encodedBlockHandle: encodedHandle})
     823           1 : }
     824             : 
     825           1 : func (w *layoutWriter) IsFinished() bool { return w.writable == nil }
     826             : 
     827             : // Finish serializes the sstable, writing out the meta index block and sstable
     828             : // footer and closing the file. It returns the total size of the resulting
     829             : // ssatable.
     830           1 : func (w *layoutWriter) Finish() (size uint64, err error) {
     831           1 :         // Sort the meta index handles by key and write the meta index block.
     832           1 :         slices.SortFunc(w.handles, func(a, b metaIndexHandle) int {
     833           1 :                 return cmp.Compare(a.key, b.key)
     834           1 :         })
     835           1 :         bw := rowblk.Writer{RestartInterval: 1}
     836           1 :         for _, h := range w.handles {
     837           1 :                 bw.AddRaw(unsafe.Slice(unsafe.StringData(h.key), len(h.key)), h.encodedBlockHandle)
     838           1 :         }
     839           1 :         metaIndexHandle, err := w.writeBlock(bw.Finish(), block.NoCompression, &w.buf)
     840           1 :         if err != nil {
     841           0 :                 return 0, err
     842           0 :         }
     843             : 
     844             :         // Write the table footer.
     845           1 :         footer := footer{
     846           1 :                 format:      w.tableFormat,
     847           1 :                 checksum:    w.checksumType,
     848           1 :                 metaindexBH: metaIndexHandle,
     849           1 :                 indexBH:     w.lastIndexBlockHandle,
     850           1 :         }
     851           1 :         encodedFooter := footer.encode(w.tmp[:])
     852           1 :         if err := w.writable.Write(encodedFooter); err != nil {
     853           0 :                 return 0, err
     854           0 :         }
     855           1 :         w.offset += uint64(len(encodedFooter))
     856           1 : 
     857           1 :         err = w.writable.Finish()
     858           1 :         w.writable = nil
     859           1 :         return w.offset, err
     860             : }

Generated by: LCOV version 1.14