LCOV - code coverage report
Current view: top level - pebble/sstable - layout.go (source / functions) Hit Total Coverage
Test: 2024-11-08 08:16Z 2da617a0 - tests + meta.lcov Lines: 477 586 81.4 %
Date: 2024-11-08 08:17:47 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "context"
      11             :         "encoding/binary"
      12             :         "fmt"
      13             :         "io"
      14             :         "slices"
      15             :         "unsafe"
      16             : 
      17             :         "github.com/cockroachdb/errors"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/binfmt"
      20             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      21             :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      22             :         "github.com/cockroachdb/pebble/internal/treeprinter"
      23             :         "github.com/cockroachdb/pebble/objstorage"
      24             :         "github.com/cockroachdb/pebble/sstable/block"
      25             :         "github.com/cockroachdb/pebble/sstable/colblk"
      26             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      27             : )
      28             : 
// Layout describes the block organization of an sstable.
//
// Each field holds the handle (or handles) of one kind of block. Data, Index
// and ValueBlock may reference any number of blocks, and Filter holds one
// named handle per filter block. The remaining handle fields reference at
// most one block each; orderedBlocks treats such a field as absent when its
// Length is zero. TopIndex is only populated by decodeLayout for tables whose
// properties report a two-level index. Format is the table format taken from
// the footer.
type Layout struct {
	// NOTE: changes to fields in this struct should also be reflected in
	// ValidateBlockChecksums, which validates a static list of BlockHandles
	// referenced in this struct.

	Data       []block.HandleWithProperties
	Index      []block.Handle
	TopIndex   block.Handle
	Filter     []NamedBlockHandle
	RangeDel   block.Handle
	RangeKey   block.Handle
	ValueBlock []block.Handle
	ValueIndex block.Handle
	Properties block.Handle
	MetaIndex  block.Handle
	Footer     block.Handle
	Format     TableFormat
}
      48             : 
