LCOV - code coverage report
Current view: top level - pebble/sstable - reader.go (source / functions) Hit Total Coverage
Test: 2023-09-18 08:17Z be158640 - tests + meta.lcov Lines: 700 791 88.5 %
Date: 2023-09-18 08:18:25 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "encoding/binary"
      11             :         "io"
      12             :         "os"
      13             :         "sort"
      14             :         "time"
      15             : 
      16             :         "github.com/cespare/xxhash/v2"
      17             :         "github.com/cockroachdb/errors"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      20             :         "github.com/cockroachdb/pebble/internal/cache"
      21             :         "github.com/cockroachdb/pebble/internal/crc"
      22             :         "github.com/cockroachdb/pebble/internal/invariants"
      23             :         "github.com/cockroachdb/pebble/internal/keyspan"
      24             :         "github.com/cockroachdb/pebble/internal/private"
      25             :         "github.com/cockroachdb/pebble/objstorage"
      26             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      27             : )
      28             : 
      29             : var errCorruptIndexEntry = base.CorruptionErrorf("pebble/table: corrupt index entry")
      30             : var errReaderClosed = errors.New("pebble/table: reader is closed")
      31             : 
      32             : // decodeBlockHandle returns the block handle encoded at the start of src, as
      33             : // well as the number of bytes it occupies. It returns zero if given invalid
      34             : // input. A block handle for a data block or a first/lower level index block
      35             : // should not be decoded using decodeBlockHandle since the caller may validate
      36             : // that the number of bytes decoded is equal to the length of src, which will
      37             : // be false if the properties are not decoded. In those cases the caller
      38             : // should use decodeBlockHandleWithProperties.
      39           2 : func decodeBlockHandle(src []byte) (BlockHandle, int) {
      40           2 :         offset, n := binary.Uvarint(src)
      41           2 :         length, m := binary.Uvarint(src[n:])
      42           2 :         if n == 0 || m == 0 {
      43           0 :                 return BlockHandle{}, 0
      44           0 :         }
      45           2 :         return BlockHandle{offset, length}, n + m
      46             : }
      47             : 
      48             : // decodeBlockHandleWithProperties returns the block handle and properties
      49             : // encoded in src. src needs to be exactly the length that was encoded. This
      50             : // method must be used for data block and first/lower level index blocks. The
      51             : // properties in the block handle point to the bytes in src.
      52           2 : func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) {
      53           2 :         bh, n := decodeBlockHandle(src)
      54           2 :         if n == 0 {
      55           0 :                 return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle")
      56           0 :         }
      57           2 :         return BlockHandleWithProperties{
      58           2 :                 BlockHandle: bh,
      59           2 :                 Props:       src[n:],
      60           2 :         }, nil
      61             : }
      62             : 
      63           2 : func encodeBlockHandle(dst []byte, b BlockHandle) int {
      64           2 :         n := binary.PutUvarint(dst, b.Offset)
      65           2 :         m := binary.PutUvarint(dst[n:], b.Length)
      66           2 :         return n + m
      67           2 : }
      68             : 
      69           2 : func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte {
      70           2 :         n := encodeBlockHandle(dst, b.BlockHandle)
      71           2 :         dst = append(dst[:n], b.Props...)
      72           2 :         return dst
      73           2 : }
      74             : 
// block is a []byte that holds a sequence of key/value pairs plus an index
// over those pairs.
type block []byte

// loadBlockResult reports the outcome of an attempt to load a block during
// iteration.
type loadBlockResult int8

const (
	// loadBlockOK indicates the block was loaded successfully.
	loadBlockOK loadBlockResult = iota
	// loadBlockFailed indicates the block was not loaded.
	// Could be due to error or because no block left to load.
	loadBlockFailed
	// loadBlockIrrelevant indicates the block did not need to be loaded;
	// distinct from failure.
	loadBlockIrrelevant
)

// blockTransform is an optional transformation applied to a block's contents
// after it has been read.
type blockTransform func([]byte) ([]byte, error)
      89             : 
// ReaderOption provides an interface to do work on a Reader while it is
// being opened.
type ReaderOption interface {
	// readerApply is called on the reader during opening in order to set internal
	// parameters.
	readerApply(*Reader)
}
      97             : 
      98             : // Comparers is a map from comparer name to comparer. It is used for debugging
      99             : // tools which may be used on multiple databases configured with different
     100             : // comparers. Comparers implements the OpenOption interface and can be passed
     101             : // as a parameter to NewReader.
     102             : type Comparers map[string]*Comparer
     103             : 
     104           1 : func (c Comparers) readerApply(r *Reader) {
     105           1 :         if r.Compare != nil || r.Properties.ComparerName == "" {
     106           1 :                 return
     107           1 :         }
     108           1 :         if comparer, ok := c[r.Properties.ComparerName]; ok {
     109           1 :                 r.Compare = comparer.Compare
     110           1 :                 r.FormatKey = comparer.FormatKey
     111           1 :                 r.Split = comparer.Split
     112           1 :         }
     113             : }
     114             : 
     115             : // Mergers is a map from merger name to merger. It is used for debugging tools
     116             : // which may be used on multiple databases configured with different
     117             : // mergers. Mergers implements the OpenOption interface and can be passed as
     118             : // a parameter to NewReader.
     119             : type Mergers map[string]*Merger
     120             : 
     121           1 : func (m Mergers) readerApply(r *Reader) {
     122           1 :         if r.mergerOK || r.Properties.MergerName == "" {
     123           1 :                 return
     124           1 :         }
     125           1 :         _, r.mergerOK = m[r.Properties.MergerName]
     126             : }
     127             : 
// cacheOpts is a Reader open option for specifying the cache ID and sstable file
// number. If not specified, a unique cache ID will be used.
type cacheOpts struct {
	// cacheID identifies this store's blocks within a shared block cache.
	cacheID uint64
	// fileNum is the sstable's file number on disk.
	fileNum base.DiskFileNum
}

// Marker function to indicate the option should be applied before reading the
// sstable properties and, in the write path, before writing the default
// sstable properties.
func (c *cacheOpts) preApply() {}

// readerApply sets the reader's cache ID and file number, preserving any
// values that were already configured (zero means "not set").
func (c *cacheOpts) readerApply(r *Reader) {
	if r.cacheID == 0 {
		r.cacheID = c.cacheID
	}
	if r.fileNum.FileNum() == 0 {
		r.fileNum = c.fileNum
	}
}

// writerApply mirrors readerApply for the write path.
func (c *cacheOpts) writerApply(w *Writer) {
	if w.cacheID == 0 {
		w.cacheID = c.cacheID
	}
	if w.fileNum.FileNum() == 0 {
		w.fileNum = c.fileNum
	}
}
     157             : 
