LCOV - code coverage report
Current view: top level - pebble/sstable - reader.go (source / functions) Hit Total Coverage
Test: 2024-03-27 08:24Z 5babbee7 - tests only.lcov Lines: 740 850 87.1 %
Date: 2024-03-27 08:24:56 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "context"
      11             :         "encoding/binary"
      12             :         "fmt"
      13             :         "io"
      14             :         "os"
      15             :         "slices"
      16             :         "time"
      17             : 
      18             :         "github.com/cespare/xxhash/v2"
      19             :         "github.com/cockroachdb/errors"
      20             :         "github.com/cockroachdb/pebble/internal/base"
      21             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      22             :         "github.com/cockroachdb/pebble/internal/cache"
      23             :         "github.com/cockroachdb/pebble/internal/crc"
      24             :         "github.com/cockroachdb/pebble/internal/invariants"
      25             :         "github.com/cockroachdb/pebble/internal/keyspan"
      26             :         "github.com/cockroachdb/pebble/internal/private"
      27             :         "github.com/cockroachdb/pebble/objstorage"
      28             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      29             : )
      30             : 
      31             : var errReaderClosed = errors.New("pebble/table: reader is closed")
      32             : 
      33             : // decodeBlockHandle returns the block handle encoded at the start of src, as
      34             : // well as the number of bytes it occupies. It returns zero if given invalid
      35             : // input. A block handle for a data block or a first/lower level index block
      36             : // should not be decoded using decodeBlockHandle since the caller may validate
      37             : // that the number of bytes decoded is equal to the length of src, which will
      38             : // be false if the properties are not decoded. In those cases the caller
      39             : // should use decodeBlockHandleWithProperties.
      40           1 : func decodeBlockHandle(src []byte) (BlockHandle, int) {
      41           1 :         offset, n := binary.Uvarint(src)
      42           1 :         length, m := binary.Uvarint(src[n:])
      43           1 :         if n == 0 || m == 0 {
      44           0 :                 return BlockHandle{}, 0
      45           0 :         }
      46           1 :         return BlockHandle{offset, length}, n + m
      47             : }
      48             : 
      49             : // decodeBlockHandleWithProperties returns the block handle and properties
      50             : // encoded in src. src needs to be exactly the length that was encoded. This
      51             : // method must be used for data block and first/lower level index blocks. The
      52             : // properties in the block handle point to the bytes in src.
      53           1 : func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) {
      54           1 :         bh, n := decodeBlockHandle(src)
      55           1 :         if n == 0 {
      56           0 :                 return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle")
      57           0 :         }
      58           1 :         return BlockHandleWithProperties{
      59           1 :                 BlockHandle: bh,
      60           1 :                 Props:       src[n:],
      61           1 :         }, nil
      62             : }
      63             : 
      64           1 : func encodeBlockHandle(dst []byte, b BlockHandle) int {
      65           1 :         n := binary.PutUvarint(dst, b.Offset)
      66           1 :         m := binary.PutUvarint(dst[n:], b.Length)
      67           1 :         return n + m
      68           1 : }
      69             : 
      70           1 : func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte {
      71           1 :         n := encodeBlockHandle(dst, b.BlockHandle)
      72           1 :         dst = append(dst[:n], b.Props...)
      73           1 :         return dst
      74           1 : }
      75             : 
      76             : // block is a []byte that holds a sequence of key/value pairs plus an index
      77             : // over those pairs.
      78             : type block []byte
      79             : 
      80             : type loadBlockResult int8
      81             : 
      82             : const (
      83             :         loadBlockOK loadBlockResult = iota
      84             :         // Could be due to error or because no block left to load.
      85             :         loadBlockFailed
      86             :         loadBlockIrrelevant
      87             : )
      88             : 
      89             : type blockTransform func([]byte) ([]byte, error)
      90             : 
      91             : // ReaderOption provides an interface to do work on Reader while it is being
      92             : // opened.
      93             : type ReaderOption interface {
      94             :         // readerApply is called on the reader during opening in order to set internal
      95             :         // parameters.
      96             :         readerApply(*Reader)
      97             : }
      98             : 
      99             : // Comparers is a map from comparer name to comparer. It is used for debugging
     100             : // tools which may be used on multiple databases configured with different
     101             : // comparers. Comparers implements the ReaderOption interface and can be passed
     102             : // as a parameter to NewReader.
     103             : type Comparers map[string]*Comparer
     104             : 
     105           1 : func (c Comparers) readerApply(r *Reader) {
     106           1 :         if r.Compare != nil || r.Properties.ComparerName == "" {
     107           1 :                 return
     108           1 :         }
     109           1 :         if comparer, ok := c[r.Properties.ComparerName]; ok {
     110           1 :                 r.Compare = comparer.Compare
     111           1 :                 r.Equal = comparer.Equal
     112           1 :                 r.FormatKey = comparer.FormatKey
     113           1 :                 r.Split = comparer.Split
     114           1 :         }
     115             : }
     116             : 
     117             : // Mergers is a map from merger name to merger. It is used for debugging tools
     118             : // which may be used on multiple databases configured with different
     119             : // mergers. Mergers implements the ReaderOption interface and can be passed as
     120             : // a parameter to NewReader.
     121             : type Mergers map[string]*Merger
     122             : 
     123           1 : func (m Mergers) readerApply(r *Reader) {
     124           1 :         if r.mergerOK || r.Properties.MergerName == "" {
     125           1 :                 return
     126           1 :         }
     127           1 :         _, r.mergerOK = m[r.Properties.MergerName]
     128             : }
     129             : 
     130             : // cacheOpts is a Reader open option for specifying the cache ID and sstable file
     131             : // number. If not specified, a unique cache ID will be used.
     132             : type cacheOpts struct {
     133             :         cacheID uint64
     134             :         fileNum base.DiskFileNum
     135             : }
     136             : 
     137             : // Marker function to indicate the option should be applied before reading the
     138             : // sstable properties and, in the write path, before writing the default
     139             : // sstable properties.
     140           0 : func (c *cacheOpts) preApply() {}
     141             : 
     142           1 : func (c *cacheOpts) readerApply(r *Reader) {
     143           1 :         if r.cacheID == 0 {
     144           1 :                 r.cacheID = c.cacheID
     145           1 :         }
     146           1 :         if r.fileNum == 0 {
     147           1 :                 r.fileNum = c.fileNum
     148           1 :         }
     149             : }
     150             : 
     151           1 : func (c *cacheOpts) writerApply(w *Writer) {
     152           1 :         if w.cacheID == 0 {
     153           1 :                 w.cacheID = c.cacheID
     154           1 :         }
     155           1 :         if w.fileNum == 0 {
     156           1 :                 w.fileNum = c.fileNum
     157           1 :         }
     158             : }
     159             : 
     160             : // SyntheticSuffix will replace every suffix of every key surfaced during block
     161             : // iteration. A synthetic suffix can be used if:
     162             : //  1. no two keys in the sst share the same prefix; and
     163             : //  2. pebble.Compare(prefix + replacementSuffix, prefix + originalSuffix) < 0,
     164             : //     for all keys in the backing sst which have a suffix (i.e. originalSuffix
     165             : //     is not empty).
     166             : type SyntheticSuffix []byte
     167             : 
     168             : // IsSet returns true if the synthetic suffix is not empty.
     169           1 : func (ss SyntheticSuffix) IsSet() bool {
     170           1 :         return len(ss) > 0
     171           1 : }
     172             : 
     173             : // SyntheticPrefix represents a byte slice that is implicitly prepended to every
     174             : // key in a file being read or accessed by a reader.  Note that the table is
     175             : // assumed to contain "prefix-less" keys that become full keys when prepended
     176             : // with the synthetic prefix. The table's bloom filters are constructed only on
     177             : // the "prefix-less" keys in the table, but interactions with the file including
     178             : // seeks and reads, will all behave as if the file had been constructed from
     179             : // keys that did include the prefix. Note that all Compare operations may act on
     180             : // a prefix-less key as the synthetic prefix will never modify key metadata
     181             : // stored in the key suffix.
     182             : //
     183             : // NB: Since this transformation currently only applies to point keys, a block
     184             : // with range keys cannot be iterated over with a synthetic prefix.
     185             : type SyntheticPrefix []byte
     186             : 
     187             : // IsSet returns true if the synthetic prefix is not empty.
     188           1 : func (sp SyntheticPrefix) IsSet() bool {
     189           1 :         return len(sp) > 0
     190           1 : }
     191             : 
     192             : // Apply prepends the synthetic prefix to a key.
     193           1 : func (sp SyntheticPrefix) Apply(key []byte) []byte {
     194           1 :         res := make([]byte, 0, len(sp)+len(key))
     195           1 :         res = append(res, sp...)
     196           1 :         res = append(res, key...)
     197           1 :         return res
     198           1 : }
     199             : 
     200             : // Invert removes the synthetic prefix from a key.
     201           1 : func (sp SyntheticPrefix) Invert(key []byte) []byte {
     202           1 :         res, ok := bytes.CutPrefix(key, sp)
     203           1 :         if !ok {
     204           0 :                 panic(fmt.Sprintf("unexpected prefix: %s", key))
     205             :         }
     206           1 :         return res
     207             : }
     208             : 
     209             : // rawTombstonesOpt is a Reader open option for specifying that range
     210             : // tombstones returned by Reader.NewRangeDelIter() should not be
     211             : // fragmented. Used by debug tools to get a raw view of the tombstones
     212             : // contained in an sstable.
     213             : type rawTombstonesOpt struct{}
     214             : 
     215           0 : func (rawTombstonesOpt) preApply() {}
     216             : 
     217           1 : func (rawTombstonesOpt) readerApply(r *Reader) {
     218           1 :         r.rawTombstones = true
     219           1 : }
     220             : 
     221           1 : func init() {
     222           1 :         private.SSTableCacheOpts = func(cacheID uint64, fileNum base.DiskFileNum) interface{} {
     223           1 :                 return &cacheOpts{cacheID, fileNum}
     224           1 :         }
     225           1 :         private.SSTableRawTombstonesOpt = rawTombstonesOpt{}
     226             : }
     227             : 
     228             : // Reader is a table reader.
     229             : type Reader struct {
     230             :         readable          objstorage.Readable
     231             :         cacheID           uint64
     232             :         fileNum           base.DiskFileNum
     233             :         err               error
     234             :         indexBH           BlockHandle
     235             :         filterBH          BlockHandle
     236             :         rangeDelBH        BlockHandle
     237             :         rangeKeyBH        BlockHandle
     238             :         rangeDelTransform blockTransform
     239             :         valueBIH          valueBlocksIndexHandle
     240             :         propertiesBH      BlockHandle
     241             :         metaIndexBH       BlockHandle
     242             :         footerBH          BlockHandle
     243             :         opts              ReaderOptions
     244             :         Compare           Compare
     245             :         Equal             Equal
     246             :         FormatKey         base.FormatKey
     247             :         Split             Split
     248             :         tableFilter       *tableFilterReader
     249             :         // Keep types that are not multiples of 8 bytes at the end and with
     250             :         // decreasing size.
     251             :         Properties    Properties
     252             :         tableFormat   TableFormat
     253             :         rawTombstones bool
     254             :         mergerOK      bool
     255             :         checksumType  ChecksumType
     256             :         // metaBufferPool is a buffer pool used exclusively when opening a table and
     257             :         // loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
     258             :         // the BufferPool.pool slice as a part of the Reader allocation. It has
     259             :         // capacity 3 to accommodate the meta block (1), and both the compressed
     260             :         // properties block (1) and decompressed properties block (1)
     261             :         // simultaneously.
     262             :         metaBufferPool      BufferPool
     263             :         metaBufferPoolAlloc [3]allocedBuffer
     264             : }
     265             : 
     266             : var _ CommonReader = (*Reader)(nil)
     267             : 
     268             : // Close implements DB.Close, as documented in the pebble package.
     269           1 : func (r *Reader) Close() error {
     270           1 :         r.opts.Cache.Unref()
     271           1 : 
     272           1 :         if r.readable != nil {
     273           1 :                 r.err = firstError(r.err, r.readable.Close())
     274           1 :                 r.readable = nil
     275           1 :         }
     276             : 
     277           1 :         if r.err != nil {
     278           1 :                 return r.err
     279           1 :         }
     280             :         // Make any future calls to Get, NewIter or Close return an error.
     281           1 :         r.err = errReaderClosed
     282           1 :         return nil
     283             : }
     284             : 
     285             : // NewIterWithBlockPropertyFilters returns an iterator for the contents of the
     286             : // table. If an error occurs, NewIterWithBlockPropertyFilters cleans up after
     287             : // itself and returns a nil iterator.
     288             : func (r *Reader) NewIterWithBlockPropertyFilters(
     289             :         transforms IterTransforms,
     290             :         lower, upper []byte,
     291             :         filterer *BlockPropertiesFilterer,
     292             :         useFilterBlock bool,
     293             :         stats *base.InternalIteratorStats,
     294             :         categoryAndQoS CategoryAndQoS,
     295             :         statsCollector *CategoryStatsCollector,
     296             :         rp ReaderProvider,
     297           1 : ) (Iterator, error) {
     298           1 :         return r.newIterWithBlockPropertyFiltersAndContext(
     299           1 :                 context.Background(), transforms, lower, upper, filterer, useFilterBlock,
     300           1 :                 stats, categoryAndQoS, statsCollector, rp, nil)
     301           1 : }
     302             : 
     303             : // NewIterWithBlockPropertyFiltersAndContextEtc is similar to
     304             : // NewIterWithBlockPropertyFilters and additionally accepts a context for
     305             : // tracing.
     306             : //
     307             : // If transforms.HideObsoletePoints is set, the callee assumes that filterer
     308             : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
     309             : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
     310             : func (r *Reader) NewIterWithBlockPropertyFiltersAndContextEtc(
     311             :         ctx context.Context,
     312             :         transforms IterTransforms,
     313             :         lower, upper []byte,
     314             :         filterer *BlockPropertiesFilterer,
     315             :         useFilterBlock bool,
     316             :         stats *base.InternalIteratorStats,
     317             :         categoryAndQoS CategoryAndQoS,
     318             :         statsCollector *CategoryStatsCollector,
     319             :         rp ReaderProvider,
     320           1 : ) (Iterator, error) {
     321           1 :         return r.newIterWithBlockPropertyFiltersAndContext(
     322           1 :                 ctx, transforms, lower, upper, filterer, useFilterBlock,
     323           1 :                 stats, categoryAndQoS, statsCollector, rp, nil)
     324           1 : }
     325             : 
     326             : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
     327             : // before the call to NewIterWithBlockPropertyFiltersAndContextEtc, to get the
     328             : // value of hideObsoletePoints and potentially add a block property filter.
     329             : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
     330             :         snapshotForHideObsoletePoints uint64,
     331             :         fileLargestSeqNum uint64,
     332             :         pointKeyFilters []BlockPropertyFilter,
     333           1 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
     334           1 :         hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
     335           1 :                 snapshotForHideObsoletePoints > fileLargestSeqNum
     336           1 :         if hideObsoletePoints {
     337           1 :                 pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
     338           1 :         }
     339           1 :         return hideObsoletePoints, pointKeyFilters
     340             : }
     341             : 
     342             : func (r *Reader) newIterWithBlockPropertyFiltersAndContext(
     343             :         ctx context.Context,
     344             :         transforms IterTransforms,
     345             :         lower, upper []byte,
     346             :         filterer *BlockPropertiesFilterer,
     347             :         useFilterBlock bool,
     348             :         stats *base.InternalIteratorStats,
     349             :         categoryAndQoS CategoryAndQoS,
     350             :         statsCollector *CategoryStatsCollector,
     351             :         rp ReaderProvider,
     352             :         vState *virtualState,
     353           1 : ) (Iterator, error) {
     354           1 :         // NB: pebble.tableCache wraps the returned iterator with one which performs
     355           1 :         // reference counting on the Reader, preventing the Reader from being closed
     356           1 :         // until the final iterator closes.
     357           1 :         if r.Properties.IndexType == twoLevelIndex {
     358           1 :                 i := twoLevelIterPool.Get().(*twoLevelIterator)
     359           1 :                 err := i.init(ctx, r, vState, transforms, lower, upper, filterer, useFilterBlock,
     360           1 :                         stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
     361           1 :                 if err != nil {
     362           1 :                         return nil, err
     363           1 :                 }
     364           1 :                 return i, nil
     365             :         }
     366             : 
     367           1 :         i := singleLevelIterPool.Get().(*singleLevelIterator)
     368           1 :         err := i.init(ctx, r, vState, transforms, lower, upper, filterer, useFilterBlock,
     369           1 :                 stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
     370           1 :         if err != nil {
     371           1 :                 return nil, err
     372           1 :         }
     373           1 :         return i, nil
     374             : }
     375             : 
     376             : // NewIter returns an iterator for the contents of the table. If an error
     377             : // occurs, NewIter cleans up after itself and returns a nil iterator. NewIter
     378             : // must only be used when the Reader is guaranteed to outlive any LazyValues
     379             : // returned from the iter.
     380           1 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
     381           1 :         return r.NewIterWithBlockPropertyFilters(
     382           1 :                 transforms, lower, upper, nil, true, /* useFilterBlock */
     383           1 :                 nil /* stats */, CategoryAndQoS{}, nil /* statsCollector */, TrivialReaderProvider{Reader: r})
     384           1 : }
     385             : 
     386             : // NewCompactionIter returns an iterator similar to NewIter but it also increments
     387             : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
     388             : // after itself and returns a nil iterator.
     389             : func (r *Reader) NewCompactionIter(
     390             :         transforms IterTransforms,
     391             :         bytesIterated *uint64,
     392             :         categoryAndQoS CategoryAndQoS,
     393             :         statsCollector *CategoryStatsCollector,
     394             :         rp ReaderProvider,
     395             :         bufferPool *BufferPool,
     396           1 : ) (Iterator, error) {
     397           1 :         return r.newCompactionIter(transforms, bytesIterated, categoryAndQoS, statsCollector, rp, nil, bufferPool)
     398           1 : }
     399             : 
     400             : func (r *Reader) newCompactionIter(
     401             :         transforms IterTransforms,
     402             :         bytesIterated *uint64,
     403             :         categoryAndQoS CategoryAndQoS,
     404             :         statsCollector *CategoryStatsCollector,
     405             :         rp ReaderProvider,
     406             :         vState *virtualState,
     407             :         bufferPool *BufferPool,
     408           1 : ) (Iterator, error) {
     409           1 :         if vState != nil && vState.isSharedIngested {
     410           1 :                 transforms.HideObsoletePoints = true
     411           1 :         }
     412           1 :         if r.Properties.IndexType == twoLevelIndex {
     413           1 :                 i := twoLevelIterPool.Get().(*twoLevelIterator)
     414           1 :                 err := i.init(
     415           1 :                         context.Background(),
     416           1 :                         r, vState, transforms, nil /* lower */, nil /* upper */, nil,
     417           1 :                         false /* useFilter */, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
     418           1 :                 )
     419           1 :                 if err != nil {
     420           0 :                         return nil, err
     421           0 :                 }
     422           1 :                 i.setupForCompaction()
     423           1 :                 return &twoLevelCompactionIterator{
     424           1 :                         twoLevelIterator: i,
     425           1 :                         bytesIterated:    bytesIterated,
     426           1 :                 }, nil
     427             :         }
     428           1 :         i := singleLevelIterPool.Get().(*singleLevelIterator)
     429           1 :         err := i.init(
     430           1 :                 context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
     431           1 :                 nil, false /* useFilter */, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
     432           1 :         )
     433           1 :         if err != nil {
     434           0 :                 return nil, err
     435           0 :         }
     436           1 :         i.setupForCompaction()
     437           1 :         return &compactionIterator{
     438           1 :                 singleLevelIterator: i,
     439           1 :                 bytesIterated:       bytesIterated,
     440           1 :         }, nil
     441             : }
     442             : 
     443             : // NewRawRangeDelIter returns an internal iterator for the contents of the
     444             : // range-del block for the table. Returns nil if the table does not contain
     445             : // any range deletions.
     446             : //
     447             : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
     448             : // iterator. Add WithContext methods since the existing ones are public.
     449           1 : func (r *Reader) NewRawRangeDelIter(transforms IterTransforms) (keyspan.FragmentIterator, error) {
     450           1 :         if r.rangeDelBH.Length == 0 {
     451           1 :                 return nil, nil
     452           1 :         }
     453           1 :         if transforms.SyntheticSuffix.IsSet() {
     454           0 :                 return nil, errors.AssertionFailedf("synthetic suffix not supported with range del iterator")
     455           0 :         }
     456           1 :         if transforms.SyntheticPrefix.IsSet() {
     457           0 :                 return nil, errors.AssertionFailedf("synthetic prefix not supported with range del iterator")
     458           0 :         }
     459           1 :         h, err := r.readRangeDel(nil /* stats */, nil /* iterStats */)
     460           1 :         if err != nil {
     461           1 :                 return nil, err
     462           1 :         }
     463           1 :         i := &fragmentBlockIter{elideSameSeqnum: true}
     464           1 :         // It's okay for hideObsoletePoints to be false here, even for shared ingested
     465           1 :         // sstables. This is because rangedels do not apply to points in the same
     466           1 :         // sstable at the same sequence number anyway, so exposing obsolete rangedels
     467           1 :         // is harmless.
     468           1 :         if err := i.blockIter.initHandle(r.Compare, r.Split, h, transforms); err != nil {
     469           0 :                 return nil, err
     470           0 :         }
     471           1 :         return i, nil
     472             : }
     473             : 
     474             : // NewRawRangeKeyIter returns an internal iterator for the contents of the
     475             : // range-key block for the table. Returns nil if the table does not contain any
     476             : // range keys.
     477             : //
     478             : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
     479             : // iterator. Add WithContext methods since the existing ones are public.
     480           1 : func (r *Reader) NewRawRangeKeyIter(transforms IterTransforms) (keyspan.FragmentIterator, error) {
     481           1 :         if r.rangeKeyBH.Length == 0 {
     482           1 :                 return nil, nil
     483           1 :         }
     484           1 :         if transforms.SyntheticSuffix.IsSet() {
     485           0 :                 return nil, errors.AssertionFailedf("synthetic suffix not supported with range key iterator")
     486           0 :         }
     487           1 :         if transforms.SyntheticPrefix.IsSet() {
     488           0 :                 return nil, errors.AssertionFailedf("synthetic prefix not supported with range key iterator")
     489           0 :         }
     490           1 :         h, err := r.readRangeKey(nil /* stats */, nil /* iterStats */)
     491           1 :         if err != nil {
     492           1 :                 return nil, err
     493           1 :         }
     494           1 :         i := rangeKeyFragmentBlockIterPool.Get().(*rangeKeyFragmentBlockIter)
     495           1 : 
     496           1 :         if err := i.blockIter.initHandle(r.Compare, r.Split, h, transforms); err != nil {
     497           0 :                 return nil, err
     498           0 :         }
     499           1 :         return i, nil
     500             : }
     501             : 
     502             : type rangeKeyFragmentBlockIter struct {
     503             :         fragmentBlockIter
     504             : }
     505             : 
     506           1 : func (i *rangeKeyFragmentBlockIter) Close() error {
     507           1 :         err := i.fragmentBlockIter.Close()
     508           1 :         i.fragmentBlockIter = i.fragmentBlockIter.resetForReuse()
     509           1 :         rangeKeyFragmentBlockIterPool.Put(i)
     510           1 :         return err
     511           1 : }
     512             : 
     513             : func (r *Reader) readIndex(
     514             :         ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
     515           1 : ) (bufferHandle, error) {
     516           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     517           1 :         return r.readBlock(ctx, r.indexBH, nil, nil, stats, iterStats, nil /* buffer pool */)
     518           1 : }
     519             : 
     520             : func (r *Reader) readFilter(
     521             :         ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
     522           1 : ) (bufferHandle, error) {
     523           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
     524           1 :         return r.readBlock(ctx, r.filterBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
     525           1 : }
     526             : 
     527             : func (r *Reader) readRangeDel(
     528             :         stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
     529           1 : ) (bufferHandle, error) {
     530           1 :         ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
     531           1 :         return r.readBlock(ctx, r.rangeDelBH, r.rangeDelTransform, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
     532           1 : }
     533             : 
     534             : func (r *Reader) readRangeKey(
     535             :         stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
     536           1 : ) (bufferHandle, error) {
     537           1 :         ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
     538           1 :         return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
     539           1 : }
     540             : 
     541             : func checkChecksum(
     542             :         checksumType ChecksumType, b []byte, bh BlockHandle, fileNum base.DiskFileNum,
     543           1 : ) error {
     544           1 :         expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
     545           1 :         var computedChecksum uint32
     546           1 :         switch checksumType {
     547           1 :         case ChecksumTypeCRC32c:
     548           1 :                 computedChecksum = crc.New(b[:bh.Length+1]).Value()
     549           1 :         case ChecksumTypeXXHash64:
     550           1 :                 computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
     551           0 :         default:
     552           0 :                 return errors.Errorf("unsupported checksum type: %d", checksumType)
     553             :         }
     554             : 
     555           1 :         if expectedChecksum != computedChecksum {
     556           1 :                 return base.CorruptionErrorf(
     557           1 :                         "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
     558           1 :                         fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
     559           1 :         }
     560           1 :         return nil
     561             : }
     562             : 
     563             : type cacheValueOrBuf struct {
     564             :         // buf.Valid() returns true if backed by a BufferPool.
     565             :         buf Buf
     566             :         // v is non-nil if backed by the block cache.
     567             :         v *cache.Value
     568             : }
     569             : 
     570           1 : func (b cacheValueOrBuf) get() []byte {
     571           1 :         if b.buf.Valid() {
     572           1 :                 return b.buf.p.pool[b.buf.i].b
     573           1 :         }
     574           1 :         return b.v.Buf()
     575             : }
     576             : 
     577           1 : func (b cacheValueOrBuf) release() {
     578           1 :         if b.buf.Valid() {
     579           1 :                 b.buf.Release()
     580           1 :         } else {
     581           1 :                 cache.Free(b.v)
     582           1 :         }
     583             : }
     584             : 
     585           1 : func (b cacheValueOrBuf) truncate(n int) {
     586           1 :         if b.buf.Valid() {
     587           1 :                 b.buf.p.pool[b.buf.i].b = b.buf.p.pool[b.buf.i].b[:n]
     588           1 :         } else {
     589           1 :                 b.v.Truncate(n)
     590           1 :         }
     591             : }
     592             : 
     593             : // DeterministicReadBlockDurationForTesting is for tests that want a
     594             : // deterministic value of the time to read a block (that is not in the cache).
     595             : // The return value is a function that must be called before the test exits.
     596           1 : func DeterministicReadBlockDurationForTesting() func() {
     597           1 :         drbdForTesting := deterministicReadBlockDurationForTesting
     598           1 :         deterministicReadBlockDurationForTesting = true
     599           1 :         return func() {
     600           1 :                 deterministicReadBlockDurationForTesting = drbdForTesting
     601           1 :         }
     602             : }
     603             : 
     604             : var deterministicReadBlockDurationForTesting = false
     605             : 
     606             : func (r *Reader) readBlock(
     607             :         ctx context.Context,
     608             :         bh BlockHandle,
     609             :         transform blockTransform,
     610             :         readHandle objstorage.ReadHandle,
     611             :         stats *base.InternalIteratorStats,
     612             :         iterStats *iterStatsAccumulator,
     613             :         bufferPool *BufferPool,
     614           1 : ) (handle bufferHandle, _ error) {
     615           1 :         if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil {
     616           1 :                 // Cache hit.
     617           1 :                 if readHandle != nil {
     618           1 :                         readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+blockTrailerLen))
     619           1 :                 }
     620           1 :                 if stats != nil {
     621           1 :                         stats.BlockBytes += bh.Length
     622           1 :                         stats.BlockBytesInCache += bh.Length
     623           1 :                 }
     624           1 :                 if iterStats != nil {
     625           1 :                         iterStats.reportStats(bh.Length, bh.Length, 0)
     626           1 :                 }
     627             :                 // This block is already in the cache; return a handle to existing value
     628             :                 // in the cache.
     629           1 :                 return bufferHandle{h: h}, nil
     630             :         }
     631             : 
     632             :         // Cache miss.
     633           1 :         var compressed cacheValueOrBuf
     634           1 :         if bufferPool != nil {
     635           1 :                 compressed = cacheValueOrBuf{
     636           1 :                         buf: bufferPool.Alloc(int(bh.Length + blockTrailerLen)),
     637           1 :                 }
     638           1 :         } else {
     639           1 :                 compressed = cacheValueOrBuf{
     640           1 :                         v: cache.Alloc(int(bh.Length + blockTrailerLen)),
     641           1 :                 }
     642           1 :         }
     643             : 
     644           1 :         readStartTime := time.Now()
     645           1 :         var err error
     646           1 :         if readHandle != nil {
     647           1 :                 err = readHandle.ReadAt(ctx, compressed.get(), int64(bh.Offset))
     648           1 :         } else {
     649           1 :                 err = r.readable.ReadAt(ctx, compressed.get(), int64(bh.Offset))
     650           1 :         }
     651           1 :         readDuration := time.Since(readStartTime)
     652           1 :         // TODO(sumeer): should the threshold be configurable?
     653           1 :         const slowReadTracingThreshold = 5 * time.Millisecond
     654           1 :         // For deterministic testing.
     655           1 :         if deterministicReadBlockDurationForTesting {
     656           1 :                 readDuration = slowReadTracingThreshold
     657           1 :         }
     658             :         // Call IsTracingEnabled to avoid the allocations of boxing integers into an
     659             :         // interface{}, unless necessary.
     660           1 :         if readDuration >= slowReadTracingThreshold && r.opts.LoggerAndTracer.IsTracingEnabled(ctx) {
     661           1 :                 r.opts.LoggerAndTracer.Eventf(ctx, "reading %d bytes took %s",
     662           1 :                         int(bh.Length+blockTrailerLen), readDuration.String())
     663           1 :         }
     664           1 :         if stats != nil {
     665           1 :                 stats.BlockReadDuration += readDuration
     666           1 :         }
     667           1 :         if err != nil {
     668           1 :                 compressed.release()
     669           1 :                 return bufferHandle{}, err
     670           1 :         }
     671           1 :         if err := checkChecksum(r.checksumType, compressed.get(), bh, r.fileNum); err != nil {
     672           1 :                 compressed.release()
     673           1 :                 return bufferHandle{}, err
     674           1 :         }
     675             : 
     676           1 :         typ := blockType(compressed.get()[bh.Length])
     677           1 :         compressed.truncate(int(bh.Length))
     678           1 : 
     679           1 :         var decompressed cacheValueOrBuf
     680           1 :         if typ == noCompressionBlockType {
     681           1 :                 decompressed = compressed
     682           1 :         } else {
     683           1 :                 // Decode the length of the decompressed value.
     684           1 :                 decodedLen, prefixLen, err := decompressedLen(typ, compressed.get())
     685           1 :                 if err != nil {
     686           0 :                         compressed.release()
     687           0 :                         return bufferHandle{}, err
     688           0 :                 }
     689             : 
     690           1 :                 if bufferPool != nil {
     691           1 :                         decompressed = cacheValueOrBuf{buf: bufferPool.Alloc(decodedLen)}
     692           1 :                 } else {
     693           1 :                         decompressed = cacheValueOrBuf{v: cache.Alloc(decodedLen)}
     694           1 :                 }
     695           1 :                 if err := decompressInto(typ, compressed.get()[prefixLen:], decompressed.get()); err != nil {
     696           0 :                         compressed.release()
     697           0 :                         return bufferHandle{}, err
     698           0 :                 }
     699           1 :                 compressed.release()
     700             :         }
     701             : 
     702           1 :         if transform != nil {
     703           1 :                 // Transforming blocks is very rare, so the extra copy of the
     704           1 :                 // transformed data is not problematic.
     705           1 :                 tmpTransformed, err := transform(decompressed.get())
     706           1 :                 if err != nil {
     707           0 :                         decompressed.release()
     708           0 :                         return bufferHandle{}, err
     709           0 :                 }
     710             : 
     711           1 :                 var transformed cacheValueOrBuf
     712           1 :                 if bufferPool != nil {
     713           0 :                         transformed = cacheValueOrBuf{buf: bufferPool.Alloc(len(tmpTransformed))}
     714           1 :                 } else {
     715           1 :                         transformed = cacheValueOrBuf{v: cache.Alloc(len(tmpTransformed))}
     716           1 :                 }
     717           1 :                 copy(transformed.get(), tmpTransformed)
     718           1 :                 decompressed.release()
     719           1 :                 decompressed = transformed
     720             :         }
     721             : 
     722           1 :         if stats != nil {
     723           1 :                 stats.BlockBytes += bh.Length
     724           1 :         }
     725           1 :         if iterStats != nil {
     726           1 :                 iterStats.reportStats(bh.Length, 0, readDuration)
     727           1 :         }
     728           1 :         if decompressed.buf.Valid() {
     729           1 :                 return bufferHandle{b: decompressed.buf}, nil
     730           1 :         }
     731           1 :         h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, decompressed.v)
     732           1 :         return bufferHandle{h: h}, nil
     733             : }
     734             : 
     735           1 : func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) {
     736           1 :         // Convert v1 (RocksDB format) range-del blocks to v2 blocks on the fly. The
     737           1 :         // v1 format range-del blocks have unfragmented and unsorted range
     738           1 :         // tombstones. We need properly fragmented and sorted range tombstones in
     739           1 :         // order to serve from them directly.
     740           1 :         iter := &blockIter{}
     741           1 :         if err := iter.init(r.Compare, r.Split, b, NoTransforms); err != nil {
     742           0 :                 return nil, err
     743           0 :         }
     744           1 :         var tombstones []keyspan.Span
     745           1 :         for key, value := iter.First(); key != nil; key, value = iter.Next() {
     746           1 :                 t := keyspan.Span{
     747           1 :                         Start: key.UserKey,
     748           1 :                         End:   value.InPlaceValue(),
     749           1 :                         Keys:  []keyspan.Key{{Trailer: key.Trailer}},
     750           1 :                 }
     751           1 :                 tombstones = append(tombstones, t)
     752           1 :         }
     753           1 :         keyspan.Sort(r.Compare, tombstones)
     754           1 : 
     755           1 :         // Fragment the tombstones, outputting them directly to a block writer.
     756           1 :         rangeDelBlock := blockWriter{
     757           1 :                 restartInterval: 1,
     758           1 :         }
     759           1 :         frag := keyspan.Fragmenter{
     760           1 :                 Cmp:    r.Compare,
     761           1 :                 Format: r.FormatKey,
     762           1 :                 Emit: func(s keyspan.Span) {
     763           1 :                         for _, k := range s.Keys {
     764           1 :                                 startIK := InternalKey{UserKey: s.Start, Trailer: k.Trailer}
     765           1 :                                 rangeDelBlock.add(startIK, s.End)
     766           1 :                         }
     767             :                 },
     768             :         }
     769           1 :         for i := range tombstones {
     770           1 :                 frag.Add(tombstones[i])
     771           1 :         }
     772           1 :         frag.Finish()
     773           1 : 
     774           1 :         // Return the contents of the constructed v2 format range-del block.
     775           1 :         return rangeDelBlock.finish(), nil
     776             : }
     777             : 
     778           1 : func (r *Reader) readMetaindex(metaindexBH BlockHandle) error {
     779           1 :         // We use a BufferPool when reading metaindex blocks in order to avoid
     780           1 :         // populating the block cache with these blocks. In heavy-write workloads,
     781           1 :         // especially with high compaction concurrency, new tables may be created
     782           1 :         // frequently. Populating the block cache with these metaindex blocks adds
     783           1 :         // additional contention on the block cache mutexes (see #1997).
     784           1 :         // Additionally, these blocks are exceedingly unlikely to be read again
     785           1 :         // while they're still in the block cache except in misconfigurations with
     786           1 :         // excessive sstable counts or a table cache that's far too small.
     787           1 :         r.metaBufferPool.initPreallocated(r.metaBufferPoolAlloc[:0])
     788           1 :         // When we're finished, release the buffers we've allocated back to the
     789           1 :         // memory allocator. We don't expect to use metaBufferPool again.
     790           1 :         defer r.metaBufferPool.Release()
     791           1 : 
     792           1 :         b, err := r.readBlock(
     793           1 :                 context.Background(), metaindexBH, nil /* transform */, nil /* readHandle */, nil, /* stats */
     794           1 :                 nil /* iterStats */, &r.metaBufferPool)
     795           1 :         if err != nil {
     796           1 :                 return err
     797           1 :         }
     798           1 :         data := b.Get()
     799           1 :         defer b.Release()
     800           1 : 
     801           1 :         if uint64(len(data)) != metaindexBH.Length {
     802           0 :                 return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
     803           0 :                         errors.Safe(len(data)), errors.Safe(metaindexBH.Length))
     804           0 :         }
     805             : 
     806           1 :         i, err := newRawBlockIter(bytes.Compare, data)
     807           1 :         if err != nil {
     808           0 :                 return err
     809           0 :         }
     810             : 
     811           1 :         meta := map[string]BlockHandle{}
     812           1 :         for valid := i.First(); valid; valid = i.Next() {
     813           1 :                 value := i.Value()
     814           1 :                 if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
     815           1 :                         vbih, n, err := decodeValueBlocksIndexHandle(i.Value())
     816           1 :                         if err != nil {
     817           0 :                                 return err
     818           0 :                         }
     819           1 :                         if n == 0 || n != len(value) {
     820           0 :                                 return base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
     821           0 :                         }
     822           1 :                         r.valueBIH = vbih
     823           1 :                 } else {
     824           1 :                         bh, n := decodeBlockHandle(value)
     825           1 :                         if n == 0 || n != len(value) {
     826           0 :                                 return base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
     827           0 :                         }
     828           1 :                         meta[string(i.Key().UserKey)] = bh
     829             :                 }
     830             :         }
     831           1 :         if err := i.Close(); err != nil {
     832           0 :                 return err
     833           0 :         }
     834             : 
     835           1 :         if bh, ok := meta[metaPropertiesName]; ok {
     836           1 :                 b, err = r.readBlock(
     837           1 :                         context.Background(), bh, nil /* transform */, nil /* readHandle */, nil, /* stats */
     838           1 :                         nil /* iterStats */, nil /* buffer pool */)
     839           1 :                 if err != nil {
     840           1 :                         return err
     841           1 :                 }
     842           1 :                 r.propertiesBH = bh
     843           1 :                 err := r.Properties.load(b.Get(), bh.Offset, r.opts.DeniedUserProperties)
     844           1 :                 b.Release()
     845           1 :                 if err != nil {
     846           0 :                         return err
     847           0 :                 }
     848             :         }
     849             : 
     850           1 :         if bh, ok := meta[metaRangeDelV2Name]; ok {
     851           1 :                 r.rangeDelBH = bh
     852           1 :         } else if bh, ok := meta[metaRangeDelName]; ok {
     853           1 :                 r.rangeDelBH = bh
     854           1 :                 if !r.rawTombstones {
     855           1 :                         r.rangeDelTransform = r.transformRangeDelV1
     856           1 :                 }
     857             :         }
     858             : 
     859           1 :         if bh, ok := meta[metaRangeKeyName]; ok {
     860           1 :                 r.rangeKeyBH = bh
     861           1 :         }
     862             : 
     863           1 :         for name, fp := range r.opts.Filters {
     864           1 :                 types := []struct {
     865           1 :                         ftype  FilterType
     866           1 :                         prefix string
     867           1 :                 }{
     868           1 :                         {TableFilter, "fullfilter."},
     869           1 :                 }
     870           1 :                 var done bool
     871           1 :                 for _, t := range types {
     872           1 :                         if bh, ok := meta[t.prefix+name]; ok {
     873           1 :                                 r.filterBH = bh
     874           1 : 
     875           1 :                                 switch t.ftype {
     876           1 :                                 case TableFilter:
     877           1 :                                         r.tableFilter = newTableFilterReader(fp)
     878           0 :                                 default:
     879           0 :                                         return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
     880             :                                 }
     881             : 
     882           1 :                                 done = true
     883           1 :                                 break
     884             :                         }
     885             :                 }
     886           1 :                 if done {
     887           1 :                         break
     888             :                 }
     889             :         }
     890           1 :         return nil
     891             : }
     892             : 
     893             : // Layout returns the layout (block organization) for an sstable.
     894           1 : func (r *Reader) Layout() (*Layout, error) {
     895           1 :         if r.err != nil {
     896           0 :                 return nil, r.err
     897           0 :         }
     898             : 
     899           1 :         l := &Layout{
     900           1 :                 Data:       make([]BlockHandleWithProperties, 0, r.Properties.NumDataBlocks),
     901           1 :                 Filter:     r.filterBH,
     902           1 :                 RangeDel:   r.rangeDelBH,
     903           1 :                 RangeKey:   r.rangeKeyBH,
     904           1 :                 ValueIndex: r.valueBIH.h,
     905           1 :                 Properties: r.propertiesBH,
     906           1 :                 MetaIndex:  r.metaIndexBH,
     907           1 :                 Footer:     r.footerBH,
     908           1 :                 Format:     r.tableFormat,
     909           1 :         }
     910           1 : 
     911           1 :         indexH, err := r.readIndex(context.Background(), nil, nil)
     912           1 :         if err != nil {
     913           1 :                 return nil, err
     914           1 :         }
     915           1 :         defer indexH.Release()
     916           1 : 
     917           1 :         var alloc bytealloc.A
     918           1 : 
     919           1 :         if r.Properties.IndexPartitions == 0 {
     920           1 :                 l.Index = append(l.Index, r.indexBH)
     921           1 :                 iter, _ := newBlockIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     922           1 :                 for key, value := iter.First(); key != nil; key, value = iter.Next() {
     923           1 :                         dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
     924           1 :                         if err != nil {
     925           0 :                                 return nil, errCorruptIndexEntry(err)
     926           0 :                         }
     927           1 :                         if len(dataBH.Props) > 0 {
     928           1 :                                 alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     929           1 :                         }
     930           1 :                         l.Data = append(l.Data, dataBH)
     931             :                 }
     932           1 :         } else {
     933           1 :                 l.TopIndex = r.indexBH
     934           1 :                 topIter, _ := newBlockIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     935           1 :                 iter := &blockIter{}
     936           1 :                 for key, value := topIter.First(); key != nil; key, value = topIter.Next() {
     937           1 :                         indexBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
     938           1 :                         if err != nil {
     939           0 :                                 return nil, errCorruptIndexEntry(err)
     940           0 :                         }
     941           1 :                         l.Index = append(l.Index, indexBH.BlockHandle)
     942           1 : 
     943           1 :                         subIndex, err := r.readBlock(context.Background(), indexBH.BlockHandle,
     944           1 :                                 nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
     945           1 :                         if err != nil {
     946           1 :                                 return nil, err
     947           1 :                         }
     948             :                         // TODO(msbutler): figure out how to pass virtualState to layout call.
     949           1 :                         if err := iter.init(r.Compare, r.Split, subIndex.Get(), NoTransforms); err != nil {
     950           0 :                                 return nil, err
     951           0 :                         }
     952           1 :                         for key, value := iter.First(); key != nil; key, value = iter.Next() {
     953           1 :                                 dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
     954           1 :                                 if len(dataBH.Props) > 0 {
     955           1 :                                         alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     956           1 :                                 }
     957           1 :                                 if err != nil {
     958           0 :                                         return nil, errCorruptIndexEntry(err)
     959           0 :                                 }
     960           1 :                                 l.Data = append(l.Data, dataBH)
     961             :                         }
     962           1 :                         subIndex.Release()
     963           1 :                         *iter = iter.resetForReuse()
     964             :                 }
     965             :         }
     966           1 :         if r.valueBIH.h.Length != 0 {
     967           1 :                 vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil, nil /* buffer pool */)
     968           1 :                 if err != nil {
     969           0 :                         return nil, err
     970           0 :                 }
     971           1 :                 defer vbiH.Release()
     972           1 :                 vbiBlock := vbiH.Get()
     973           1 :                 indexEntryLen := int(r.valueBIH.blockNumByteLength + r.valueBIH.blockOffsetByteLength +
     974           1 :                         r.valueBIH.blockLengthByteLength)
     975           1 :                 i := 0
     976           1 :                 for len(vbiBlock) != 0 {
     977           1 :                         if len(vbiBlock) < indexEntryLen {
     978           0 :                                 return nil, errors.Errorf(
     979           0 :                                         "remaining value index block %d does not contain a full entry of length %d",
     980           0 :                                         len(vbiBlock), indexEntryLen)
     981           0 :                         }
     982           1 :                         n := int(r.valueBIH.blockNumByteLength)
     983           1 :                         bn := int(littleEndianGet(vbiBlock, n))
     984           1 :                         if bn != i {
     985           0 :                                 return nil, errors.Errorf("unexpected block num %d, expected %d",
     986           0 :                                         bn, i)
     987           0 :                         }
     988           1 :                         i++
     989           1 :                         vbiBlock = vbiBlock[n:]
     990           1 :                         n = int(r.valueBIH.blockOffsetByteLength)
     991           1 :                         blockOffset := littleEndianGet(vbiBlock, n)
     992           1 :                         vbiBlock = vbiBlock[n:]
     993           1 :                         n = int(r.valueBIH.blockLengthByteLength)
     994           1 :                         blockLen := littleEndianGet(vbiBlock, n)
     995           1 :                         vbiBlock = vbiBlock[n:]
     996           1 :                         l.ValueBlock = append(l.ValueBlock, BlockHandle{Offset: blockOffset, Length: blockLen})
     997             :                 }
     998             :         }
     999             : 
    1000           1 :         return l, nil
    1001             : }
    1002             : 
    1003             : // ValidateBlockChecksums validates the checksums for each block in the SSTable by reading the data, index, top-index, filter, range-del, range-key, properties and metaindex blocks reported by Layout in file-offset order. It returns the first error from Layout or from a block read, and nil otherwise.
    1004           1 : func (r *Reader) ValidateBlockChecksums() error {
    1005           1 :         // Pre-compute the BlockHandles for the underlying file; a Layout failure aborts validation.
    1006           1 :         l, err := r.Layout()
    1007           1 :         if err != nil {
    1008           1 :                 return err
    1009           1 :         }
    1010             : 
    1011             :         // Construct the set of blocks to check. Note that the footer is not checked
    1012             :         // as it is not a block with a checksum. NOTE(review): l.ValueBlock handles are not added to this set; confirm value blocks are meant to be skipped.
    1013           1 :         blocks := make([]BlockHandle, len(l.Data))
    1014           1 :         for i := range l.Data {
    1015           1 :                 blocks[i] = l.Data[i].BlockHandle
    1016           1 :         }
    1017           1 :         blocks = append(blocks, l.Index...)
    1018           1 :         blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.RangeKey, l.Properties, l.MetaIndex)
    1019           1 : 
    1020           1 :         // Sorting by offset ensures we are performing a sequential scan of the
    1021           1 :         // file.
    1022           1 :         slices.SortFunc(blocks, func(a, b BlockHandle) int {
    1023           1 :                 return cmp.Compare(a.Offset, b.Offset)
    1024           1 :         })
    1025             : 
    1026             :         // Check all blocks sequentially. Make use of read-ahead, given we are
    1027             :         // scanning the entire file from start to end. A single read handle is shared by every block read and closed on return.
    1028           1 :         rh := r.readable.NewReadHandle(context.TODO())
    1029           1 :         defer rh.Close()
    1030           1 : 
    1031           1 :         for _, bh := range blocks {
    1032           1 :                 // Certain blocks may not be present, in which case their handle has zero length and we skip them.
    1033           1 :                 if bh.Length == 0 {
    1034           1 :                         continue
    1035             :                 }
    1036             : 
    1037             :                 // Read the block, which validates the checksum. The contents are not needed, so the handle is released right away.
    1038           1 :                 h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* iterStats */, nil /* buffer pool */)
    1039           1 :                 if err != nil {
    1040           1 :                         return err
    1041           1 :                 }
    1042           1 :                 h.Release()
    1043             :         }
    1044             : 
    1045           1 :         return nil
    1046             : }
    1047             : 
    1048             : // CommonProperties implements the CommonReader interface. It returns a pointer to r.Properties.CommonProperties, so the result aliases the reader's own properties rather than a copy.
    1049           1 : func (r *Reader) CommonProperties() *CommonProperties {
    1050           1 :         return &r.Properties.CommonProperties
    1051           1 : }
    1052             : 
    1053             : // EstimateDiskUsage returns the total size of data blocks overlapping the range
    1054             : // `[start, end]`. Even if a data block partially overlaps, or we cannot
    1055             : // determine overlap due to abbreviated index keys, the full data block size is
    1056             : // included in the estimation.
    1057             : //
    1058             : // This function does not account for any metablock space usage. Assumes there
    1059             : // is at least partial overlap, i.e., `[start, end]` falls neither completely
    1060             : // before nor completely after the file's range. If no index entry is found at or after start, it returns 0 along with any iterator error.
    1061             : //
    1062             : // Only blocks containing point keys are considered. Range deletion and range
    1063             : // key blocks are not considered. Value-block bytes are added by linear interpolation: the selected data-block size is scaled by ValueBlocksSize/DataSize.
    1064             : //
    1065             : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
    1066             : // data blocks overlapped and add that same fraction of the metadata blocks to the
    1067             : // estimate.
    1068           1 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
    1069           1 :         if r.err != nil {
    1070           0 :                 return 0, r.err
    1071           0 :         }
    1072             : 
    1073           1 :         indexH, err := r.readIndex(context.Background(), nil, nil)
    1074           1 :         if err != nil {
    1075           1 :                 return 0, err
    1076           1 :         }
    1077           1 :         defer indexH.Release()
    1078           1 : 
    1079           1 :         // Iterators over the bottom-level index blocks containing start and end.
    1080           1 :         // These may be different in case of partitioned index but will both point
    1081           1 :         // to the same blockIter over the single index in the unpartitioned case.
    1082           1 :         var startIdxIter, endIdxIter *blockIter
    1083           1 :         if r.Properties.IndexPartitions == 0 {
    1084           1 :                 iter, err := newBlockIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
    1085           1 :                 if err != nil {
    1086           0 :                         return 0, err
    1087           0 :                 }
    1088           1 :                 startIdxIter = iter
    1089           1 :                 endIdxIter = iter
    1090           1 :         } else {
    1091           1 :                 topIter, err := newBlockIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
    1092           1 :                 if err != nil {
    1093           0 :                         return 0, err
    1094           0 :                 }
    1095             : 
    1096           1 :                 key, val := topIter.SeekGE(start, base.SeekGEFlagsNone)
    1097           1 :                 if key == nil {
    1098           0 :                         // The range falls completely after this file, or an error occurred.
    1099           0 :                         return 0, topIter.Error()
    1100           0 :                 }
    1101           1 :                 startIdxBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
    1102           1 :                 if err != nil {
    1103           0 :                         return 0, errCorruptIndexEntry(err)
    1104           0 :                 }
    1105           1 :                 startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.BlockHandle,
    1106           1 :                         nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
    1107           1 :                 if err != nil {
    1108           1 :                         return 0, err
    1109           1 :                 }
    1110           1 :                 defer startIdxBlock.Release()
    1111           1 :                 startIdxIter, err = newBlockIter(r.Compare, r.Split, startIdxBlock.Get(), NoTransforms)
    1112           1 :                 if err != nil {
    1113           0 :                         return 0, err
    1114           0 :                 }
    1115             : 
    1116           1 :                 key, val = topIter.SeekGE(end, base.SeekGEFlagsNone)
    1117           1 :                 if key == nil {
    1118           0 :                         if err := topIter.Error(); err != nil {
    1119           0 :                                 return 0, err
    1120           0 :                         }
    1121           1 :                 } else {
    1122           1 :                         endIdxBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
    1123           1 :                         if err != nil {
    1124           0 :                                 return 0, errCorruptIndexEntry(err)
    1125           0 :                         }
    1126           1 :                         endIdxBlock, err := r.readBlock(context.Background(),
    1127           1 :                                 endIdxBH.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
    1128           1 :                         if err != nil {
    1129           1 :                                 return 0, err
    1130           1 :                         }
    1131           1 :                         defer endIdxBlock.Release()
    1132           1 :                         endIdxIter, err = newBlockIter(r.Compare, r.Split, endIdxBlock.Get(), NoTransforms)
    1133           1 :                         if err != nil {
    1134           0 :                                 return 0, err
    1135           0 :                         }
    1136             :                 }
    1137             :         }
    1138             :         // startIdxIter should not be nil at this point, while endIdxIter can be if the
    1139             :         // range spans past the end of the file (partitioned case where the top-level seek for end found no entry and no error).
    1140             : 
    1141           1 :         key, val := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
    1142           1 :         if key == nil {
    1143           1 :                 // The range falls completely after this file, or an error occurred.
    1144           1 :                 return 0, startIdxIter.Error()
    1145           1 :         }
    1146           1 :         startBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
    1147           1 :         if err != nil {
    1148           0 :                 return 0, errCorruptIndexEntry(err)
    1149           0 :         }
    1150             : 
    1151           1 :         includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
    1152           1 :                 // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
    1153           1 :                 // Linearly interpolate what is stored in value blocks.
    1154           1 :                 //
    1155           1 :                 // TODO(sumeer): if we need more accuracy, without loading any data blocks
    1156           1 :                 // (which contain the value handles, and which may also be insufficient if
    1157           1 :                 // the values are in separate files), we will need to accumulate the
    1158           1 :                 // logical size of the key-value pairs and store the cumulative value for
    1159           1 :                 // each data block in the index block entry. This increases the size of
    1160           1 :                 // the BlockHandle, so wait until this becomes necessary.
    1161           1 :                 return dataBlockSize +
    1162           1 :                         uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
    1163           1 :                                 float64(r.Properties.ValueBlocksSize))
    1164           1 :         }
    1165           1 :         if endIdxIter == nil {
    1166           0 :                 // The range spans beyond this file. Include data blocks through the last.
    1167           0 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
    1168           0 :         }
    1169           1 :         key, val = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
    1170           1 :         if key == nil {
    1171           1 :                 if err := endIdxIter.Error(); err != nil {
    1172           0 :                         return 0, err
    1173           0 :                 }
    1174             :                 // The range spans beyond this file. Include data blocks through the last.
    1175           1 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
    1176             :         }
    1177           1 :         endBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
    1178           1 :         if err != nil {
    1179           0 :                 return 0, errCorruptIndexEntry(err)
    1180           0 :         }
    1181           1 :         return includeInterpolatedValueBlocksSize(
    1182           1 :                 endBH.Offset + endBH.Length + blockTrailerLen - startBH.Offset), nil
    1183             : }
    1184             : 
    1185             : // TableFormat returns the format version for the table. If the reader is in an error state it returns TableFormatUnspecified together with r.err.
    1186           1 : func (r *Reader) TableFormat() (TableFormat, error) {
    1187           1 :         if r.err != nil {
    1188           0 :                 return TableFormatUnspecified, r.err
    1189           0 :         }
    1190           1 :         return r.tableFormat, nil
    1191             : }
    1192             : 
    1193             : // NewReader returns a new table reader for the file. Closing the reader will
    1194             : // close the file. On any failure (nil file, footer or metaindex read error, unknown comparer or merger) the error is stored in r.err and the function returns a nil *Reader with the result of r.Close().
    1195           1 : func NewReader(f objstorage.Readable, o ReaderOptions, extraOpts ...ReaderOption) (*Reader, error) {
    1196           1 :         o = o.ensureDefaults()
    1197           1 :         r := &Reader{
    1198           1 :                 readable: f,
    1199           1 :                 opts:     o,
    1200           1 :         }
    1201           1 :         if r.opts.Cache == nil {
    1202           1 :                 r.opts.Cache = cache.New(0)
    1203           1 :         } else {
    1204           1 :                 r.opts.Cache.Ref()
    1205           1 :         }
    1206             : 
    1207           1 :         if f == nil {
    1208           1 :                 r.err = errors.New("pebble/table: nil file")
    1209           1 :                 return nil, r.Close()
    1210           1 :         }
    1211             : 
    1212             :         // Note that the extra options are applied twice. First here for pre-apply
    1213             :         // options, and then below for post-apply options. Pre and post refer to
    1214             :         // before and after reading the metaindex and properties. An option is a pre-apply option when it also implements the local preApply interface.
    1215           1 :         type preApply interface{ preApply() }
    1216           1 :         for _, opt := range extraOpts {
    1217           1 :                 if _, ok := opt.(preApply); ok {
    1218           1 :                         opt.readerApply(r)
    1219           1 :                 }
    1220             :         }
    1221           1 :         if r.cacheID == 0 {
    1222           1 :                 r.cacheID = r.opts.Cache.NewID()
    1223           1 :         }
    1224             : 
    1225           1 :         footer, err := readFooter(f)
    1226           1 :         if err != nil {
    1227           1 :                 r.err = err
    1228           1 :                 return nil, r.Close()
    1229           1 :         }
    1230           1 :         r.checksumType = footer.checksum
    1231           1 :         r.tableFormat = footer.format
    1232           1 :         // Read the metaindex. NOTE(review): presumably this also populates r.Properties, which the comparer and merger name checks below rely on; confirm.
    1233           1 :         if err := r.readMetaindex(footer.metaindexBH); err != nil {
    1234           1 :                 r.err = err
    1235           1 :                 return nil, r.Close()
    1236           1 :         }
    1237           1 :         r.indexBH = footer.indexBH
    1238           1 :         r.metaIndexBH = footer.metaindexBH
    1239           1 :         r.footerBH = footer.footerBH
    1240           1 : 
    1241           1 :         if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
    1242           1 :                 r.Compare = o.Comparer.Compare
    1243           1 :                 r.Equal = o.Comparer.Equal
    1244           1 :                 r.FormatKey = o.Comparer.FormatKey
    1245           1 :                 r.Split = o.Comparer.Split
    1246           1 :         }
    1247             : 
    1248           1 :         if o.MergerName == r.Properties.MergerName {
    1249           1 :                 r.mergerOK = true
    1250           1 :         }
    1251             : 
    1252             :         // Apply the extra options again now that the comparer and merger names are
    1253             :         // known. Options implementing preApply were already applied above and are skipped here.
    1254           1 :         for _, opt := range extraOpts {
    1255           1 :                 if _, ok := opt.(preApply); !ok {
    1256           1 :                         opt.readerApply(r)
    1257           1 :                 }
    1258             :         }
    1259             : 
    1260           1 :         if r.Compare == nil {
    1261           1 :                 r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
    1262           1 :                         errors.Safe(r.fileNum), errors.Safe(r.Properties.ComparerName))
    1263           1 :         }
    1264           1 :         if !r.mergerOK {
    1265           1 :                 if name := r.Properties.MergerName; name != "" && name != "nullptr" {
    1266           1 :                         r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
    1267           1 :                                 errors.Safe(r.fileNum), errors.Safe(r.Properties.MergerName))
    1268           1 :                 }
    1269             :         }
    1270           1 :         if r.err != nil {
    1271           1 :                 return nil, r.Close()
    1272           1 :         }
    1273             : 
    1274           1 :         return r, nil
    1275             : }
    1276             : 
    1277             : // ReadableFile describes the smallest subset of vfs.File that is required for
    1278             : // reading SSTs: positional reads (io.ReaderAt), closing (io.Closer), and Stat, which NewSimpleReadable uses to obtain the file size.
    1279             : type ReadableFile interface {
    1280             :         io.ReaderAt
    1281             :         io.Closer
    1282             :         Stat() (os.FileInfo, error)
    1283             : }
    1284             : 
    1285             : // NewSimpleReadable wraps a ReadableFile in an objstorage.Readable
    1286             : // implementation (which does not support read-ahead). The file size is captured once via Stat at construction; if Stat fails, its error is returned with a nil Readable.
    1287           1 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
    1288           1 :         info, err := r.Stat()
    1289           1 :         if err != nil {
    1290           1 :                 return nil, err
    1291           1 :         }
    1292           1 :         res := &simpleReadable{
    1293           1 :                 f:    r,
    1294           1 :                 size: info.Size(),
    1295           1 :         }
    1296           1 :         res.rh = objstorage.MakeNoopReadHandle(res)
    1297           1 :         return res, nil
    1298             : }
    1299             : 
    1300             : // simpleReadable wraps a ReadableFile to implement objstorage.Readable. f is the wrapped file, size is the length reported by Stat when the wrapper was built, and rh is the no-op read handle returned by NewReadHandle.
    1301             : type simpleReadable struct {
    1302             :         f    ReadableFile
    1303             :         size int64
    1304             :         rh   objstorage.NoopReadHandle
    1305             : }
    1306             : // Compile-time assertion that *simpleReadable satisfies objstorage.Readable.
    1307             : var _ objstorage.Readable = (*simpleReadable)(nil)
    1308             : 
    1309             : // ReadAt is part of the objstorage.Readable interface. It ignores the context and delegates to the wrapped file's ReadAt, returning only the error; when invariants are enabled, a short read that reports no error panics.
    1310           1 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
    1311           1 :         n, err := s.f.ReadAt(p, off)
    1312           1 :         if invariants.Enabled && err == nil && n != len(p) {
    1313           0 :                 panic("short read")
    1314             :         }
    1315           1 :         return err
    1316             : }
    1317             : 
    1318             : // Close is part of the objstorage.Readable interface. It closes the wrapped ReadableFile and returns that call's error.
    1319           1 : func (s *simpleReadable) Close() error {
    1320           1 :         return s.f.Close()
    1321           1 : }
    1322             : 
    1323             : // Size is part of the objstorage.Readable interface. It returns the size stored in s when the wrapper was constructed; the file is not consulted again.
    1324           1 : func (s *simpleReadable) Size() int64 {
    1325           1 :         return s.size
    1326           1 : }
    1327             : 
    1328             : // NewReadHandle is part of the objstorage.Readable interface. It ignores the context and returns a pointer to the single NoopReadHandle stored in s, so every call yields the same handle.
    1329           1 : func (s *simpleReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
    1330           1 :         return &s.rh
    1331           1 : }
    1332             : // errCorruptIndexEntry formats err into a base.CorruptionErrorf "corrupt index entry" error. When invariants are enabled it panics with that error instead of returning it.
    1333           0 : func errCorruptIndexEntry(err error) error {
    1334           0 :         err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
    1335           0 :         if invariants.Enabled {
    1336           0 :                 panic(err)
    1337             :         }
    1338           0 :         return err
    1339             : }

Generated by: LCOV version 1.14