// NamedBlockHandle holds a block.Handle and corresponding name.
//
// The handle is embedded, so its fields (such as Offset and Length) are
// accessible directly on a NamedBlockHandle.
type NamedBlockHandle struct {
	block.Handle
	// Name labels the block. Layout.Describe prints it and uses it to decide
	// how the block's contents are decoded.
	Name string
}
      54             : 
      55             : // FilterByName retrieves the block handle of the named filter, if it exists.
      56             : // The provided the name should be the name as it appears in the metaindex
      57             : // block.
      58           1 : func (l *Layout) FilterByName(name string) (block.Handle, bool) {
      59           1 :         for i := range l.Filter {
      60           1 :                 if l.Filter[i].Name == name {
      61           1 :                         return l.Filter[i].Handle, true
      62           1 :                 }
      63             :         }
      64           1 :         return block.Handle{}, false
      65             : }
      66             : 
      67           1 : func (l *Layout) orderedBlocks() []NamedBlockHandle {
      68           1 :         var blocks []NamedBlockHandle
      69           1 :         for i := range l.Data {
      70           1 :                 blocks = append(blocks, NamedBlockHandle{l.Data[i].Handle, "data"})
      71           1 :         }
      72           1 :         for i := range l.Index {
      73           1 :                 blocks = append(blocks, NamedBlockHandle{l.Index[i], "index"})
      74           1 :         }
      75           1 :         if l.TopIndex.Length != 0 {
      76           1 :                 blocks = append(blocks, NamedBlockHandle{l.TopIndex, "top-index"})
      77           1 :         }
      78           1 :         blocks = append(blocks, l.Filter...)
      79           1 :         if l.RangeDel.Length != 0 {
      80           1 :                 blocks = append(blocks, NamedBlockHandle{l.RangeDel, "range-del"})
      81           1 :         }
      82           1 :         if l.RangeKey.Length != 0 {
      83           1 :                 blocks = append(blocks, NamedBlockHandle{l.RangeKey, "range-key"})
      84           1 :         }
      85           1 :         for i := range l.ValueBlock {
      86           1 :                 blocks = append(blocks, NamedBlockHandle{l.ValueBlock[i], "value-block"})
      87           1 :         }
      88           1 :         if l.ValueIndex.Length != 0 {
      89           1 :                 blocks = append(blocks, NamedBlockHandle{l.ValueIndex, "value-index"})
      90           1 :         }
      91           1 :         if l.Properties.Length != 0 {
      92           1 :                 blocks = append(blocks, NamedBlockHandle{l.Properties, "properties"})
      93           1 :         }
      94           1 :         if l.MetaIndex.Length != 0 {
      95           1 :                 blocks = append(blocks, NamedBlockHandle{l.MetaIndex, "meta-index"})
      96           1 :         }
      97           1 :         if l.Footer.Length != 0 {
      98           1 :                 if l.Footer.Length == levelDBFooterLen {
      99           1 :                         blocks = append(blocks, NamedBlockHandle{l.Footer, "leveldb-footer"})
     100           1 :                 } else {
     101           1 :                         blocks = append(blocks, NamedBlockHandle{l.Footer, "footer"})
     102           1 :                 }
     103             :         }
     104           1 :         slices.SortFunc(blocks, func(a, b NamedBlockHandle) int {
     105           1 :                 return cmp.Compare(a.Offset, b.Offset)
     106           1 :         })
     107           1 :         return blocks
     108             : }
     109             : 
     110             : // Describe returns a description of the layout. If the verbose parameter is
     111             : // true, details of the structure of each block are returned as well.
     112             : // If verbose is true and fmtKV is non-nil, the output includes the KVs (as formatted by this function).
     113             : func (l *Layout) Describe(
     114             :         verbose bool, r *Reader, fmtKV func(key *base.InternalKey, value []byte) string,
     115           1 : ) string {
     116           1 :         ctx := context.TODO()
     117           1 : 
     118           1 :         blocks := l.orderedBlocks()
     119           1 :         formatting := rowblkFormatting
     120           1 :         if l.Format.BlockColumnar() {
     121           1 :                 formatting = colblkFormatting
     122           1 :         }
     123             : 
     124           1 :         tp := treeprinter.New()
     125           1 :         root := tp.Child("sstable")
     126           1 : 
     127           1 :         for i := range blocks {
     128           1 :                 b := &blocks[i]
     129           1 :                 tpNode := root.Childf("%s  offset: %d  length: %d", b.Name, b.Offset, b.Length)
     130           1 : 
     131           1 :                 if !verbose {
     132           1 :                         continue
     133             :                 }
     134           1 :                 if b.Name == "filter" {
     135           0 :                         continue
     136             :                 }
     137             : 
     138           1 :                 if b.Name == "footer" || b.Name == "leveldb-footer" {
     139           1 :                         trailer, offset := make([]byte, b.Length), 0
     140           1 :                         _ = r.readable.ReadAt(ctx, trailer, int64(b.Offset))
     141           1 : 
     142           1 :                         if b.Name == "footer" {
     143           1 :                                 checksumType := block.ChecksumType(trailer[0])
     144           1 :                                 tpNode.Childf("%03d  checksum type: %s", offset, checksumType)
     145           1 :                                 trailer, offset = trailer[1:], offset+1
     146           1 :                         }
     147             : 
     148           1 :                         metaHandle, n := binary.Uvarint(trailer)
     149           1 :                         metaLen, m := binary.Uvarint(trailer[n:])
     150           1 :                         tpNode.Childf("%03d  meta: offset=%d, length=%d", offset, metaHandle, metaLen)
     151           1 :                         trailer, offset = trailer[n+m:], offset+n+m
     152           1 : 
     153           1 :                         indexHandle, n := binary.Uvarint(trailer)
     154           1 :                         indexLen, m := binary.Uvarint(trailer[n:])
     155           1 :                         tpNode.Childf("%03d  index: offset=%d, length=%d", offset, indexHandle, indexLen)
     156           1 :                         trailer, offset = trailer[n+m:], offset+n+m
     157           1 : 
     158           1 :                         trailing := 12
     159           1 :                         if b.Name == "leveldb-footer" {
     160           0 :                                 trailing = 8
     161           0 :                         }
     162             : 
     163           1 :                         offset += len(trailer) - trailing
     164           1 :                         trailer = trailer[len(trailer)-trailing:]
     165           1 : 
     166           1 :                         if b.Name == "footer" {
     167           1 :                                 version := trailer[:4]
     168           1 :                                 tpNode.Childf("%03d  version: %d", offset, binary.LittleEndian.Uint32(version))
     169           1 :                                 trailer, offset = trailer[4:], offset+4
     170           1 :                         }
     171             : 
     172           1 :                         magicNumber := trailer
     173           1 :                         tpNode.Childf("%03d  magic number: 0x%x", offset, magicNumber)
     174           1 : 
     175           1 :                         continue
     176             :                 }
     177             : 
     178             :                 // Read the block and format it. Returns an error if we couldn't read the
     179             :                 // block.
     180           1 :                 err := func() error {
     181           1 :                         var err error
     182           1 :                         var h block.BufferHandle
     183           1 :                         // Defer release of any block handle that will have been read.
     184           1 :                         defer func() { h.Release() }()
     185             : 
     186           1 :                         switch b.Name {
     187           1 :                         case "data":
     188           1 :                                 h, err = r.readDataBlock(ctx, noEnv, noReadHandle, b.Handle)
     189           1 :                                 if err != nil {
     190           0 :                                         return err
     191           0 :                                 }
     192           1 :                                 if fmtKV == nil {
     193           1 :                                         formatting.formatDataBlock(tpNode, r, *b, h.BlockData(), nil)
     194           1 :                                 } else {
     195           1 :                                         var lastKey InternalKey
     196           1 :                                         formatting.formatDataBlock(tpNode, r, *b, h.BlockData(), func(key *base.InternalKey, value []byte) string {
     197           1 :                                                 v := fmtKV(key, value)
     198           1 :                                                 if base.InternalCompare(r.Compare, lastKey, *key) >= 0 {
     199           1 :                                                         v += " WARNING: OUT OF ORDER KEYS!"
     200           1 :                                                 }
     201           1 :                                                 lastKey.Trailer = key.Trailer
     202           1 :                                                 lastKey.UserKey = append(lastKey.UserKey[:0], key.UserKey...)
     203           1 :                                                 return v
     204             :                                         })
     205             :                                 }
     206             : 
     207           1 :                         case "range-del":
     208           1 :                                 h, err = r.readRangeDelBlock(ctx, noEnv, noReadHandle, b.Handle)
     209           1 :                                 if err != nil {
     210           0 :                                         return err
     211           0 :                                 }
     212             :                                 // TODO(jackson): colblk ignores fmtKV, because it doesn't
     213             :                                 // make sense in the context.
     214           1 :                                 formatting.formatKeyspanBlock(tpNode, r, *b, h.BlockData(), fmtKV)
     215             : 
     216           1 :                         case "range-key":
     217           1 :                                 h, err = r.readRangeKeyBlock(ctx, noEnv, noReadHandle, b.Handle)
     218           1 :                                 if err != nil {
     219           0 :                                         return err
     220           0 :                                 }
     221             :                                 // TODO(jackson): colblk ignores fmtKV, because it doesn't
     222             :                                 // make sense in the context.
     223           1 :                                 formatting.formatKeyspanBlock(tpNode, r, *b, h.BlockData(), fmtKV)
     224             : 
     225           1 :                         case "index", "top-index":
     226           1 :                                 h, err = r.readIndexBlock(ctx, noEnv, noReadHandle, b.Handle)
     227           1 :                                 if err != nil {
     228           0 :                                         return err
     229           0 :                                 }
     230           1 :                                 formatting.formatIndexBlock(tpNode, r, *b, h.BlockData())
     231             : 
     232           1 :                         case "properties":
     233           1 :                                 h, err = r.readBlockInternal(ctx, noEnv, noReadHandle, b.Handle, noInitBlockMetadataFn)
     234           1 :                                 if err != nil {
     235           0 :                                         return err
     236           0 :                                 }
     237           1 :                                 iter, _ := rowblk.NewRawIter(r.Compare, h.BlockData())
     238           1 :                                 iter.Describe(tpNode, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
     239           1 :                                         fmt.Fprintf(w, "%05d    %s (%d)", enc.Offset, key.UserKey, enc.Length)
     240           1 :                                 })
     241             : 
     242           1 :                         case "meta-index":
     243           1 :                                 if b.Handle != r.metaindexBH {
     244           0 :                                         return base.AssertionFailedf("range-del block handle does not match rangeDelBH")
     245           0 :                                 }
     246           1 :                                 h, err = r.readMetaindexBlock(ctx, noEnv, noReadHandle)
     247           1 :                                 if err != nil {
     248           0 :                                         return err
     249           0 :                                 }
     250           1 :                                 iter, _ := rowblk.NewRawIter(r.Compare, h.BlockData())
     251           1 :                                 iter.Describe(tpNode, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
     252           1 :                                         var bh block.Handle
     253           1 :                                         var n int
     254           1 :                                         var vbih valueBlocksIndexHandle
     255           1 :                                         isValueBlocksIndexHandle := false
     256           1 :                                         if bytes.Equal(iter.Key().UserKey, []byte(metaValueIndexName)) {
     257           1 :                                                 vbih, n, err = decodeValueBlocksIndexHandle(value)
     258           1 :                                                 bh = vbih.h
     259           1 :                                                 isValueBlocksIndexHandle = true
     260           1 :                                         } else {
     261           1 :                                                 bh, n = block.DecodeHandle(value)
     262           1 :                                         }
     263           1 :                                         if n == 0 || n != len(value) {
     264           0 :                                                 fmt.Fprintf(w, "%04d    [err: %s]\n", enc.Offset, err)
     265           0 :                                                 return
     266           0 :                                         }
     267           1 :                                         var vbihStr string
     268           1 :                                         if isValueBlocksIndexHandle {
     269           1 :                                                 vbihStr = fmt.Sprintf(" value-blocks-index-lengths: %d(num), %d(offset), %d(length)",
     270           1 :                                                         vbih.blockNumByteLength, vbih.blockOffsetByteLength, vbih.blockLengthByteLength)
     271           1 :                                         }
     272           1 :                                         fmt.Fprintf(w, "%04d    %s block:%d/%d%s",
     273           1 :                                                 uint64(enc.Offset), iter.Key().UserKey, bh.Offset, bh.Length, vbihStr)
     274             :                                 })
     275             : 
     276           1 :                         case "value-block":
     277             :                                 // We don't peer into the value-block since it can't be interpreted
     278             :                                 // without the valueHandles.
     279           1 :                         case "value-index":
     280             :                                 // We have already read the value-index to construct the list of
     281             :                                 // value-blocks, so no need to do it again.
     282             :                         }
     283             : 
     284             :                         // Format the trailer.
     285           1 :                         trailer := make([]byte, block.TrailerLen)
     286           1 :                         _ = r.readable.ReadAt(ctx, trailer, int64(b.Offset+b.Length))
     287           1 :                         algo := block.CompressionIndicator(trailer[0])
     288           1 :                         checksum := binary.LittleEndian.Uint32(trailer[1:])
     289           1 :                         tpNode.Childf("trailer [compression=%s checksum=0x%04x]", algo, checksum)
     290           1 :                         return nil
     291             :                 }()
     292           1 :                 if err != nil {
     293           0 :                         tpNode.Childf("error reading block: %v", err)
     294           0 :                 }
     295             :         }
     296           1 :         return tp.String()
     297             : }
     298             : 