// rawTombstonesOpt is a Reader open option for specifying that range
// tombstones returned by Reader.NewRangeDelIter() should not be
// fragmented. Used by debug tools to get a raw view of the tombstones
// contained in an sstable.
type rawTombstonesOpt struct{}

// preApply marks this option as one applied before the properties are read.
func (rawTombstonesOpt) preApply() {}

// readerApply flags the reader to skip tombstone fragmentation.
func (rawTombstonesOpt) readerApply(r *Reader) {
	r.rawTombstones = true
}
     169             : 
// init registers option constructors with the private package so that other
// pebble packages can construct these sstable options without depending on
// this package's unexported types directly.
func init() {
	private.SSTableCacheOpts = func(cacheID uint64, fileNum base.DiskFileNum) interface{} {
		return &cacheOpts{cacheID, fileNum}
	}
	private.SSTableRawTombstonesOpt = rawTombstonesOpt{}
}
     176             : 
// Reader is a table reader.
type Reader struct {
	// readable provides access to the underlying sstable object.
	readable objstorage.Readable
	// cacheID and fileNum form the block cache key prefix used by readBlock.
	cacheID uint64
	fileNum base.DiskFileNum
	// err is sticky: once set, Close returns it; after a successful Close it
	// is set to errReaderClosed.
	err error
	// Block handles for the table's sections, populated at open time.
	indexBH           BlockHandle
	filterBH          BlockHandle
	rangeDelBH        BlockHandle
	rangeKeyBH        BlockHandle
	rangeDelTransform blockTransform
	valueBIH          valueBlocksIndexHandle
	propertiesBH      BlockHandle
	metaIndexBH       BlockHandle
	footerBH          BlockHandle
	opts              ReaderOptions
	// Comparer pieces, configurable via ReaderOptions (e.g. Comparers).
	Compare     Compare
	FormatKey   base.FormatKey
	Split       Split
	tableFilter *tableFilterReader
	// Keep types that are not multiples of 8 bytes at the end and with
	// decreasing size.
	Properties    Properties
	tableFormat   TableFormat
	rawTombstones bool
	mergerOK      bool
	checksumType  ChecksumType
	// metaBufferPool is a buffer pool used exclusively when opening a table and
	// loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
	// the BufferPool.pool slice as a part of the Reader allocation. It's
	// capacity 3 to accommodate the meta block (1), and both the compressed
	// properties block (1) and decompressed properties block (1)
	// simultaneously.
	metaBufferPool      BufferPool
	metaBufferPoolAlloc [3]allocedBuffer
}
     213             : 
     214             : // Close implements DB.Close, as documented in the pebble package.
     215           2 : func (r *Reader) Close() error {
     216           2 :         r.opts.Cache.Unref()
     217           2 : 
     218           2 :         if r.readable != nil {
     219           2 :                 r.err = firstError(r.err, r.readable.Close())
     220           2 :                 r.readable = nil
     221           2 :         }
     222             : 
     223           2 :         if r.err != nil {
     224           1 :                 return r.err
     225           1 :         }
     226             :         // Make any future calls to Get, NewIter or Close return an error.
     227           2 :         r.err = errReaderClosed
     228           2 :         return nil
     229             : }
     230             : 
     231             : // NewIterWithBlockPropertyFilters returns an iterator for the contents of the
     232             : // table. If an error occurs, NewIterWithBlockPropertyFilters cleans up after
     233             : // itself and returns a nil iterator.
     234             : func (r *Reader) NewIterWithBlockPropertyFilters(
     235             :         lower, upper []byte,
     236             :         filterer *BlockPropertiesFilterer,
     237             :         useFilterBlock bool,
     238             :         stats *base.InternalIteratorStats,
     239             :         rp ReaderProvider,
     240           2 : ) (Iterator, error) {
     241           2 :         return r.newIterWithBlockPropertyFiltersAndContext(
     242           2 :                 context.Background(),
     243           2 :                 lower, upper, filterer, false, useFilterBlock, stats, rp, nil,
     244           2 :         )
     245           2 : }
     246             : 
     247             : // NewIterWithBlockPropertyFiltersAndContextEtc is similar to
     248             : // NewIterWithBlockPropertyFilters and additionally accepts a context for
     249             : // tracing.
     250             : //
     251             : // If hideObsoletePoints, the callee assumes that filterer already includes
     252             : // obsoleteKeyBlockPropertyFilter. The caller can satisfy this contract by
     253             : // first calling TryAddBlockPropertyFilterForHideObsoletePoints.
     254             : func (r *Reader) NewIterWithBlockPropertyFiltersAndContextEtc(
     255             :         ctx context.Context,
     256             :         lower, upper []byte,
     257             :         filterer *BlockPropertiesFilterer,
     258             :         hideObsoletePoints, useFilterBlock bool,
     259             :         stats *base.InternalIteratorStats,
     260             :         rp ReaderProvider,
     261           2 : ) (Iterator, error) {
     262           2 :         return r.newIterWithBlockPropertyFiltersAndContext(
     263           2 :                 ctx, lower, upper, filterer, hideObsoletePoints, useFilterBlock, stats, rp, nil,
     264           2 :         )
     265           2 : }
     266             : 
     267             : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
     268             : // before the call to NewIterWithBlockPropertyFiltersAndContextEtc, to get the
     269             : // value of hideObsoletePoints and potentially add a block property filter.
     270             : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
     271             :         snapshotForHideObsoletePoints uint64,
     272             :         fileLargestSeqNum uint64,
     273             :         pointKeyFilters []BlockPropertyFilter,
     274           2 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
     275           2 :         hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
     276           2 :                 snapshotForHideObsoletePoints > fileLargestSeqNum
     277           2 :         if hideObsoletePoints {
     278           2 :                 pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
     279           2 :         }
     280           2 :         return hideObsoletePoints, pointKeyFilters
     281             : }
     282             : 
     283             : func (r *Reader) newIterWithBlockPropertyFiltersAndContext(
     284             :         ctx context.Context,
     285             :         lower, upper []byte,
     286             :         filterer *BlockPropertiesFilterer,
     287             :         hideObsoletePoints bool,
     288             :         useFilterBlock bool,
     289             :         stats *base.InternalIteratorStats,
     290             :         rp ReaderProvider,
     291             :         v *virtualState,
     292           2 : ) (Iterator, error) {
     293           2 :         // NB: pebble.tableCache wraps the returned iterator with one which performs
     294           2 :         // reference counting on the Reader, preventing the Reader from being closed
     295           2 :         // until the final iterator closes.
     296           2 :         if r.Properties.IndexType == twoLevelIndex {
     297           2 :                 i := twoLevelIterPool.Get().(*twoLevelIterator)
     298           2 :                 err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats, rp, nil /* bufferPool */)
     299           2 :                 if err != nil {
     300           1 :                         return nil, err
     301           1 :                 }
     302           2 :                 return i, nil
     303             :         }
     304             : 
     305           2 :         i := singleLevelIterPool.Get().(*singleLevelIterator)
     306           2 :         err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats, rp, nil /* bufferPool */)
     307           2 :         if err != nil {
     308           1 :                 return nil, err
     309           1 :         }
     310           2 :         return i, nil
     311             : }
     312             : 
     313             : // NewIter returns an iterator for the contents of the table. If an error
     314             : // occurs, NewIter cleans up after itself and returns a nil iterator. NewIter
     315             : // must only be used when the Reader is guaranteed to outlive any LazyValues
     316             : // returned from the iter.
     317           2 : func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) {
     318           2 :         return r.NewIterWithBlockPropertyFilters(
     319           2 :                 lower, upper, nil, true /* useFilterBlock */, nil, /* stats */
     320           2 :                 TrivialReaderProvider{Reader: r})
     321           2 : }
     322             : 
     323             : // NewCompactionIter returns an iterator similar to NewIter but it also increments
     324             : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
     325             : // after itself and returns a nil iterator.
     326             : func (r *Reader) NewCompactionIter(
     327             :         bytesIterated *uint64, rp ReaderProvider, bufferPool *BufferPool,
     328           2 : ) (Iterator, error) {
     329           2 :         return r.newCompactionIter(bytesIterated, rp, nil, bufferPool)
     330           2 : }
     331             : 
     332             : func (r *Reader) newCompactionIter(
     333             :         bytesIterated *uint64, rp ReaderProvider, v *virtualState, bufferPool *BufferPool,
     334           2 : ) (Iterator, error) {
     335           2 :         if r.Properties.IndexType == twoLevelIndex {
     336           2 :                 i := twoLevelIterPool.Get().(*twoLevelIterator)
     337           2 :                 err := i.init(
     338           2 :                         context.Background(),
     339           2 :                         r, v, nil /* lower */, nil /* upper */, nil,
     340           2 :                         false /* useFilter */, false, /* hideObsoletePoints */
     341           2 :                         nil /* stats */, rp, bufferPool,
     342           2 :                 )
     343           2 :                 if err != nil {
     344           0 :                         return nil, err
     345           0 :                 }
     346           2 :                 i.setupForCompaction()
     347           2 :                 return &twoLevelCompactionIterator{
     348           2 :                         twoLevelIterator: i,
     349           2 :                         bytesIterated:    bytesIterated,
     350           2 :                 }, nil
     351             :         }
     352           2 :         i := singleLevelIterPool.Get().(*singleLevelIterator)
     353           2 :         err := i.init(
     354           2 :                 context.Background(), r, v, nil /* lower */, nil, /* upper */
     355           2 :                 nil, false /* useFilter */, false, /* hideObsoletePoints */
     356           2 :                 nil /* stats */, rp, bufferPool,
     357           2 :         )
     358           2 :         if err != nil {
     359           0 :                 return nil, err
     360           0 :         }
     361           2 :         i.setupForCompaction()
     362           2 :         return &compactionIterator{
     363           2 :                 singleLevelIterator: i,
     364           2 :                 bytesIterated:       bytesIterated,
     365           2 :         }, nil
     366             : }
     367             : 
     368             : // NewRawRangeDelIter returns an internal iterator for the contents of the
     369             : // range-del block for the table. Returns nil if the table does not contain
     370             : // any range deletions.
     371             : //
     372             : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
     373             : // iterator. Add WithContext methods since the existing ones are public.
     374           2 : func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) {
     375           2 :         if r.rangeDelBH.Length == 0 {
     376           2 :                 return nil, nil
     377           2 :         }
     378           2 :         h, err := r.readRangeDel(nil /* stats */)
     379           2 :         if err != nil {
     380           1 :                 return nil, err
     381           1 :         }
     382           2 :         i := &fragmentBlockIter{elideSameSeqnum: true}
     383           2 :         if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil {
     384           0 :                 return nil, err
     385           0 :         }
     386           2 :         return i, nil
     387             : }
     388             : 
     389             : // NewRawRangeKeyIter returns an internal iterator for the contents of the
     390             : // range-key block for the table. Returns nil if the table does not contain any
     391             : // range keys.
     392             : //
     393             : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
     394             : // iterator. Add WithContext methods since the existing ones are public.
     395           2 : func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
     396           2 :         if r.rangeKeyBH.Length == 0 {
     397           2 :                 return nil, nil
     398           2 :         }
     399           2 :         h, err := r.readRangeKey(nil /* stats */)
     400           2 :         if err != nil {
     401           0 :                 return nil, err
     402           0 :         }
     403           2 :         i := rangeKeyFragmentBlockIterPool.Get().(*rangeKeyFragmentBlockIter)
     404           2 :         if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil {
     405           0 :                 return nil, err
     406           0 :         }
     407           2 :         return i, nil
     408             : }
     409             : 
     410             : type rangeKeyFragmentBlockIter struct {
     411             :         fragmentBlockIter
     412             : }
     413             : 
     414           2 : func (i *rangeKeyFragmentBlockIter) Close() error {
     415           2 :         err := i.fragmentBlockIter.Close()
     416           2 :         i.fragmentBlockIter = i.fragmentBlockIter.resetForReuse()
     417           2 :         rangeKeyFragmentBlockIterPool.Put(i)
     418           2 :         return err
     419           2 : }
     420             : 
     421             : func (r *Reader) readIndex(
     422             :         ctx context.Context, stats *base.InternalIteratorStats,
     423           2 : ) (bufferHandle, error) {
     424           2 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     425           2 :         return r.readBlock(ctx, r.indexBH, nil, nil, stats, nil /* buffer pool */)
     426           2 : }
     427             : 
     428             : func (r *Reader) readFilter(
     429             :         ctx context.Context, stats *base.InternalIteratorStats,
     430           2 : ) (bufferHandle, error) {
     431           2 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
     432           2 :         return r.readBlock(ctx, r.filterBH, nil /* transform */, nil /* readHandle */, stats, nil /* buffer pool */)
     433           2 : }
     434             : 
     435           2 : func (r *Reader) readRangeDel(stats *base.InternalIteratorStats) (bufferHandle, error) {
     436           2 :         ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
     437           2 :         return r.readBlock(ctx, r.rangeDelBH, r.rangeDelTransform, nil /* readHandle */, stats, nil /* buffer pool */)
     438           2 : }
     439             : 
     440           2 : func (r *Reader) readRangeKey(stats *base.InternalIteratorStats) (bufferHandle, error) {
     441           2 :         ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
     442           2 :         return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, nil /* buffer pool */)
     443           2 : }
     444             : 
     445             : func checkChecksum(
     446             :         checksumType ChecksumType, b []byte, bh BlockHandle, fileNum base.FileNum,
     447           2 : ) error {
     448           2 :         expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
     449           2 :         var computedChecksum uint32
     450           2 :         switch checksumType {
     451           2 :         case ChecksumTypeCRC32c:
     452           2 :                 computedChecksum = crc.New(b[:bh.Length+1]).Value()
     453           1 :         case ChecksumTypeXXHash64:
     454           1 :                 computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
     455           0 :         default:
     456           0 :                 return errors.Errorf("unsupported checksum type: %d", checksumType)
     457             :         }
     458             : 
     459           2 :         if expectedChecksum != computedChecksum {
     460           1 :                 return base.CorruptionErrorf(
     461           1 :                         "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
     462           1 :                         errors.Safe(fileNum), errors.Safe(bh.Offset), errors.Safe(bh.Length))
     463           1 :         }
     464           2 :         return nil
     465             : }
     466             : 
     467             : type cacheValueOrBuf struct {
     468             :         // buf.Valid() returns true if backed by a BufferPool.
     469             :         buf Buf
     470             :         // v is non-nil if backed by the block cache.
     471             :         v *cache.Value
     472             : }
     473             : 
     474           2 : func (b cacheValueOrBuf) get() []byte {
     475           2 :         if b.buf.Valid() {
     476           2 :                 return b.buf.p.pool[b.buf.i].b
     477           2 :         }
     478           2 :         return b.v.Buf()
     479             : }
     480             : 
     481           2 : func (b cacheValueOrBuf) release() {
     482           2 :         if b.buf.Valid() {
     483           2 :                 b.buf.Release()
     484           2 :         } else {
     485           2 :                 cache.Free(b.v)
     486           2 :         }
     487             : }
     488             : 
     489           2 : func (b cacheValueOrBuf) truncate(n int) {
     490           2 :         if b.buf.Valid() {
     491           2 :                 b.buf.p.pool[b.buf.i].b = b.buf.p.pool[b.buf.i].b[:n]
     492           2 :         } else {
     493           2 :                 b.v.Truncate(n)
     494           2 :         }
     495             : }
     496             : 
     497             : func (r *Reader) readBlock(
     498             :         ctx context.Context,
     499             :         bh BlockHandle,
     500             :         transform blockTransform,
     501             :         readHandle objstorage.ReadHandle,
     502             :         stats *base.InternalIteratorStats,
     503             :         bufferPool *BufferPool,
     504           2 : ) (handle bufferHandle, _ error) {
     505           2 :         if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil {
     506           2 :                 // Cache hit.
     507           2 :                 if readHandle != nil {
     508           2 :                         readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+blockTrailerLen))
     509           2 :                 }
     510           2 :                 if stats != nil {
     511           2 :                         stats.BlockBytes += bh.Length
     512           2 :                         stats.BlockBytesInCache += bh.Length
     513           2 :                 }
      514             :         // This block is already in the cache; return a handle to the existing
      515             :         // value in the cache.
     516           2 :                 return bufferHandle{h: h}, nil
     517             :         }
     518             : 
     519             :         // Cache miss.
     520           2 :         var compressed cacheValueOrBuf
     521           2 :         if bufferPool != nil {
     522           2 :                 compressed = cacheValueOrBuf{
     523           2 :                         buf: bufferPool.Alloc(int(bh.Length + blockTrailerLen)),
     524           2 :                 }
     525           2 :         } else {
     526           2 :                 compressed = cacheValueOrBuf{
     527           2 :                         v: cache.Alloc(int(bh.Length + blockTrailerLen)),
     528           2 :                 }
     529           2 :         }
     530             : 
     531           2 :         readStartTime := time.Now()
     532           2 :         var err error
     533           2 :         if readHandle != nil {
     534           2 :                 err = readHandle.ReadAt(ctx, compressed.get(), int64(bh.Offset))
     535           2 :         } else {
     536           2 :                 err = r.readable.ReadAt(ctx, compressed.get(), int64(bh.Offset))
     537           2 :         }
     538           2 :         readDuration := time.Since(readStartTime)
     539           2 :         // TODO(sumeer): should the threshold be configurable.
     540           2 :         const slowReadTracingThreshold = 5 * time.Millisecond
     541           2 :         // The invariants.Enabled path is for deterministic testing.
     542           2 :         if invariants.Enabled {
     543           2 :                 readDuration = slowReadTracingThreshold
     544           2 :         }
     545             :         // Call IsTracingEnabled to avoid the allocations of boxing integers into an
     546             :         // interface{}, unless necessary.
     547           2 :         if readDuration >= slowReadTracingThreshold && r.opts.LoggerAndTracer.IsTracingEnabled(ctx) {
     548           1 :                 r.opts.LoggerAndTracer.Eventf(ctx, "reading %d bytes took %s",
     549           1 :                         int(bh.Length+blockTrailerLen), readDuration.String())
     550           1 :         }
     551           2 :         if stats != nil {
     552           2 :                 stats.BlockReadDuration += readDuration
     553           2 :         }
     554           2 :         if err != nil {
     555           1 :                 compressed.release()
     556           1 :                 return bufferHandle{}, err
     557           1 :         }
     558           2 :         if err := checkChecksum(r.checksumType, compressed.get(), bh, r.fileNum.FileNum()); err != nil {
     559           1 :                 compressed.release()
     560           1 :                 return bufferHandle{}, err
     561           1 :         }
     562             : 
     563           2 :         typ := blockType(compressed.get()[bh.Length])
     564           2 :         compressed.truncate(int(bh.Length))
     565           2 : 
     566           2 :         var decompressed cacheValueOrBuf
     567           2 :         if typ == noCompressionBlockType {
     568           2 :                 decompressed = compressed
     569           2 :         } else {
     570           2 :                 // Decode the length of the decompressed value.
     571           2 :                 decodedLen, prefixLen, err := decompressedLen(typ, compressed.get())
     572           2 :                 if err != nil {
     573           0 :                         compressed.release()
     574           0 :                         return bufferHandle{}, err
     575           0 :                 }
     576             : 
     577           2 :                 if bufferPool != nil {
     578           2 :                         decompressed = cacheValueOrBuf{buf: bufferPool.Alloc(decodedLen)}
     579           2 :                 } else {
     580           2 :                         decompressed = cacheValueOrBuf{v: cache.Alloc(decodedLen)}
     581           2 :                 }
     582           2 :                 if _, err := decompressInto(typ, compressed.get()[prefixLen:], decompressed.get()); err != nil {
     583           0 :                         compressed.release()
     584           0 :                         return bufferHandle{}, err
     585           0 :                 }
     586           2 :                 compressed.release()
     587             :         }
     588             : 
     589           2 :         if transform != nil {
     590           1 :                 // Transforming blocks is very rare, so the extra copy of the
     591           1 :                 // transformed data is not problematic.
     592           1 :                 tmpTransformed, err := transform(decompressed.get())
     593           1 :                 if err != nil {
     594           0 :                         decompressed.release()
     595           0 :                         return bufferHandle{}, err
     596           0 :                 }
     597             : 
     598           1 :                 var transformed cacheValueOrBuf
     599           1 :                 if bufferPool != nil {
     600           0 :                         transformed = cacheValueOrBuf{buf: bufferPool.Alloc(len(tmpTransformed))}
     601           1 :                 } else {
     602           1 :                         transformed = cacheValueOrBuf{v: cache.Alloc(len(tmpTransformed))}
     603           1 :                 }
     604           1 :                 copy(transformed.get(), tmpTransformed)
     605           1 :                 decompressed.release()
     606           1 :                 decompressed = transformed
     607             :         }
     608             : 
     609           2 :         if stats != nil {
     610           2 :                 stats.BlockBytes += bh.Length
     611           2 :         }
     612           2 :         if decompressed.buf.Valid() {
     613           2 :                 return bufferHandle{b: decompressed.buf}, nil
     614           2 :         }
     615           2 :         h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, decompressed.v)
     616           2 :         return bufferHandle{h: h}, nil
     617             : }
     618             : 
     619           1 : func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) {
     620           1 :         // Convert v1 (RocksDB format) range-del blocks to v2 blocks on the fly. The
     621           1 :         // v1 format range-del blocks have unfragmented and unsorted range
     622           1 :         // tombstones. We need properly fragmented and sorted range tombstones in
     623           1 :         // order to serve from them directly.
     624           1 :         iter := &blockIter{}
     625           1 :         if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum, false); err != nil {
     626           0 :                 return nil, err
     627           0 :         }
     628           1 :         var tombstones []keyspan.Span
     629           1 :         for key, value := iter.First(); key != nil; key, value = iter.Next() {
     630           1 :                 t := keyspan.Span{
     631           1 :                         Start: key.UserKey,
     632           1 :                         End:   value.InPlaceValue(),
     633           1 :                         Keys:  []keyspan.Key{{Trailer: key.Trailer}},
     634           1 :                 }
     635           1 :                 tombstones = append(tombstones, t)
     636           1 :         }
     637           1 :         keyspan.Sort(r.Compare, tombstones)
     638           1 : 
     639           1 :         // Fragment the tombstones, outputting them directly to a block writer.
     640           1 :         rangeDelBlock := blockWriter{
     641           1 :                 restartInterval: 1,
     642           1 :         }
     643           1 :         frag := keyspan.Fragmenter{
     644           1 :                 Cmp:    r.Compare,
     645           1 :                 Format: r.FormatKey,
     646           1 :                 Emit: func(s keyspan.Span) {
     647           1 :                         for _, k := range s.Keys {
     648           1 :                                 startIK := InternalKey{UserKey: s.Start, Trailer: k.Trailer}
     649           1 :                                 rangeDelBlock.add(startIK, s.End)
     650           1 :                         }
     651             :                 },
     652             :         }
     653           1 :         for i := range tombstones {
     654           1 :                 frag.Add(tombstones[i])
     655           1 :         }
     656           1 :         frag.Finish()
     657           1 : 
     658           1 :         // Return the contents of the constructed v2 format range-del block.
     659           1 :         return rangeDelBlock.finish(), nil
     660             : }
     661             : 
// readMetaindex reads and parses the metaindex block at metaindexBH,
// recording on the Reader the handles of the blocks it references: the
// properties block, range-del (v1 or v2) block, range-key block, value
// blocks index, and any filter block matching a configured filter policy.
func (r *Reader) readMetaindex(metaindexBH BlockHandle) error {
	// We use a BufferPool when reading metaindex blocks in order to avoid
	// populating the block cache with these blocks. In heavy-write workloads,
	// especially with high compaction concurrency, new tables may be created
	// frequently. Populating the block cache with these metaindex blocks adds
	// additional contention on the block cache mutexes (see #1997).
	// Additionally, these blocks are exceedingly unlikely to be read again
	// while they're still in the block cache except in misconfigurations with
	// excessive sstables counts or a table cache that's far too small.
	r.metaBufferPool.initPreallocated(r.metaBufferPoolAlloc[:0])
	// When we're finished, release the buffers we've allocated back to memory
	// allocator. We don't expect to use metaBufferPool again.
	defer r.metaBufferPool.Release()

	b, err := r.readBlock(
		context.Background(), metaindexBH, nil /* transform */, nil /* readHandle */, nil /* stats */, &r.metaBufferPool)
	if err != nil {
		return err
	}
	data := b.Get()
	// NB: the deferred call binds the current (metaindex) handle; the later
	// reassignment of b for the properties block is released explicitly below.
	defer b.Release()

	if uint64(len(data)) != metaindexBH.Length {
		return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
			errors.Safe(len(data)), errors.Safe(metaindexBH.Length))
	}

	i, err := newRawBlockIter(bytes.Compare, data)
	if err != nil {
		return err
	}

	// Each metaindex entry maps a block name to an encoded block handle. The
	// value blocks index uses a richer handle encoding and is decoded
	// separately from the plain handles.
	meta := map[string]BlockHandle{}
	for valid := i.First(); valid; valid = i.Next() {
		value := i.Value()
		if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
			vbih, n, err := decodeValueBlocksIndexHandle(i.Value())
			if err != nil {
				return err
			}
			// The handle must consume the entire value; anything else
			// indicates corruption.
			if n == 0 || n != len(value) {
				return base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
			}
			r.valueBIH = vbih
		} else {
			bh, n := decodeBlockHandle(value)
			if n == 0 || n != len(value) {
				return base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
			}
			meta[string(i.Key().UserKey)] = bh
		}
	}
	if err := i.Close(); err != nil {
		return err
	}

	if bh, ok := meta[metaPropertiesName]; ok {
		// The properties block goes through the block cache (nil buffer pool).
		b, err = r.readBlock(
			context.Background(), bh, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */)
		if err != nil {
			return err
		}
		r.propertiesBH = bh
		err := r.Properties.load(b.Get(), bh.Offset, r.opts.DeniedUserProperties)
		b.Release()
		if err != nil {
			return err
		}
	}

	// Prefer the v2 range-del block. A v1 block is transformed to v2 on read
	// unless raw tombstones were requested.
	if bh, ok := meta[metaRangeDelV2Name]; ok {
		r.rangeDelBH = bh
	} else if bh, ok := meta[metaRangeDelName]; ok {
		r.rangeDelBH = bh
		if !r.rawTombstones {
			r.rangeDelTransform = r.transformRangeDelV1
		}
	}

	if bh, ok := meta[metaRangeKeyName]; ok {
		r.rangeKeyBH = bh
	}

	// Find the first configured filter policy with a filter block present in
	// this table. Only table-level ("fullfilter.") filters are recognized.
	for name, fp := range r.opts.Filters {
		types := []struct {
			ftype  FilterType
			prefix string
		}{
			{TableFilter, "fullfilter."},
		}
		var done bool
		for _, t := range types {
			if bh, ok := meta[t.prefix+name]; ok {
				r.filterBH = bh

				switch t.ftype {
				case TableFilter:
					r.tableFilter = newTableFilterReader(fp)
				default:
					return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
				}

				done = true
				break
			}
		}
		if done {
			break
		}
	}
	return nil
}
     774             : 
     775             : // Layout returns the layout (block organization) for an sstable.
     776           2 : func (r *Reader) Layout() (*Layout, error) {
     777           2 :         if r.err != nil {
     778           0 :                 return nil, r.err
     779           0 :         }
     780             : 
     781           2 :         l := &Layout{
     782           2 :                 Data:       make([]BlockHandleWithProperties, 0, r.Properties.NumDataBlocks),
     783           2 :                 Filter:     r.filterBH,
     784           2 :                 RangeDel:   r.rangeDelBH,
     785           2 :                 RangeKey:   r.rangeKeyBH,
     786           2 :                 ValueIndex: r.valueBIH.h,
     787           2 :                 Properties: r.propertiesBH,
     788           2 :                 MetaIndex:  r.metaIndexBH,
     789           2 :                 Footer:     r.footerBH,
     790           2 :                 Format:     r.tableFormat,
     791           2 :         }
     792           2 : 
     793           2 :         indexH, err := r.readIndex(context.Background(), nil)
     794           2 :         if err != nil {
     795           1 :                 return nil, err
     796           1 :         }
     797           2 :         defer indexH.Release()
     798           2 : 
     799           2 :         var alloc bytealloc.A
     800           2 : 
     801           2 :         if r.Properties.IndexPartitions == 0 {
     802           2 :                 l.Index = append(l.Index, r.indexBH)
     803           2 :                 iter, _ := newBlockIter(r.Compare, indexH.Get())
     804           2 :                 for key, value := iter.First(); key != nil; key, value = iter.Next() {
     805           2 :                         dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
     806           2 :                         if err != nil {
     807           0 :                                 return nil, errCorruptIndexEntry
     808           0 :                         }
     809           2 :                         if len(dataBH.Props) > 0 {
     810           1 :                                 alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     811           1 :                         }
     812           2 :                         l.Data = append(l.Data, dataBH)
     813             :                 }
     814           2 :         } else {
     815           2 :                 l.TopIndex = r.indexBH
     816           2 :                 topIter, _ := newBlockIter(r.Compare, indexH.Get())
     817           2 :                 iter := &blockIter{}
     818           2 :                 for key, value := topIter.First(); key != nil; key, value = topIter.Next() {
     819           2 :                         indexBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
     820           2 :                         if err != nil {
     821           0 :                                 return nil, errCorruptIndexEntry
     822           0 :                         }
     823           2 :                         l.Index = append(l.Index, indexBH.BlockHandle)
     824           2 : 
     825           2 :                         subIndex, err := r.readBlock(context.Background(), indexBH.BlockHandle,
     826           2 :                                 nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */)
     827           2 :                         if err != nil {
     828           1 :                                 return nil, err
     829           1 :                         }
     830           2 :                         if err := iter.init(r.Compare, subIndex.Get(), 0, /* globalSeqNum */
     831           2 :                                 false /* hideObsoletePoints */); err != nil {
     832           0 :                                 return nil, err
     833           0 :                         }
     834           2 :                         for key, value := iter.First(); key != nil; key, value = iter.Next() {
     835           2 :                                 dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
     836           2 :                                 if len(dataBH.Props) > 0 {
     837           1 :                                         alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     838           1 :                                 }
     839           2 :                                 if err != nil {
     840           0 :                                         return nil, errCorruptIndexEntry
     841           0 :                                 }
     842           2 :                                 l.Data = append(l.Data, dataBH)
     843             :                         }
     844           2 :                         subIndex.Release()
     845           2 :                         *iter = iter.resetForReuse()
     846             :                 }
     847             :         }
     848           2 :         if r.valueBIH.h.Length != 0 {
     849           1 :                 vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil /* buffer pool */)
     850           1 :                 if err != nil {
     851           0 :                         return nil, err
     852           0 :                 }
     853           1 :                 defer vbiH.Release()
     854           1 :                 vbiBlock := vbiH.Get()
     855           1 :                 indexEntryLen := int(r.valueBIH.blockNumByteLength + r.valueBIH.blockOffsetByteLength +
     856           1 :                         r.valueBIH.blockLengthByteLength)
     857           1 :                 i := 0
     858           1 :                 for len(vbiBlock) != 0 {
     859           1 :                         if len(vbiBlock) < indexEntryLen {
     860           0 :                                 return nil, errors.Errorf(
     861           0 :                                         "remaining value index block %d does not contain a full entry of length %d",
     862           0 :                                         len(vbiBlock), indexEntryLen)
     863           0 :                         }
     864           1 :                         n := int(r.valueBIH.blockNumByteLength)
     865           1 :                         bn := int(littleEndianGet(vbiBlock, n))
     866           1 :                         if bn != i {
     867           0 :                                 return nil, errors.Errorf("unexpected block num %d, expected %d",
     868           0 :                                         bn, i)
     869           0 :                         }
     870           1 :                         i++
     871           1 :                         vbiBlock = vbiBlock[n:]
     872           1 :                         n = int(r.valueBIH.blockOffsetByteLength)
     873           1 :                         blockOffset := littleEndianGet(vbiBlock, n)
     874           1 :                         vbiBlock = vbiBlock[n:]
     875           1 :                         n = int(r.valueBIH.blockLengthByteLength)
     876           1 :                         blockLen := littleEndianGet(vbiBlock, n)
     877           1 :                         vbiBlock = vbiBlock[n:]
     878           1 :                         l.ValueBlock = append(l.ValueBlock, BlockHandle{Offset: blockOffset, Length: blockLen})
     879             :                 }
     880             :         }
     881             : 
     882           2 :         return l, nil
     883             : }
     884             : 
     885             : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
     886           2 : func (r *Reader) ValidateBlockChecksums() error {
     887           2 :         // Pre-compute the BlockHandles for the underlying file.
     888           2 :         l, err := r.Layout()
     889           2 :         if err != nil {
     890           1 :                 return err
     891           1 :         }
     892             : 
     893             :         // Construct the set of blocks to check. Note that the footer is not checked
     894             :         // as it is not a block with a checksum.
     895           2 :         blocks := make([]BlockHandle, len(l.Data))
     896           2 :         for i := range l.Data {
     897           2 :                 blocks[i] = l.Data[i].BlockHandle
     898           2 :         }
     899           2 :         blocks = append(blocks, l.Index...)
     900           2 :         blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.RangeKey, l.Properties, l.MetaIndex)
     901           2 : 
     902           2 :         // Sorting by offset ensures we are performing a sequential scan of the
     903           2 :         // file.
     904           2 :         sort.Slice(blocks, func(i, j int) bool {
     905           2 :                 return blocks[i].Offset < blocks[j].Offset
     906           2 :         })
     907             : 
     908             :         // Check all blocks sequentially. Make use of read-ahead, given we are
     909             :         // scanning the entire file from start to end.
     910           2 :         rh := r.readable.NewReadHandle(context.TODO())
     911           2 :         defer rh.Close()
     912           2 : 
     913           2 :         for _, bh := range blocks {
     914           2 :                 // Certain blocks may not be present, in which case we skip them.
     915           2 :                 if bh.Length == 0 {
     916           2 :                         continue
     917             :                 }
     918             : 
     919             :                 // Read the block, which validates the checksum.
     920           2 :                 h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* buffer pool */)
     921           2 :                 if err != nil {
     922           1 :                         return err
     923           1 :                 }
     924           2 :                 h.Release()
     925             :         }
     926             : 
     927           2 :         return nil
     928             : }
     929             : 
     930             : // EstimateDiskUsage returns the total size of data blocks overlapping the range
     931             : // `[start, end]`. Even if a data block partially overlaps, or we cannot
     932             : // determine overlap due to abbreviated index keys, the full data block size is
     933             : // included in the estimation.
     934             : //
     935             : // This function does not account for any metablock space usage. Assumes there
     936             : // is at least partial overlap, i.e., `[start, end]` falls neither completely
     937             : // before nor completely after the file's range.
     938             : //
     939             : // Only blocks containing point keys are considered. Range deletion and range
     940             : // key blocks are not considered.
     941             : //
     942             : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
     943             : // data blocks overlapped and add that same fraction of the metadata blocks to the
     944             : // estimate.
     945           2 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
     946           2 :         if r.err != nil {
     947           0 :                 return 0, r.err
     948           0 :         }
     949             : 
     950           2 :         indexH, err := r.readIndex(context.Background(), nil)
     951           2 :         if err != nil {
     952           1 :                 return 0, err
     953           1 :         }
     954           2 :         defer indexH.Release()
     955           2 : 
     956           2 :         // Iterators over the bottom-level index blocks containing start and end.
     957           2 :         // These may be different in case of partitioned index but will both point
     958           2 :         // to the same blockIter over the single index in the unpartitioned case.
     959           2 :         var startIdxIter, endIdxIter *blockIter
     960           2 :         if r.Properties.IndexPartitions == 0 {
     961           2 :                 iter, err := newBlockIter(r.Compare, indexH.Get())
     962           2 :                 if err != nil {
     963           0 :                         return 0, err
     964           0 :                 }
     965           2 :                 startIdxIter = iter
     966           2 :                 endIdxIter = iter
     967           2 :         } else {
     968           2 :                 topIter, err := newBlockIter(r.Compare, indexH.Get())
     969           2 :                 if err != nil {
     970           0 :                         return 0, err
     971           0 :                 }
     972             : 
     973           2 :                 key, val := topIter.SeekGE(start, base.SeekGEFlagsNone)
     974           2 :                 if key == nil {
     975           1 :                         // The range falls completely after this file, or an error occurred.
     976           1 :                         return 0, topIter.Error()
     977           1 :                 }
     978           2 :                 startIdxBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
     979           2 :                 if err != nil {
     980           0 :                         return 0, errCorruptIndexEntry
     981           0 :                 }
     982           2 :                 startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.BlockHandle,
     983           2 :                         nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */)
     984           2 :                 if err != nil {
     985           1 :                         return 0, err
     986           1 :                 }
     987           2 :                 defer startIdxBlock.Release()
     988           2 :                 startIdxIter, err = newBlockIter(r.Compare, startIdxBlock.Get())
     989           2 :                 if err != nil {
     990           0 :                         return 0, err
     991           0 :                 }
     992             : 
     993           2 :                 key, val = topIter.SeekGE(end, base.SeekGEFlagsNone)
     994           2 :                 if key == nil {
     995           1 :                         if err := topIter.Error(); err != nil {
     996           0 :                                 return 0, err
     997           0 :                         }
     998           2 :                 } else {
     999           2 :                         endIdxBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
    1000           2 :                         if err != nil {
    1001           0 :                                 return 0, errCorruptIndexEntry
    1002           0 :                         }
    1003           2 :                         endIdxBlock, err := r.readBlock(context.Background(),
    1004           2 :                                 endIdxBH.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */)
    1005           2 :                         if err != nil {
    1006           1 :                                 return 0, err
    1007           1 :                         }
    1008           2 :                         defer endIdxBlock.Release()
    1009           2 :                         endIdxIter, err = newBlockIter(r.Compare, endIdxBlock.Get())
    1010           2 :                         if err != nil {
    1011           0 :                                 return 0, err
    1012           0 :                         }
    1013             :                 }
    1014             :         }
    1015             :         // startIdxIter should not be nil at this point, while endIdxIter can be if the
    1016             :         // range spans past the end of the file.
    1017             : 
    1018           2 :         key, val := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
    1019           2 :         if key == nil {
    1020           2 :                 // The range falls completely after this file, or an error occurred.
    1021           2 :                 return 0, startIdxIter.Error()
    1022           2 :         }
    1023           2 :         startBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
    1024           2 :         if err != nil {
    1025           0 :                 return 0, errCorruptIndexEntry
    1026           0 :         }
    1027             : 
    1028           2 :         includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
    1029           2 :                 // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
    1030           2 :                 // Linearly interpolate what is stored in value blocks.
    1031           2 :                 //
    1032           2 :                 // TODO(sumeer): if we need more accuracy, without loading any data blocks
    1033           2 :                 // (which contain the value handles, and which may also be insufficient if
    1034           2 :                 // the values are in separate files), we will need to accumulate the
    1035           2 :                 // logical size of the key-value pairs and store the cumulative value for
    1036           2 :                 // each data block in the index block entry. This increases the size of
    1037           2 :                 // the BlockHandle, so wait until this becomes necessary.
    1038           2 :                 return dataBlockSize +
    1039           2 :                         uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
    1040           2 :                                 float64(r.Properties.ValueBlocksSize))
    1041           2 :         }
    1042           2 :         if endIdxIter == nil {
    1043           1 :                 // The range spans beyond this file. Include data blocks through the last.
    1044           1 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
    1045           1 :         }
    1046           2 :         key, val = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
    1047           2 :         if key == nil {
    1048           2 :                 if err := endIdxIter.Error(); err != nil {
    1049           0 :                         return 0, err
    1050           0 :                 }
    1051             :                 // The range spans beyond this file. Include data blocks through the last.
    1052           2 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
    1053             :         }
    1054           2 :         endBH, err := decodeBlockHandleWithProperties(val.InPlaceValue())
    1055           2 :         if err != nil {
    1056           0 :                 return 0, errCorruptIndexEntry
    1057           0 :         }
    1058           2 :         return includeInterpolatedValueBlocksSize(
    1059           2 :                 endBH.Offset + endBH.Length + blockTrailerLen - startBH.Offset), nil
    1060             : }
    1061             : 
    1062             : // TableFormat returns the format version for the table.
    1063           2 : func (r *Reader) TableFormat() (TableFormat, error) {
    1064           2 :         if r.err != nil {
    1065           0 :                 return TableFormatUnspecified, r.err
    1066           0 :         }
    1067           2 :         return r.tableFormat, nil
    1068             : }
    1069             : 