// blockFormatting bundles the functions that Layout.Describe uses to render
// the contents of index blocks, data blocks and keyspan (range-del and
// range-key) blocks for one block encoding.
type blockFormatting struct {
	formatIndexBlock   formatBlockFunc
	formatDataBlock    formatBlockFuncKV
	formatKeyspanBlock formatBlockFuncKV
}

// formatBlockFunc describes the block contents given as the byte slice by
// adding children to the tree node. formatBlockFuncKV additionally accepts a
// function for formatting individual key-value pairs; Describe passes nil
// when KVs should not be printed.
type (
	formatBlockFunc   func(treeprinter.Node, *Reader, NamedBlockHandle, []byte) error
	formatBlockFuncKV func(treeprinter.Node, *Reader, NamedBlockHandle, []byte, func(*base.InternalKey, []byte) string) error
)

var (
	// rowblkFormatting is what Describe uses unless the table format reports
	// BlockColumnar. Keyspan blocks are rendered with the same function as
	// data blocks.
	rowblkFormatting = blockFormatting{
		formatIndexBlock:   formatRowblkIndexBlock,
		formatDataBlock:    formatRowblkDataBlock,
		formatKeyspanBlock: formatRowblkDataBlock,
	}
	// colblkFormatting is what Describe uses when the table format reports
	// BlockColumnar.
	colblkFormatting = blockFormatting{
		formatIndexBlock:   formatColblkIndexBlock,
		formatDataBlock:    formatColblkDataBlock,
		formatKeyspanBlock: formatColblkKeyspanBlock,
	}
)
     322             : 
     323           1 : func formatColblkIndexBlock(tp treeprinter.Node, r *Reader, b NamedBlockHandle, data []byte) error {
     324           1 :         var iter colblk.IndexIter
     325           1 :         if err := iter.Init(r.Compare, r.Split, data, NoTransforms); err != nil {
     326           0 :                 return err
     327           0 :         }
     328           1 :         defer iter.Close()
     329           1 :         i := 0
     330           1 :         for v := iter.First(); v; v = iter.Next() {
     331           1 :                 bh, err := iter.BlockHandleWithProperties()
     332           1 :                 if err != nil {
     333           0 :                         return err
     334           0 :                 }
     335           1 :                 tp.Childf("%05d    block:%d/%d\n", i, bh.Offset, bh.Length)
     336           1 :                 i++
     337             :         }
     338           1 :         return nil
     339             : }
     340             : 
     341             : func formatColblkDataBlock(
     342             :         tp treeprinter.Node,
     343             :         r *Reader,
     344             :         b NamedBlockHandle,
     345             :         data []byte,
     346             :         fmtKV func(key *base.InternalKey, value []byte) string,
     347           1 : ) error {
     348           1 :         var decoder colblk.DataBlockDecoder
     349           1 :         decoder.Init(r.keySchema, data)
     350           1 :         f := binfmt.New(data)
     351           1 :         decoder.Describe(f, tp)
     352           1 : 
     353           1 :         if fmtKV != nil {
     354           1 :                 var iter colblk.DataBlockIter
     355           1 :                 iter.InitOnce(r.keySchema, r.Compare, r.Split, describingLazyValueHandler{})
     356           1 :                 if err := iter.Init(&decoder, block.IterTransforms{}); err != nil {
     357           0 :                         return err
     358           0 :                 }
     359           1 :                 defer iter.Close()
     360           1 :                 for kv := iter.First(); kv != nil; kv = iter.Next() {
     361           1 :                         tp.Child(fmtKV(&kv.K, kv.V.ValueOrHandle))
     362           1 :                 }
     363             :         }
     364           1 :         return nil
     365             : }
     366             : 
     367             : // describingLazyValueHandler is a block.GetLazyValueForPrefixAndValueHandler
     368             : // that replaces a value handle with an in-place value describing the handle.
     369             : type describingLazyValueHandler struct{}
     370             : 
     371             : // Assert that debugLazyValueHandler implements the
     372             : // block.GetLazyValueForPrefixAndValueHandler interface.
     373             : var _ block.GetLazyValueForPrefixAndValueHandler = describingLazyValueHandler{}
     374             : 
     375             : func (describingLazyValueHandler) GetLazyValueForPrefixAndValueHandle(
     376             :         handle []byte,
     377           1 : ) base.LazyValue {
     378           1 :         vh := decodeValueHandle(handle[1:])
     379           1 :         return base.LazyValue{ValueOrHandle: []byte(fmt.Sprintf("value handle %+v", vh))}
     380           1 : }
     381             : 
     382             : func formatColblkKeyspanBlock(
     383             :         tp treeprinter.Node,
     384             :         r *Reader,
     385             :         b NamedBlockHandle,
     386             :         data []byte,
     387             :         _ func(*base.InternalKey, []byte) string,
     388           1 : ) error {
     389           1 :         var decoder colblk.KeyspanDecoder
     390           1 :         decoder.Init(data)
     391           1 :         f := binfmt.New(data)
     392           1 :         decoder.Describe(f, tp)
     393           1 :         return nil
     394           1 : }
     395             : 
     396           1 : func formatRowblkIndexBlock(tp treeprinter.Node, r *Reader, b NamedBlockHandle, data []byte) error {
     397           1 :         iter, err := rowblk.NewIter(r.Compare, r.Split, data, NoTransforms)
     398           1 :         if err != nil {
     399           0 :                 return err
     400           0 :         }
     401           1 :         iter.Describe(tp, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
     402           1 :                 bh, err := block.DecodeHandleWithProperties(value)
     403           1 :                 if err != nil {
     404           0 :                         fmt.Fprintf(w, "%05d    [err: %s]\n", enc.Offset, err)
     405           0 :                         return
     406           0 :                 }
     407           1 :                 fmt.Fprintf(w, "%05d    block:%d/%d", enc.Offset, bh.Offset, bh.Length)
     408           1 :                 if enc.IsRestart {
     409           1 :                         fmt.Fprintf(w, " [restart]")
     410           1 :                 }
     411             :         })
     412           1 :         return nil
     413             : }
     414             : 
     415             : func formatRowblkDataBlock(
     416             :         tp treeprinter.Node,
     417             :         r *Reader,
     418             :         b NamedBlockHandle,
     419             :         data []byte,
     420             :         fmtRecord func(key *base.InternalKey, value []byte) string,
     421           1 : ) error {
     422           1 :         iter, err := rowblk.NewIter(r.Compare, r.Split, data, NoTransforms)
     423           1 :         if err != nil {
     424           0 :                 return err
     425           0 :         }
     426           1 :         iter.Describe(tp, func(w io.Writer, key *base.InternalKey, value []byte, enc rowblk.KVEncoding) {
     427           1 :                 // The format of the numbers in the record line is:
     428           1 :                 //
     429           1 :                 //   (<total> = <length> [<shared>] + <unshared> + <value>)
     430           1 :                 //
     431           1 :                 // <total>    is the total number of bytes for the record.
     432           1 :                 // <length>   is the size of the 3 varint encoded integers for <shared>,
     433           1 :                 //            <unshared>, and <value>.
     434           1 :                 // <shared>   is the number of key bytes shared with the previous key.
     435           1 :                 // <unshared> is the number of unshared key bytes.
     436           1 :                 // <value>    is the number of value bytes.
     437           1 :                 fmt.Fprintf(w, "%05d    record (%d = %d [%d] + %d + %d)",
     438           1 :                         uint64(enc.Offset), enc.Length,
     439           1 :                         enc.Length-int32(enc.KeyUnshared+enc.ValueLen), enc.KeyShared, enc.KeyUnshared, enc.ValueLen)
     440           1 :                 if enc.IsRestart {
     441           1 :                         fmt.Fprint(w, " [restart]")
     442           1 :                 }
     443           1 :                 if fmtRecord != nil {
     444           1 :                         if r.tableFormat < TableFormatPebblev3 || key.Kind() != InternalKeyKindSet {
     445           1 :                                 fmt.Fprintf(w, "\n         %s", fmtRecord(key, value))
     446           1 :                         } else if !block.ValuePrefix(value[0]).IsValueHandle() {
     447           1 :                                 fmt.Fprintf(w, "\n         %s", fmtRecord(key, value[1:]))
     448           1 :                         } else {
     449           1 :                                 vh := decodeValueHandle(value[1:])
     450           1 :                                 fmt.Fprintf(w, "\n         %s", fmtRecord(key, []byte(fmt.Sprintf("value handle %+v", vh))))
     451           1 :                         }
     452             :                 }
     453             :         })
     454           1 :         return nil
     455             : }
     456             : 
     457           1 : func decodeLayout(comparer *base.Comparer, data []byte) (Layout, error) {
     458           1 :         foot, err := parseFooter(data, 0, int64(len(data)))
     459           1 :         if err != nil {
     460           0 :                 return Layout{}, err
     461           0 :         }
     462           1 :         decompressedMeta, err := decompressInMemory(data, foot.metaindexBH)
     463           1 :         if err != nil {
     464           0 :                 return Layout{}, errors.Wrap(err, "decompressing metaindex")
     465           0 :         }
     466           1 :         meta, vbih, err := decodeMetaindex(decompressedMeta)
     467           1 :         if err != nil {
     468           0 :                 return Layout{}, err
     469           0 :         }
     470           1 :         layout := Layout{
     471           1 :                 MetaIndex:  foot.metaindexBH,
     472           1 :                 Properties: meta[metaPropertiesName],
     473           1 :                 RangeDel:   meta[metaRangeDelV2Name],
     474           1 :                 RangeKey:   meta[metaRangeKeyName],
     475           1 :                 ValueIndex: vbih.h,
     476           1 :                 Footer:     foot.footerBH,
     477           1 :                 Format:     foot.format,
     478           1 :         }
     479           1 :         var props Properties
     480           1 :         decompressedProps, err := decompressInMemory(data, layout.Properties)
     481           1 :         if err != nil {
     482           0 :                 return Layout{}, errors.Wrap(err, "decompressing properties")
     483           0 :         }
     484           1 :         if err := props.load(decompressedProps, map[string]struct{}{}); err != nil {
     485           0 :                 return Layout{}, err
     486           0 :         }
     487             : 
     488           1 :         if props.IndexType == twoLevelIndex {
     489           1 :                 decompressed, err := decompressInMemory(data, foot.indexBH)
     490           1 :                 if err != nil {
     491           0 :                         return Layout{}, errors.Wrap(err, "decompressing two-level index")
     492           0 :                 }
     493           1 :                 layout.TopIndex = foot.indexBH
     494           1 :                 topLevelIter, err := newIndexIter(foot.format, comparer, decompressed)
     495           1 :                 if err != nil {
     496           0 :                         return Layout{}, err
     497           0 :                 }
     498           1 :                 err = forEachIndexEntry(topLevelIter, func(bhp block.HandleWithProperties) {
     499           1 :                         layout.Index = append(layout.Index, bhp.Handle)
     500           1 :                 })
     501           1 :                 if err != nil {
     502           0 :                         return Layout{}, err
     503           0 :                 }
     504           0 :         } else {
     505           0 :                 layout.Index = append(layout.Index, foot.indexBH)
     506           0 :         }
     507           1 :         for _, indexBH := range layout.Index {
     508           1 :                 decompressed, err := decompressInMemory(data, indexBH)
     509           1 :                 if err != nil {
     510           0 :                         return Layout{}, errors.Wrap(err, "decompressing index block")
     511           0 :                 }
     512           1 :                 indexIter, err := newIndexIter(foot.format, comparer, decompressed)
     513           1 :                 if err != nil {
     514           0 :                         return Layout{}, err
     515           0 :                 }
     516           1 :                 err = forEachIndexEntry(indexIter, func(bhp block.HandleWithProperties) {
     517           1 :                         layout.Data = append(layout.Data, bhp)
     518           1 :                 })
     519           1 :                 if err != nil {
     520           0 :                         return Layout{}, err
     521           0 :                 }
     522             :         }
     523             : 
     524           1 :         if layout.ValueIndex.Length > 0 {
     525           0 :                 vbiBlock, err := decompressInMemory(data, layout.ValueIndex)
     526           0 :                 if err != nil {
     527           0 :                         return Layout{}, errors.Wrap(err, "decompressing value index")
     528           0 :                 }
     529           0 :                 layout.ValueBlock, err = decodeValueBlockIndex(vbiBlock, vbih)
     530           0 :                 if err != nil {
     531           0 :                         return Layout{}, err
     532           0 :                 }
     533             :         }
     534             : 
     535           1 :         return layout, nil
     536             : }
     537             : 
     538           1 : func decompressInMemory(data []byte, bh block.Handle) ([]byte, error) {
     539           1 :         typ := block.CompressionIndicator(data[bh.Offset+bh.Length])
     540           1 :         var decompressed []byte
     541           1 :         if typ == block.NoCompressionIndicator {
     542           1 :                 return data[bh.Offset : bh.Offset+bh.Length], nil
     543           1 :         }
     544             :         // Decode the length of the decompressed value.
     545           1 :         decodedLen, prefixLen, err := block.DecompressedLen(typ, data[bh.Offset:bh.Offset+bh.Length])
     546           1 :         if err != nil {
     547           0 :                 return nil, err
     548           0 :         }
     549           1 :         decompressed = make([]byte, decodedLen)
     550           1 :         if err := block.DecompressInto(typ, data[int(bh.Offset)+prefixLen:bh.Offset+bh.Length], decompressed); err != nil {
     551           0 :                 return nil, err
     552           0 :         }
     553           1 :         return decompressed, nil
     554             : }
     555             : 
     556             : func newIndexIter(
     557             :         tableFormat TableFormat, comparer *base.Comparer, data []byte,
     558           1 : ) (block.IndexBlockIterator, error) {
     559           1 :         var iter block.IndexBlockIterator
     560           1 :         var err error
     561           1 :         if tableFormat <= TableFormatPebblev4 {
     562           1 :                 iter = new(rowblk.IndexIter)
     563           1 :                 err = iter.Init(comparer.Compare, comparer.Split, data, block.NoTransforms)
     564           1 :         } else {
     565           1 :                 iter = new(colblk.IndexIter)
     566           1 :                 err = iter.Init(comparer.Compare, comparer.Split, data, block.NoTransforms)
     567           1 :         }
     568           1 :         if err != nil {
     569           0 :                 return nil, err
     570           0 :         }
     571           1 :         return iter, nil
     572             : }
     573             : 
     574             : func forEachIndexEntry(
     575             :         indexIter block.IndexBlockIterator, fn func(block.HandleWithProperties),
     576           1 : ) error {
     577           1 :         for v := indexIter.First(); v; v = indexIter.Next() {
     578           1 :                 bhp, err := indexIter.BlockHandleWithProperties()
     579           1 :                 if err != nil {
     580           0 :                         return err
     581           0 :                 }
     582           1 :                 fn(bhp)
     583             :         }
     584           1 :         return indexIter.Close()
     585             : }
     586             : 
     587             : func decodeMetaindex(
     588             :         data []byte,
     589           2 : ) (meta map[string]block.Handle, vbih valueBlocksIndexHandle, err error) {
     590           2 :         i, err := rowblk.NewRawIter(bytes.Compare, data)
     591           2 :         if err != nil {
     592           0 :                 return nil, valueBlocksIndexHandle{}, err
     593           0 :         }
     594           2 :         defer func() { err = firstError(err, i.Close()) }()
     595             : 
     596           2 :         meta = map[string]block.Handle{}
     597           2 :         for valid := i.First(); valid; valid = i.Next() {
     598           2 :                 value := i.Value()
     599           2 :                 if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
     600           2 :                         var n int
     601           2 :                         vbih, n, err = decodeValueBlocksIndexHandle(i.Value())
     602           2 :                         if err != nil {
     603           0 :                                 return nil, vbih, err
     604           0 :                         }
     605           2 :                         if n == 0 || n != len(value) {
     606           0 :                                 return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
     607           0 :                         }
     608           2 :                 } else {
     609           2 :                         bh, n := block.DecodeHandle(value)
     610           2 :                         if n == 0 || n != len(value) {
     611           0 :                                 return nil, vbih, base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
     612           0 :                         }
     613           2 :                         meta[string(i.Key().UserKey)] = bh
     614             :                 }
     615             :         }
     616           2 :         return meta, vbih, nil
     617             : }
     618             : 
     619           2 : func decodeValueBlockIndex(data []byte, vbih valueBlocksIndexHandle) ([]block.Handle, error) {
     620           2 :         var valueBlocks []block.Handle
     621           2 :         indexEntryLen := int(vbih.blockNumByteLength + vbih.blockOffsetByteLength +
     622           2 :                 vbih.blockLengthByteLength)
     623           2 :         i := 0
     624           2 :         for len(data) != 0 {
     625           2 :                 if len(data) < indexEntryLen {
     626           0 :                         return nil, errors.Errorf(
     627           0 :                                 "remaining value index block %d does not contain a full entry of length %d",
     628           0 :                                 len(data), indexEntryLen)
     629           0 :                 }
     630           2 :                 n := int(vbih.blockNumByteLength)
     631           2 :                 bn := int(littleEndianGet(data, n))
     632           2 :                 if bn != i {
     633           0 :                         return nil, errors.Errorf("unexpected block num %d, expected %d",
     634           0 :                                 bn, i)
     635           0 :                 }
     636           2 :                 i++
     637           2 :                 data = data[n:]
     638           2 :                 n = int(vbih.blockOffsetByteLength)
     639           2 :                 blockOffset := littleEndianGet(data, n)
     640           2 :                 data = data[n:]
     641           2 :                 n = int(vbih.blockLengthByteLength)
     642           2 :                 blockLen := littleEndianGet(data, n)
     643           2 :                 data = data[n:]
     644           2 :                 valueBlocks = append(valueBlocks, block.Handle{Offset: blockOffset, Length: blockLen})
     645             :         }
     646           2 :         return valueBlocks, nil
     647             : }
     648             : 