// NewReader returns a new table reader for the file. Closing the reader will
// close the file.
//
// On any failure the partially-initialized reader is torn down via r.Close()
// (which presumably releases the cache reference taken below — confirm
// against Close's implementation) and the error is returned.
func NewReader(f objstorage.Readable, o ReaderOptions, extraOpts ...ReaderOption) (*Reader, error) {
	o = o.ensureDefaults()
	r := &Reader{
		readable: f,
		opts:     o,
	}
	// Either create a private cache or take a reference on the caller's
	// cache; either way the reader owns one cache reference from here on,
	// which is why even the nil-file error path below must go through
	// r.Close() rather than returning directly.
	if r.opts.Cache == nil {
		r.opts.Cache = cache.New(0)
	} else {
		r.opts.Cache.Ref()
	}

	if f == nil {
		r.err = errors.New("pebble/table: nil file")
		return nil, r.Close()
	}

	// Note that the extra options are applied twice. First here for pre-apply
	// options, and then below for post-apply options. Pre and post refer to
	// before and after reading the metaindex and properties.
	type preApply interface{ preApply() }
	for _, opt := range extraOpts {
		if _, ok := opt.(preApply); ok {
			opt.readerApply(r)
		}
	}
	// A pre-apply option may have already assigned a cache ID; only mint a
	// fresh one if none was provided.
	if r.cacheID == 0 {
		r.cacheID = r.opts.Cache.NewID()
	}

	// The footer locates the metaindex and top-level index blocks and fixes
	// the checksum type and table format for all subsequent block reads.
	footer, err := readFooter(f)
	if err != nil {
		r.err = err
		return nil, r.Close()
	}
	r.checksumType = footer.checksum
	r.tableFormat = footer.format
	// Read the metaindex.
	if err := r.readMetaindex(footer.metaindexBH); err != nil {
		r.err = err
		return nil, r.Close()
	}
	r.indexBH = footer.indexBH
	r.metaIndexBH = footer.metaindexBH
	r.footerBH = footer.footerBH

	// Install the comparer only if it matches the one the table was written
	// with (an empty property name means no comparer was recorded, so the
	// provided one is accepted).
	if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
		r.Compare = o.Comparer.Compare
		r.FormatKey = o.Comparer.FormatKey
		r.Split = o.Comparer.Split
	}

	if o.MergerName == r.Properties.MergerName {
		r.mergerOK = true
	}

	// Apply the extra options again now that the comparer and merger names are
	// known.
	for _, opt := range extraOpts {
		if _, ok := opt.(preApply); !ok {
			opt.readerApply(r)
		}
	}

	// If neither the options nor a post-apply ReaderOption supplied a
	// matching comparer/merger, fail rather than misinterpret keys. The
	// "nullptr" merger name is tolerated (RocksDB writes it when no merger
	// was configured — NOTE(review): inferred from the literal; confirm).
	if r.Compare == nil {
		r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
			errors.Safe(r.fileNum), errors.Safe(r.Properties.ComparerName))
	}
	if !r.mergerOK {
		if name := r.Properties.MergerName; name != "" && name != "nullptr" {
			r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
				errors.Safe(r.fileNum), errors.Safe(r.Properties.MergerName))
		}
	}
	if r.err != nil {
		return nil, r.Close()
	}

	return r, nil
}
    1152             : 
// ReadableFile describes the smallest subset of vfs.File that is required for
// reading SSTs.
type ReadableFile interface {
	io.ReaderAt
	io.Closer
	// Stat is used to discover the file's size (see NewSimpleReadable, which
	// only consumes info.Size()).
	Stat() (os.FileInfo, error)
}
    1160             : 
    1161             : // NewSimpleReadable wraps a ReadableFile in a objstorage.Readable
    1162             : // implementation (which does not support read-ahead)
    1163           2 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
    1164           2 :         info, err := r.Stat()
    1165           2 :         if err != nil {
    1166           1 :                 return nil, err
    1167           1 :         }
    1168           2 :         res := &simpleReadable{
    1169           2 :                 f:    r,
    1170           2 :                 size: info.Size(),
    1171           2 :         }
    1172           2 :         res.rh = objstorage.MakeNoopReadHandle(res)
    1173           2 :         return res, nil
    1174             : }
    1175             : 
// simpleReadable wraps a ReadableFile to implement objstorage.Readable.
type simpleReadable struct {
	f    ReadableFile
	size int64 // file size, captured once at construction via Stat
	// rh is a no-op (non-read-ahead) handle returned by NewReadHandle; it is
	// embedded so no allocation is needed per handle.
	rh objstorage.NoopReadHandle
}

// Compile-time assertion that *simpleReadable satisfies objstorage.Readable.
var _ objstorage.Readable = (*simpleReadable)(nil)
    1184             : 
    1185             : // ReadAt is part of the objstorage.Readable interface.
    1186           2 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
    1187           2 :         n, err := s.f.ReadAt(p, off)
    1188           2 :         if invariants.Enabled && err == nil && n != len(p) {
    1189           0 :                 panic("short read")
    1190             :         }
    1191           2 :         return err
    1192             : }
    1193             : 
// Close is part of the objstorage.Readable interface. It closes the wrapped
// file and returns its error, if any.
func (s *simpleReadable) Close() error {
	return s.f.Close()
}
    1198             : 
// Size is part of the objstorage.Readable interface. It returns the size
// cached at construction time, so it never touches the file.
func (s *simpleReadable) Size() int64 {
	return s.size
}
    1203             : 
// NewReadHandle is part of the objstorage.Readable interface. It returns the
// single embedded no-op handle; callers therefore share one handle per
// simpleReadable.
func (s *simpleReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
	return &s.rh
}

Generated by: LCOV version 1.14