// layoutWriter writes the structure of an sstable to durable storage. It
// accepts serialized blocks, writes them to storage and returns a block handle
// describing the offset and length of the block.
type layoutWriter struct {
	// writable is the destination for all bytes written. It is set to nil
	// once the writer has been finished or aborted.
	writable objstorage.Writable

	// cacheOpts are used to remove blocks written to the sstable from the cache,
	// providing a defense in depth against bugs which cause cache collisions.
	cacheOpts sstableinternal.CacheOptions

	// options copied from WriterOptions
	tableFormat  TableFormat
	compression  block.Compression
	checksumType block.ChecksumType

	// offset tracks the current write offset within the writable.
	offset uint64
	// lastIndexBlockHandle holds the handle to the most recently-written index
	// block. It's updated by WriteIndexBlock. When writing sstables with a
	// single-level index, this field will be updated once. When writing
	// sstables with a two-level index, the last update will set the two-level
	// index.
	lastIndexBlockHandle block.Handle
	// handles accumulates the metaindex entries recorded so far. Finish sorts
	// them by key and writes them out as the metaindex block.
	handles []metaIndexHandle
	// handlesBuf provides the backing storage for the encoded handles
	// referenced from handles.
	handlesBuf bytealloc.A
	// tmp is scratch space used when encoding block handles and the footer.
	tmp [blockHandleLikelyMaxLen]byte
	// buf is the block buffer used for blocks the layoutWriter compresses and
	// checksums itself (index blocks, named meta blocks and the metaindex).
	buf blockBuf
}
     677             : 
     678           2 : func makeLayoutWriter(w objstorage.Writable, opts WriterOptions) layoutWriter {
     679           2 :         return layoutWriter{
     680           2 :                 writable:     w,
     681           2 :                 cacheOpts:    opts.internal.CacheOpts,
     682           2 :                 tableFormat:  opts.TableFormat,
     683           2 :                 compression:  opts.Compression,
     684           2 :                 checksumType: opts.Checksum,
     685           2 :                 buf: blockBuf{
     686           2 :                         checksummer: block.Checksummer{Type: opts.Checksum},
     687           2 :                 },
     688           2 :         }
     689           2 : }
     690             : 
// metaIndexHandle is a single pending metaindex entry: the name of a meta
// block together with the already-encoded handle stored as its value.
type metaIndexHandle struct {
	// key is the metaindex key, i.e. the name of the block.
	key string
	// encodedBlockHandle is the encoded handle written as the entry's value.
	encodedBlockHandle []byte
}
     695             : 
     696             : // Abort aborts writing the table, aborting the underlying writable too. Abort
     697             : // is idempotent.
     698           2 : func (w *layoutWriter) Abort() {
     699           2 :         if w.writable != nil {
     700           1 :                 w.writable.Abort()
     701           1 :                 w.writable = nil
     702           1 :         }
     703             : }
     704             : 
     705             : // WriteDataBlock constructs a trailer for the provided data block and writes
     706             : // the block and trailer to the writer. It returns the block's handle.
     707           2 : func (w *layoutWriter) WriteDataBlock(b []byte, buf *blockBuf) (block.Handle, error) {
     708           2 :         return w.writeBlock(b, w.compression, buf)
     709           2 : }
     710             : 
     711             : // WritePrecompressedDataBlock writes a pre-compressed data block and its
     712             : // pre-computed trailer to the writer, returning it's block handle.
     713           2 : func (w *layoutWriter) WritePrecompressedDataBlock(blk block.PhysicalBlock) (block.Handle, error) {
     714           2 :         return w.writePrecompressedBlock(blk)
     715           2 : }
     716             : 
     717             : // WriteIndexBlock constructs a trailer for the provided index (first or
     718             : // second-level) and writes the block and trailer to the writer. It remembers
     719             : // the last-written index block's handle and adds it to the file's meta index
     720             : // when the writer is finished.
     721           2 : func (w *layoutWriter) WriteIndexBlock(b []byte) (block.Handle, error) {
     722           2 :         h, err := w.writeBlock(b, w.compression, &w.buf)
     723           2 :         if err == nil {
     724           2 :                 w.lastIndexBlockHandle = h
     725           2 :         }
     726           2 :         return h, err
     727             : }
     728             : 
     729             : // WriteFilterBlock finishes the provided filter, constructs a trailer and
     730             : // writes the block and trailer to the writer. It automatically adds the filter
     731             : // block to the file's meta index when the writer is finished.
     732           2 : func (w *layoutWriter) WriteFilterBlock(f filterWriter) (bh block.Handle, err error) {
     733           2 :         b, err := f.finish()
     734           2 :         if err != nil {
     735           0 :                 return block.Handle{}, err
     736           0 :         }
     737           2 :         return w.writeNamedBlock(b, f.metaName())
     738             : }
     739             : 
     740             : // WritePropertiesBlock constructs a trailer for the provided properties block
     741             : // and writes the block and trailer to the writer. It automatically adds the
     742             : // properties block to the file's meta index when the writer is finished.
     743           2 : func (w *layoutWriter) WritePropertiesBlock(b []byte) (block.Handle, error) {
     744           2 :         return w.writeNamedBlock(b, metaPropertiesName)
     745           2 : }
     746             : 
     747             : // WriteRangeKeyBlock constructs a trailer for the provided range key block and
     748             : // writes the block and trailer to the writer. It automatically adds the range
     749             : // key block to the file's meta index when the writer is finished.
     750           2 : func (w *layoutWriter) WriteRangeKeyBlock(b []byte) (block.Handle, error) {
     751           2 :         return w.writeNamedBlock(b, metaRangeKeyName)
     752           2 : }
     753             : 
     754             : // WriteRangeDeletionBlock constructs a trailer for the provided range deletion
     755             : // block and writes the block and trailer to the writer. It automatically adds
     756             : // the range deletion block to the file's meta index when the writer is
     757             : // finished.
     758           2 : func (w *layoutWriter) WriteRangeDeletionBlock(b []byte) (block.Handle, error) {
     759           2 :         return w.writeNamedBlock(b, metaRangeDelV2Name)
     760           2 : }
     761             : 
     762           2 : func (w *layoutWriter) writeNamedBlock(b []byte, name string) (bh block.Handle, err error) {
     763           2 :         bh, err = w.writeBlock(b, block.NoCompression, &w.buf)
     764           2 :         if err == nil {
     765           2 :                 w.recordToMetaindex(name, bh)
     766           2 :         }
     767           2 :         return bh, err
     768             : }
     769             : 
     770             : // WriteValueBlock writes a pre-finished value block (with the trailer) to the
     771             : // writer.
     772           2 : func (w *layoutWriter) WriteValueBlock(blk block.PhysicalBlock) (block.Handle, error) {
     773           2 :         return w.writePrecompressedBlock(blk)
     774           2 : }
     775             : 
     776             : func (w *layoutWriter) WriteValueIndexBlock(
     777             :         blk []byte, vbih valueBlocksIndexHandle,
     778           2 : ) (block.Handle, error) {
     779           2 :         // NB: value index blocks are already finished and contain the block
     780           2 :         // trailer.
     781           2 :         // TODO(jackson): can this be refactored to make value blocks less
     782           2 :         // of a snowflake?
     783           2 :         off := w.offset
     784           2 :         w.clearFromCache(off)
     785           2 :         // Write the bytes to the file.
     786           2 :         if err := w.writable.Write(blk); err != nil {
     787           0 :                 return block.Handle{}, err
     788           0 :         }
     789           2 :         l := uint64(len(blk))
     790           2 :         w.offset += l
     791           2 : 
     792           2 :         n := encodeValueBlocksIndexHandle(w.tmp[:], vbih)
     793           2 :         w.recordToMetaindexRaw(metaValueIndexName, w.tmp[:n])
     794           2 : 
     795           2 :         return block.Handle{Offset: off, Length: l}, nil
     796             : }
     797             : 
     798             : func (w *layoutWriter) writeBlock(
     799             :         b []byte, compression block.Compression, buf *blockBuf,
     800           2 : ) (block.Handle, error) {
     801           2 :         return w.writePrecompressedBlock(block.CompressAndChecksum(
     802           2 :                 &buf.compressedBuf, b, compression, &buf.checksummer))
     803           2 : }
     804             : 
     805             : // writePrecompressedBlock writes a pre-compressed block and its
     806             : // pre-computed trailer to the writer, returning it's block handle.
     807           2 : func (w *layoutWriter) writePrecompressedBlock(blk block.PhysicalBlock) (block.Handle, error) {
     808           2 :         w.clearFromCache(w.offset)
     809           2 :         // Write the bytes to the file.
     810           2 :         n, err := blk.WriteTo(w.writable)
     811           2 :         if err != nil {
     812           0 :                 return block.Handle{}, err
     813           0 :         }
     814           2 :         bh := block.Handle{Offset: w.offset, Length: uint64(blk.LengthWithoutTrailer())}
     815           2 :         w.offset += uint64(n)
     816           2 :         return bh, nil
     817             : }
     818             : 
     819             : // Write implements io.Writer. This is analogous to writePrecompressedBlock for
     820             : // blocks that already incorporate the trailer, and don't need the callee to
     821             : // return a BlockHandle.
     822           0 : func (w *layoutWriter) Write(blockWithTrailer []byte) (n int, err error) {
     823           0 :         offset := w.offset
     824           0 :         w.clearFromCache(offset)
     825           0 :         w.offset += uint64(len(blockWithTrailer))
     826           0 :         if err := w.writable.Write(blockWithTrailer); err != nil {
     827           0 :                 return 0, err
     828           0 :         }
     829           0 :         return len(blockWithTrailer), nil
     830             : }
     831             : 
     832             : // clearFromCache removes the block at the provided offset from the cache. This provides defense in
     833             : // depth against bugs which cause cache collisions.
     834           2 : func (w *layoutWriter) clearFromCache(offset uint64) {
     835           2 :         if w.cacheOpts.Cache != nil {
     836           2 :                 // TODO(peter): Alternatively, we could add the uncompressed value to the
     837           2 :                 // cache.
     838           2 :                 w.cacheOpts.Cache.Delete(w.cacheOpts.CacheID, w.cacheOpts.FileNum, offset)
     839           2 :         }
     840             : }
     841             : 
     842           2 : func (w *layoutWriter) recordToMetaindex(key string, h block.Handle) {
     843           2 :         n := h.EncodeVarints(w.tmp[:])
     844           2 :         w.recordToMetaindexRaw(key, w.tmp[:n])
     845           2 : }
     846             : 
     847           2 : func (w *layoutWriter) recordToMetaindexRaw(key string, h []byte) {
     848           2 :         var encodedHandle []byte
     849           2 :         w.handlesBuf, encodedHandle = w.handlesBuf.Alloc(len(h))
     850           2 :         copy(encodedHandle, h)
     851           2 :         w.handles = append(w.handles, metaIndexHandle{key: key, encodedBlockHandle: encodedHandle})
     852           2 : }
     853             : 
     854           2 : func (w *layoutWriter) IsFinished() bool { return w.writable == nil }
     855             : 
     856             : // Finish serializes the sstable, writing out the meta index block and sstable
     857             : // footer and closing the file. It returns the total size of the resulting
     858             : // ssatable.
     859           2 : func (w *layoutWriter) Finish() (size uint64, err error) {
     860           2 :         // Sort the meta index handles by key and write the meta index block.
     861           2 :         slices.SortFunc(w.handles, func(a, b metaIndexHandle) int {
     862           2 :                 return cmp.Compare(a.key, b.key)
     863           2 :         })
     864           2 :         bw := rowblk.Writer{RestartInterval: 1}
     865           2 :         for _, h := range w.handles {
     866           2 :                 bw.AddRaw(unsafe.Slice(unsafe.StringData(h.key), len(h.key)), h.encodedBlockHandle)
     867           2 :         }
     868           2 :         metaIndexHandle, err := w.writeBlock(bw.Finish(), block.NoCompression, &w.buf)
     869           2 :         if err != nil {
     870           0 :                 return 0, err
     871           0 :         }
     872             : 
     873             :         // Write the table footer.
     874           2 :         footer := footer{
     875           2 :                 format:      w.tableFormat,
     876           2 :                 checksum:    w.checksumType,
     877           2 :                 metaindexBH: metaIndexHandle,
     878           2 :                 indexBH:     w.lastIndexBlockHandle,
     879           2 :         }
     880           2 :         encodedFooter := footer.encode(w.tmp[:])
     881           2 :         if err := w.writable.Write(encodedFooter); err != nil {
     882           0 :                 return 0, err
     883           0 :         }
     884           2 :         w.offset += uint64(len(encodedFooter))
     885           2 : 
     886           2 :         err = w.writable.Finish()
     887           2 :         w.writable = nil
     888           2 :         return w.offset, err
     889             : }

Generated by: LCOV version 1.14