LCOV - code coverage report
Current view: top level - pebble/sstable - reader.go (source / functions)
Test: 2024-06-28 08:15Z 3ef2e5b1 - tests + meta.lcov
Date: 2024-06-28 08:16:56

                 Hit    Total    Coverage
Lines:           632      743      85.1 %
Functions:         0        0         -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "context"
      11             :         "encoding/binary"
      12             :         "io"
      13             :         "os"
      14             :         "slices"
      15             :         "time"
      16             : 
      17             :         "github.com/cespare/xxhash/v2"
      18             :         "github.com/cockroachdb/errors"
      19             :         "github.com/cockroachdb/pebble/internal/base"
      20             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      21             :         "github.com/cockroachdb/pebble/internal/cache"
      22             :         "github.com/cockroachdb/pebble/internal/crc"
      23             :         "github.com/cockroachdb/pebble/internal/invariants"
      24             :         "github.com/cockroachdb/pebble/internal/keyspan"
      25             :         "github.com/cockroachdb/pebble/internal/private"
      26             :         "github.com/cockroachdb/pebble/objstorage"
      27             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      28             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      29             :         "github.com/cockroachdb/pebble/sstable/block"
      30             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      31             : )
      32             : 
      33             : var errReaderClosed = errors.New("pebble/table: reader is closed")
      34             : 
      35             : // decodeBlockHandle returns the block handle encoded at the start of src, as
       36             : // well as the number of bytes it occupies. It returns a zero handle and a
       37             : // count of zero if given invalid input. A block handle for a data block or a
       38             : // first/lower level index block should not be decoded with decodeBlockHandle:
       39             : // such handles are followed by encoded properties, so a caller that checks
       40             : // that the number of bytes decoded equals the length of src would see a
       41             : // mismatch. In those cases use decodeBlockHandleWithProperties instead.
      42           2 : func decodeBlockHandle(src []byte) (block.Handle, int) {
      43           2 :         offset, n := binary.Uvarint(src)
      44           2 :         length, m := binary.Uvarint(src[n:])
      45           2 :         if n == 0 || m == 0 {
      46           0 :                 return block.Handle{}, 0
      47           0 :         }
      48           2 :         return block.Handle{Offset: offset, Length: length}, n + m
      49             : }
      50             : 
      51             : // decodeBlockHandleWithProperties returns the block handle and properties
      52             : // encoded in src. src needs to be exactly the length that was encoded. This
      53             : // method must be used for data block and first/lower level index blocks. The
      54             : // properties in the block handle point to the bytes in src.
      55           2 : func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) {
      56           2 :         bh, n := decodeBlockHandle(src)
      57           2 :         if n == 0 {
      58           0 :                 return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle")
      59           0 :         }
      60           2 :         return BlockHandleWithProperties{
      61           2 :                 Handle: bh,
      62           2 :                 Props:  src[n:],
      63           2 :         }, nil
      64             : }
      65             : 
      66           2 : func encodeBlockHandle(dst []byte, b block.Handle) int {
      67           2 :         n := binary.PutUvarint(dst, b.Offset)
      68           2 :         m := binary.PutUvarint(dst[n:], b.Length)
      69           2 :         return n + m
      70           2 : }
      71             : 
      72           2 : func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte {
      73           2 :         n := encodeBlockHandle(dst, b.Handle)
      74           2 :         dst = append(dst[:n], b.Props...)
      75           2 :         return dst
      76           2 : }
      77             : 
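The helpers above share a simple layout: a uvarint offset, a uvarint length, and (for data and first/lower level index blocks) the raw property bytes appended after them. As a quick illustration, here is a hypothetical sketch written as an sstable package test (not part of reader.go; it additionally assumes "fmt" is imported); 2*binary.MaxVarintLen64 bounds the encoded size of the two integer fields.

// Hypothetical in-package test sketch: round-trip a block.Handle through the
// uvarint encoding used by encodeBlockHandle and decodeBlockHandle.
func ExampleBlockHandleRoundTrip() {
        var buf [2 * binary.MaxVarintLen64]byte
        in := block.Handle{Offset: 72349, Length: 4096}
        n := encodeBlockHandle(buf[:], in)   // writes offset, then length
        out, m := decodeBlockHandle(buf[:n]) // decodes both fields back
        fmt.Println(n == m, out.Offset, out.Length)
        // Output: true 72349 4096
}
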
      78             : type loadBlockResult int8
      79             : 
      80             : const (
      81             :         loadBlockOK loadBlockResult = iota
       82             :         // Could be due to an error or because there is no block left to load.
      83             :         loadBlockFailed
      84             :         loadBlockIrrelevant
      85             : )
      86             : 
      87             : type blockTransform func([]byte) ([]byte, error)
      88             : 
       89             : // ReaderOption provides an interface to do work on a Reader while it is being
      90             : // opened.
      91             : type ReaderOption interface {
      92             :         // readerApply is called on the reader during opening in order to set internal
      93             :         // parameters.
      94             :         readerApply(*Reader)
      95             : }
      96             : 
      97             : // Comparers is a map from comparer name to comparer. It is used for debugging
      98             : // tools which may be used on multiple databases configured with different
       99             : // comparers. Comparers implements the ReaderOption interface and can be passed
     100             : // as a parameter to NewReader.
     101             : type Comparers map[string]*Comparer
     102             : 
     103           1 : func (c Comparers) readerApply(r *Reader) {
     104           1 :         if r.Compare != nil || r.Properties.ComparerName == "" {
     105           1 :                 return
     106           1 :         }
     107           1 :         if comparer, ok := c[r.Properties.ComparerName]; ok {
     108           1 :                 r.Compare = comparer.Compare
     109           1 :                 r.Equal = comparer.Equal
     110           1 :                 r.FormatKey = comparer.FormatKey
     111           1 :                 r.Split = comparer.Split
     112           1 :         }
     113             : }
     114             : 
     115             : // Mergers is a map from merger name to merger. It is used for debugging tools
     116             : // which may be used on multiple databases configured with different
      117             : // mergers. Mergers implements the ReaderOption interface and can be passed as
     118             : // a parameter to NewReader.
     119             : type Mergers map[string]*Merger
     120             : 
     121           1 : func (m Mergers) readerApply(r *Reader) {
     122           1 :         if r.mergerOK || r.Properties.MergerName == "" {
     123           1 :                 return
     124           1 :         }
     125           1 :         _, r.mergerOK = m[r.Properties.MergerName]
     126             : }
     127             : 
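For context, a debugging tool that opens tables written with different comparers or mergers registers every candidate up front, keyed by name. A minimal sketch, assuming code inside the pebble module (internal/base is not importable from elsewhere) and showing only the default comparer and merger; the package and helper names are hypothetical:

package sstabledebug // hypothetical

import (
        "github.com/cockroachdb/pebble/internal/base"
        "github.com/cockroachdb/pebble/sstable"
)

// debugReaderOptions is a hypothetical helper: the returned Comparers and
// Mergers satisfy ReaderOption, so readerApply can select the implementation
// named by the sstable's ComparerName/MergerName properties.
func debugReaderOptions() (sstable.Comparers, sstable.Mergers) {
        comparers := sstable.Comparers{base.DefaultComparer.Name: base.DefaultComparer}
        mergers := sstable.Mergers{base.DefaultMerger.Name: base.DefaultMerger}
        return comparers, mergers
}
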
     128             : // cacheOpts is a Reader open option for specifying the cache ID and sstable file
     129             : // number. If not specified, a unique cache ID will be used.
     130             : type cacheOpts struct {
     131             :         cacheID uint64
     132             :         fileNum base.DiskFileNum
     133             : }
     134             : 
     135             : // Marker function to indicate the option should be applied before reading the
     136             : // sstable properties and, in the write path, before writing the default
     137             : // sstable properties.
     138           0 : func (c *cacheOpts) preApply() {}
     139             : 
     140           2 : func (c *cacheOpts) readerApply(r *Reader) {
     141           2 :         if r.cacheID == 0 {
     142           2 :                 r.cacheID = c.cacheID
     143           2 :         }
     144           2 :         if r.fileNum == 0 {
     145           2 :                 r.fileNum = c.fileNum
     146           2 :         }
     147             : }
     148             : 
     149           2 : func (c *cacheOpts) writerApply(w *Writer) {
     150           2 :         if w.layout.cacheID == 0 {
     151           2 :                 w.layout.cacheID = c.cacheID
     152           2 :         }
     153           2 :         if w.layout.fileNum == 0 {
     154           2 :                 w.layout.fileNum = c.fileNum
     155           2 :         }
     156             : }
     157             : 
     158             : // rawTombstonesOpt is a Reader open option for specifying that range
     159             : // tombstones returned by Reader.NewRangeDelIter() should not be
     160             : // fragmented. Used by debug tools to get a raw view of the tombstones
     161             : // contained in an sstable.
     162             : type rawTombstonesOpt struct{}
     163             : 
     164           0 : func (rawTombstonesOpt) preApply() {}
     165             : 
     166           1 : func (rawTombstonesOpt) readerApply(r *Reader) {
     167           1 :         r.rawTombstones = true
     168           1 : }
     169             : 
     170           2 : func init() {
     171           2 :         private.SSTableCacheOpts = func(cacheID uint64, fileNum base.DiskFileNum) interface{} {
     172           2 :                 return &cacheOpts{cacheID, fileNum}
     173           2 :         }
     174           2 :         private.SSTableRawTombstonesOpt = rawTombstonesOpt{}
     175             : }
     176             : 
     177             : // Reader is a table reader.
     178             : type Reader struct {
     179             :         readable     objstorage.Readable
     180             :         cacheID      uint64
     181             :         fileNum      base.DiskFileNum
     182             :         err          error
     183             :         indexBH      block.Handle
     184             :         filterBH     block.Handle
     185             :         rangeDelBH   block.Handle
     186             :         rangeKeyBH   block.Handle
     187             :         valueBIH     valueBlocksIndexHandle
     188             :         propertiesBH block.Handle
     189             :         metaIndexBH  block.Handle
     190             :         footerBH     block.Handle
     191             :         opts         ReaderOptions
     192             :         Compare      Compare
     193             :         Equal        Equal
     194             :         FormatKey    base.FormatKey
     195             :         Split        Split
     196             :         tableFilter  *tableFilterReader
     197             :         // Keep types that are not multiples of 8 bytes at the end and with
     198             :         // decreasing size.
     199             :         Properties    Properties
     200             :         tableFormat   TableFormat
     201             :         rawTombstones bool
     202             :         mergerOK      bool
     203             :         checksumType  block.ChecksumType
     204             :         // metaBufferPool is a buffer pool used exclusively when opening a table and
     205             :         // loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
      206             :         // the BufferPool.pool slice as a part of the Reader allocation. It has
     207             :         // capacity 3 to accommodate the meta block (1), and both the compressed
     208             :         // properties block (1) and decompressed properties block (1)
     209             :         // simultaneously.
     210             :         metaBufferPool      block.BufferPool
     211             :         metaBufferPoolAlloc [3]block.AllocedBuffer
     212             : }
     213             : 
     214             : var _ CommonReader = (*Reader)(nil)
     215             : 
     216             : // Close the reader and the underlying objstorage.Readable.
     217           2 : func (r *Reader) Close() error {
     218           2 :         r.opts.Cache.Unref()
     219           2 : 
     220           2 :         if r.readable != nil {
     221           2 :                 r.err = firstError(r.err, r.readable.Close())
     222           2 :                 r.readable = nil
     223           2 :         }
     224             : 
     225           2 :         if r.err != nil {
     226           1 :                 return r.err
     227           1 :         }
     228             :         // Make any future calls to Get, NewIter or Close return an error.
     229           2 :         r.err = errReaderClosed
     230           2 :         return nil
     231             : }
     232             : 
     233             : // NewIterWithBlockPropertyFilters returns an iterator for the contents of the
     234             : // table. If an error occurs, NewIterWithBlockPropertyFilters cleans up after
     235             : // itself and returns a nil iterator.
     236             : func (r *Reader) NewIterWithBlockPropertyFilters(
     237             :         transforms IterTransforms,
     238             :         lower, upper []byte,
     239             :         filterer *BlockPropertiesFilterer,
     240             :         useFilterBlock bool,
     241             :         stats *base.InternalIteratorStats,
     242             :         categoryAndQoS CategoryAndQoS,
     243             :         statsCollector *CategoryStatsCollector,
     244             :         rp ReaderProvider,
     245           2 : ) (Iterator, error) {
     246           2 :         return r.newIterWithBlockPropertyFiltersAndContext(
     247           2 :                 context.Background(), transforms, lower, upper, filterer, useFilterBlock,
     248           2 :                 stats, categoryAndQoS, statsCollector, rp, nil)
     249           2 : }
     250             : 
     251             : // NewIterWithBlockPropertyFiltersAndContextEtc is similar to
     252             : // NewIterWithBlockPropertyFilters and additionally accepts a context for
     253             : // tracing.
     254             : //
     255             : // If transform.HideObsoletePoints is set, the callee assumes that filterer
     256             : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
     257             : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
     258             : func (r *Reader) NewIterWithBlockPropertyFiltersAndContextEtc(
     259             :         ctx context.Context,
     260             :         transforms IterTransforms,
     261             :         lower, upper []byte,
     262             :         filterer *BlockPropertiesFilterer,
     263             :         useFilterBlock bool,
     264             :         stats *base.InternalIteratorStats,
     265             :         categoryAndQoS CategoryAndQoS,
     266             :         statsCollector *CategoryStatsCollector,
     267             :         rp ReaderProvider,
     268           2 : ) (Iterator, error) {
     269           2 :         return r.newIterWithBlockPropertyFiltersAndContext(
     270           2 :                 ctx, transforms, lower, upper, filterer, useFilterBlock,
     271           2 :                 stats, categoryAndQoS, statsCollector, rp, nil)
     272           2 : }
     273             : 
     274             : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
     275             : // before the call to NewIterWithBlockPropertyFiltersAndContextEtc, to get the
     276             : // value of hideObsoletePoints and potentially add a block property filter.
     277             : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
     278             :         snapshotForHideObsoletePoints base.SeqNum,
     279             :         fileLargestSeqNum base.SeqNum,
     280             :         pointKeyFilters []BlockPropertyFilter,
     281           2 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
     282           2 :         hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
     283           2 :                 snapshotForHideObsoletePoints > fileLargestSeqNum
     284           2 :         if hideObsoletePoints {
     285           2 :                 pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
     286           2 :         }
     287           2 :         return hideObsoletePoints, pointKeyFilters
     288             : }
     289             : 
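A caller-side sketch of the contract described above: decide hideObsoletePoints (possibly extending the point-key filters) first, then build the transforms and the *BlockPropertiesFilterer handed to NewIterWithBlockPropertyFiltersAndContextEtc. The package and helper names are hypothetical, and the snippet assumes it lives inside the pebble module since it imports internal/base:

package sstabledebug // hypothetical

import (
        "github.com/cockroachdb/pebble/internal/base"
        "github.com/cockroachdb/pebble/sstable"
)

// iterTransformsForSnapshot is a hypothetical helper sketching the documented
// call order for hiding obsolete points.
func iterTransformsForSnapshot(
        r *sstable.Reader,
        snapshot, fileLargestSeqNum base.SeqNum,
        pointKeyFilters []sstable.BlockPropertyFilter,
) (sstable.IterTransforms, []sstable.BlockPropertyFilter) {
        hide, filters := r.TryAddBlockPropertyFilterForHideObsoletePoints(
                snapshot, fileLargestSeqNum, pointKeyFilters)
        return sstable.IterTransforms{HideObsoletePoints: hide}, filters
}
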
     290             : func (r *Reader) newIterWithBlockPropertyFiltersAndContext(
     291             :         ctx context.Context,
     292             :         transforms IterTransforms,
     293             :         lower, upper []byte,
     294             :         filterer *BlockPropertiesFilterer,
     295             :         useFilterBlock bool,
     296             :         stats *base.InternalIteratorStats,
     297             :         categoryAndQoS CategoryAndQoS,
     298             :         statsCollector *CategoryStatsCollector,
     299             :         rp ReaderProvider,
     300             :         vState *virtualState,
     301           2 : ) (Iterator, error) {
     302           2 :         // NB: pebble.tableCache wraps the returned iterator with one which performs
     303           2 :         // reference counting on the Reader, preventing the Reader from being closed
     304           2 :         // until the final iterator closes.
     305           2 :         if r.Properties.IndexType == twoLevelIndex {
     306           2 :                 i := twoLevelIterPool.Get().(*twoLevelIterator)
     307           2 :                 err := i.init(ctx, r, vState, transforms, lower, upper, filterer, useFilterBlock,
     308           2 :                         stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
     309           2 :                 if err != nil {
     310           1 :                         return nil, err
     311           1 :                 }
     312           2 :                 return i, nil
     313             :         }
     314             : 
     315           2 :         i := singleLevelIterPool.Get().(*singleLevelIterator)
     316           2 :         err := i.init(ctx, r, vState, transforms, lower, upper, filterer, useFilterBlock,
     317           2 :                 stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
     318           2 :         if err != nil {
     319           1 :                 return nil, err
     320           1 :         }
     321           2 :         return i, nil
     322             : }
     323             : 
     324             : // NewIter returns an iterator for the contents of the table. If an error
     325             : // occurs, NewIter cleans up after itself and returns a nil iterator. NewIter
     326             : // must only be used when the Reader is guaranteed to outlive any LazyValues
     327             : // returned from the iter.
     328           2 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
     329           2 :         return r.NewIterWithBlockPropertyFilters(
     330           2 :                 transforms, lower, upper, nil, true, /* useFilterBlock */
     331           2 :                 nil /* stats */, CategoryAndQoS{}, nil /* statsCollector */, TrivialReaderProvider{Reader: r})
     332           2 : }
     333             : 
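A minimal usage sketch for NewIter, assuming the exported NoTransforms value and the *base.InternalKV iterator API used elsewhere in this file; the package and helper names are hypothetical, and the Reader is kept open for the iterator's lifetime as required above:

package sstabledebug // hypothetical

import (
        "fmt"

        "github.com/cockroachdb/pebble/sstable"
)

// dumpUserKeys is a hypothetical sketch: print every point key in a table.
func dumpUserKeys(r *sstable.Reader) error {
        iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
        if err != nil {
                return err
        }
        defer iter.Close()
        for kv := iter.First(); kv != nil; kv = iter.Next() {
                fmt.Printf("%s\n", kv.K.UserKey)
        }
        return iter.Error()
}
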
     334             : // NewCompactionIter returns an iterator similar to NewIter but it also increments
     335             : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
     336             : // after itself and returns a nil iterator.
     337             : func (r *Reader) NewCompactionIter(
     338             :         transforms IterTransforms,
     339             :         categoryAndQoS CategoryAndQoS,
     340             :         statsCollector *CategoryStatsCollector,
     341             :         rp ReaderProvider,
     342             :         bufferPool *block.BufferPool,
     343           2 : ) (Iterator, error) {
     344           2 :         return r.newCompactionIter(transforms, categoryAndQoS, statsCollector, rp, nil, bufferPool)
     345           2 : }
     346             : 
     347             : func (r *Reader) newCompactionIter(
     348             :         transforms IterTransforms,
     349             :         categoryAndQoS CategoryAndQoS,
     350             :         statsCollector *CategoryStatsCollector,
     351             :         rp ReaderProvider,
     352             :         vState *virtualState,
     353             :         bufferPool *block.BufferPool,
     354           2 : ) (Iterator, error) {
     355           2 :         if vState != nil && vState.isSharedIngested {
     356           2 :                 transforms.HideObsoletePoints = true
     357           2 :         }
     358           2 :         if r.Properties.IndexType == twoLevelIndex {
     359           2 :                 i := twoLevelIterPool.Get().(*twoLevelIterator)
     360           2 :                 err := i.init(
     361           2 :                         context.Background(),
     362           2 :                         r, vState, transforms, nil /* lower */, nil /* upper */, nil,
     363           2 :                         false /* useFilter */, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
     364           2 :                 )
     365           2 :                 if err != nil {
     366           0 :                         return nil, err
     367           0 :                 }
     368           2 :                 i.setupForCompaction()
     369           2 :                 return &twoLevelCompactionIterator{twoLevelIterator: i}, nil
     370             :         }
     371           2 :         i := singleLevelIterPool.Get().(*singleLevelIterator)
     372           2 :         err := i.init(
     373           2 :                 context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
     374           2 :                 nil, false /* useFilter */, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
     375           2 :         )
     376           2 :         if err != nil {
     377           0 :                 return nil, err
     378           0 :         }
     379           2 :         i.setupForCompaction()
     380           2 :         return &compactionIterator{singleLevelIterator: i}, nil
     381             : }
     382             : 
     383             : // NewRawRangeDelIter returns an internal iterator for the contents of the
     384             : // range-del block for the table. Returns nil if the table does not contain
     385             : // any range deletions.
     386             : //
     387             : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
     388             : // iterator. Add WithContext methods since the existing ones are public.
     389             : func (r *Reader) NewRawRangeDelIter(
     390             :         transforms FragmentIterTransforms,
     391           2 : ) (keyspan.FragmentIterator, error) {
     392           2 :         if r.rangeDelBH.Length == 0 {
     393           2 :                 return nil, nil
     394           2 :         }
     395           2 :         h, err := r.readRangeDel(nil /* stats */, nil /* iterStats */)
     396           2 :         if err != nil {
     397           1 :                 return nil, err
     398           1 :         }
     399           2 :         transforms.ElideSameSeqNum = true
     400           2 :         i, err := rowblk.NewFragmentIter(r.Compare, r.Split, h, transforms)
     401           2 :         if err != nil {
     402           0 :                 return nil, err
     403           0 :         }
     404           2 :         return keyspan.MaybeAssert(i, r.Compare), nil
     405             : }
     406             : 
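A rough usage sketch for NewRawRangeDelIter, assuming the error-returning keyspan.FragmentIterator API used elsewhere in this package; note the nil, nil return when the table contains no range deletions. The package and helper names and the zero-valued FragmentIterTransforms are illustrative:

package sstabledebug // hypothetical

import "github.com/cockroachdb/pebble/sstable"

// countRangeDelKeys is a hypothetical sketch: count fragmented tombstone keys.
func countRangeDelKeys(r *sstable.Reader) (int, error) {
        iter, err := r.NewRawRangeDelIter(sstable.FragmentIterTransforms{})
        if err != nil || iter == nil {
                return 0, err
        }
        defer iter.Close()
        n := 0
        s, err := iter.First()
        for ; s != nil; s, err = iter.Next() {
                n += len(s.Keys)
        }
        return n, err // non-nil only if iteration itself failed
}
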
     407             : // NewRawRangeKeyIter returns an internal iterator for the contents of the
     408             : // range-key block for the table. Returns nil if the table does not contain any
     409             : // range keys.
     410             : //
     411             : // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
     412             : // iterator. Add WithContext methods since the existing ones are public.
     413             : func (r *Reader) NewRawRangeKeyIter(
     414             :         transforms FragmentIterTransforms,
     415           2 : ) (keyspan.FragmentIterator, error) {
     416           2 :         if r.rangeKeyBH.Length == 0 {
     417           2 :                 return nil, nil
     418           2 :         }
     419           2 :         h, err := r.readRangeKey(nil /* stats */, nil /* iterStats */)
     420           2 :         if err != nil {
     421           1 :                 return nil, err
     422           1 :         }
     423           2 :         i, err := rowblk.NewFragmentIter(r.Compare, r.Split, h, transforms)
     424           2 :         if err != nil {
     425           0 :                 return nil, err
     426           0 :         }
     427           2 :         return keyspan.MaybeAssert(i, r.Compare), nil
     428             : }
     429             : 
     430             : func (r *Reader) readIndex(
     431             :         ctx context.Context,
     432             :         readHandle objstorage.ReadHandle,
     433             :         stats *base.InternalIteratorStats,
     434             :         iterStats *iterStatsAccumulator,
     435           2 : ) (block.BufferHandle, error) {
     436           2 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     437           2 :         return r.readBlock(ctx, r.indexBH, nil, readHandle, stats, iterStats, nil /* buffer pool */)
     438           2 : }
     439             : 
     440             : func (r *Reader) readFilter(
     441             :         ctx context.Context,
     442             :         readHandle objstorage.ReadHandle,
     443             :         stats *base.InternalIteratorStats,
     444             :         iterStats *iterStatsAccumulator,
     445           2 : ) (block.BufferHandle, error) {
     446           2 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
     447           2 :         return r.readBlock(ctx, r.filterBH, nil /* transform */, readHandle, stats, iterStats, nil /* buffer pool */)
     448           2 : }
     449             : 
     450             : func (r *Reader) readRangeDel(
     451             :         stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
     452           2 : ) (block.BufferHandle, error) {
     453           2 :         ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
     454           2 :         return r.readBlock(ctx, r.rangeDelBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
     455           2 : }
     456             : 
     457             : func (r *Reader) readRangeKey(
     458             :         stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
     459           2 : ) (block.BufferHandle, error) {
     460           2 :         ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
     461           2 :         return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
     462           2 : }
     463             : 
     464             : func checkChecksum(
     465             :         checksumType block.ChecksumType, b []byte, bh block.Handle, fileNum base.DiskFileNum,
     466           2 : ) error {
     467           2 :         expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
     468           2 :         var computedChecksum uint32
     469           2 :         switch checksumType {
     470           2 :         case block.ChecksumTypeCRC32c:
     471           2 :                 computedChecksum = crc.New(b[:bh.Length+1]).Value()
     472           1 :         case block.ChecksumTypeXXHash64:
     473           1 :                 computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
     474           0 :         default:
     475           0 :                 return errors.Errorf("unsupported checksum type: %d", checksumType)
     476             :         }
     477             : 
     478           2 :         if expectedChecksum != computedChecksum {
     479           1 :                 return base.CorruptionErrorf(
     480           1 :                         "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
     481           1 :                         fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
     482           1 :         }
     483           2 :         return nil
     484             : }
     485             : 
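For reference, the block trailer that checkChecksum verifies is block.TrailerLen bytes: a 1-byte block/compression type followed by a little-endian 4-byte checksum computed over the block contents plus that type byte. A hypothetical in-package sketch of the writer side for the CRC-32C case (ChecksumTypeXXHash64 is analogous, truncating xxhash.Sum64 to 32 bits):

// appendBlockTrailer is a hypothetical sketch: it appends the type byte and a
// CRC-32C over contents+type, i.e. the bytes checkChecksum later reads at
// b[bh.Length] and b[bh.Length+1:].
func appendBlockTrailer(blockContents []byte, typ byte) []byte {
        b := append(blockContents, typ)
        return binary.LittleEndian.AppendUint32(b, crc.New(b).Value())
}
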
     486             : // DeterministicReadBlockDurationForTesting is for tests that want a
     487             : // deterministic value of the time to read a block (that is not in the cache).
     488             : // The return value is a function that must be called before the test exits.
     489           1 : func DeterministicReadBlockDurationForTesting() func() {
     490           1 :         drbdForTesting := deterministicReadBlockDurationForTesting
     491           1 :         deterministicReadBlockDurationForTesting = true
     492           1 :         return func() {
     493           1 :                 deterministicReadBlockDurationForTesting = drbdForTesting
     494           1 :         }
     495             : }
     496             : 
     497             : var deterministicReadBlockDurationForTesting = false
     498             : 
     499             : func (r *Reader) readBlock(
     500             :         ctx context.Context,
     501             :         bh block.Handle,
     502             :         transform blockTransform,
     503             :         readHandle objstorage.ReadHandle,
     504             :         stats *base.InternalIteratorStats,
     505             :         iterStats *iterStatsAccumulator,
     506             :         bufferPool *block.BufferPool,
     507           2 : ) (handle block.BufferHandle, _ error) {
     508           2 :         if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil {
     509           2 :                 // Cache hit.
     510           2 :                 if readHandle != nil {
     511           2 :                         readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
     512           2 :                 }
     513           2 :                 if stats != nil {
     514           2 :                         stats.BlockBytes += bh.Length
     515           2 :                         stats.BlockBytesInCache += bh.Length
     516           2 :                 }
     517           2 :                 if iterStats != nil {
     518           2 :                         iterStats.reportStats(bh.Length, bh.Length, 0)
     519           2 :                 }
      520             :                 // This block is already in the cache; return a handle to the existing value
     521             :                 // in the cache.
     522           2 :                 return block.CacheBufferHandle(h), nil
     523             :         }
     524             : 
     525             :         // Cache miss.
     526             : 
     527           2 :         if sema := r.opts.LoadBlockSema; sema != nil {
     528           1 :                 if err := sema.Acquire(ctx, 1); err != nil {
     529           0 :                         // An error here can only come from the context.
     530           0 :                         return block.BufferHandle{}, err
     531           0 :                 }
     532           1 :                 defer sema.Release(1)
     533             :         }
     534             : 
     535           2 :         compressed := block.Alloc(int(bh.Length+block.TrailerLen), bufferPool)
     536           2 :         readStartTime := time.Now()
     537           2 :         var err error
     538           2 :         if readHandle != nil {
     539           2 :                 err = readHandle.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
     540           2 :         } else {
     541           2 :                 err = r.readable.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
     542           2 :         }
     543           2 :         readDuration := time.Since(readStartTime)
      544           2 :         // TODO(sumeer): should the threshold be configurable?
     545           2 :         const slowReadTracingThreshold = 5 * time.Millisecond
     546           2 :         // For deterministic testing.
     547           2 :         if deterministicReadBlockDurationForTesting {
     548           1 :                 readDuration = slowReadTracingThreshold
     549           1 :         }
     550             :         // Call IsTracingEnabled to avoid the allocations of boxing integers into an
     551             :         // interface{}, unless necessary.
     552           2 :         if readDuration >= slowReadTracingThreshold && r.opts.LoggerAndTracer.IsTracingEnabled(ctx) {
     553           1 :                 r.opts.LoggerAndTracer.Eventf(ctx, "reading %d bytes took %s",
     554           1 :                         int(bh.Length+block.TrailerLen), readDuration.String())
     555           1 :         }
     556           2 :         if stats != nil {
     557           2 :                 stats.BlockBytes += bh.Length
     558           2 :                 stats.BlockReadDuration += readDuration
     559           2 :         }
     560           2 :         if err != nil {
     561           1 :                 compressed.Release()
     562           1 :                 return block.BufferHandle{}, err
     563           1 :         }
     564           2 :         if err := checkChecksum(r.checksumType, compressed.Get(), bh, r.fileNum); err != nil {
     565           1 :                 compressed.Release()
     566           1 :                 return block.BufferHandle{}, err
     567           1 :         }
     568             : 
     569           2 :         typ := blockType(compressed.Get()[bh.Length])
     570           2 :         compressed.Truncate(int(bh.Length))
     571           2 : 
     572           2 :         var decompressed block.Value
     573           2 :         if typ == noCompressionBlockType {
     574           2 :                 decompressed = compressed
     575           2 :         } else {
     576           2 :                 // Decode the length of the decompressed value.
     577           2 :                 decodedLen, prefixLen, err := decompressedLen(typ, compressed.Get())
     578           2 :                 if err != nil {
     579           0 :                         compressed.Release()
     580           0 :                         return block.BufferHandle{}, err
     581           0 :                 }
     582             : 
     583           2 :                 decompressed = block.Alloc(decodedLen, bufferPool)
     584           2 :                 if err := decompressInto(typ, compressed.Get()[prefixLen:], decompressed.Get()); err != nil {
     585           0 :                         compressed.Release()
     586           0 :                         return block.BufferHandle{}, err
     587           0 :                 }
     588           2 :                 compressed.Release()
     589             :         }
     590             : 
     591           2 :         if transform != nil {
     592           0 :                 // Transforming blocks is very rare, so the extra copy of the
     593           0 :                 // transformed data is not problematic.
     594           0 :                 tmpTransformed, err := transform(decompressed.Get())
     595           0 :                 if err != nil {
     596           0 :                         decompressed.Release()
     597           0 :                         return block.BufferHandle{}, err
     598           0 :                 }
     599             : 
     600           0 :                 transformed := block.Alloc(len(tmpTransformed), bufferPool)
     601           0 :                 copy(transformed.Get(), tmpTransformed)
     602           0 :                 decompressed.Release()
     603           0 :                 decompressed = transformed
     604             :         }
     605             : 
     606           2 :         if iterStats != nil {
     607           2 :                 iterStats.reportStats(bh.Length, 0, readDuration)
     608           2 :         }
     609           2 :         h := decompressed.MakeHandle(r.opts.Cache, r.cacheID, r.fileNum, bh.Offset)
     610           2 :         return h, nil
     611             : }
     612             : 
     613           2 : func (r *Reader) readMetaindex(metaindexBH block.Handle, readHandle objstorage.ReadHandle) error {
     614           2 :         // We use a BufferPool when reading metaindex blocks in order to avoid
     615           2 :         // populating the block cache with these blocks. In heavy-write workloads,
     616           2 :         // especially with high compaction concurrency, new tables may be created
     617           2 :         // frequently. Populating the block cache with these metaindex blocks adds
     618           2 :         // additional contention on the block cache mutexes (see #1997).
     619           2 :         // Additionally, these blocks are exceedingly unlikely to be read again
     620           2 :         // while they're still in the block cache except in misconfigurations with
      621           2 :         // excessive sstable counts or a table cache that's far too small.
     622           2 :         r.metaBufferPool.InitPreallocated(r.metaBufferPoolAlloc[:0])
     623           2 :         // When we're finished, release the buffers we've allocated back to memory
     624           2 :         // allocator. We don't expect to use metaBufferPool again.
     625           2 :         defer r.metaBufferPool.Release()
     626           2 : 
     627           2 :         b, err := r.readBlock(
     628           2 :                 context.Background(), metaindexBH, nil /* transform */, readHandle, nil, /* stats */
     629           2 :                 nil /* iterStats */, &r.metaBufferPool)
     630           2 :         if err != nil {
     631           1 :                 return err
     632           1 :         }
     633           2 :         data := b.Get()
     634           2 :         defer b.Release()
     635           2 : 
     636           2 :         if uint64(len(data)) != metaindexBH.Length {
     637           0 :                 return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
     638           0 :                         errors.Safe(len(data)), errors.Safe(metaindexBH.Length))
     639           0 :         }
     640             : 
     641           2 :         i, err := rowblk.NewRawIter(bytes.Compare, data)
     642           2 :         if err != nil {
     643           0 :                 return err
     644           0 :         }
     645             : 
     646           2 :         meta := map[string]block.Handle{}
     647           2 :         for valid := i.First(); valid; valid = i.Next() {
     648           2 :                 value := i.Value()
     649           2 :                 if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
     650           2 :                         vbih, n, err := decodeValueBlocksIndexHandle(i.Value())
     651           2 :                         if err != nil {
     652           0 :                                 return err
     653           0 :                         }
     654           2 :                         if n == 0 || n != len(value) {
     655           0 :                                 return base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
     656           0 :                         }
     657           2 :                         r.valueBIH = vbih
     658           2 :                 } else {
     659           2 :                         bh, n := decodeBlockHandle(value)
     660           2 :                         if n == 0 || n != len(value) {
     661           0 :                                 return base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
     662           0 :                         }
     663           2 :                         meta[string(i.Key().UserKey)] = bh
     664             :                 }
     665             :         }
     666           2 :         if err := i.Close(); err != nil {
     667           0 :                 return err
     668           0 :         }
     669             : 
     670           2 :         if bh, ok := meta[metaPropertiesName]; ok {
     671           2 :                 b, err = r.readBlock(
     672           2 :                         context.Background(), bh, nil /* transform */, readHandle, nil, /* stats */
     673           2 :                         nil /* iterStats */, nil /* buffer pool */)
     674           2 :                 if err != nil {
     675           1 :                         return err
     676           1 :                 }
     677           2 :                 r.propertiesBH = bh
     678           2 :                 err := r.Properties.load(b.Get(), r.opts.DeniedUserProperties)
     679           2 :                 b.Release()
     680           2 :                 if err != nil {
     681           0 :                         return err
     682           0 :                 }
     683             :         }
     684             : 
     685           2 :         if bh, ok := meta[metaRangeDelV2Name]; ok {
     686           2 :                 r.rangeDelBH = bh
     687           2 :         } else if _, ok := meta[metaRangeDelV1Name]; ok {
     688           0 :                 // This version of Pebble requires a format major version at least as
     689           0 :                 // high as FormatFlushableIngest (see pebble.FormatMinSupported). In
      690           0 :                 // this format major version, we have a guarantee that we've compacted
     691           0 :                 // away all RocksDB sstables. It should not be possible to encounter an
     692           0 :                 // sstable with a v1 range deletion block but not a v2 range deletion
     693           0 :                 // block.
     694           0 :                 err := errors.Newf("pebble/table: unexpected range-del block type: %s", metaRangeDelV1Name)
     695           0 :                 return errors.Mark(err, base.ErrCorruption)
     696           0 :         }
     697             : 
     698           2 :         if bh, ok := meta[metaRangeKeyName]; ok {
     699           2 :                 r.rangeKeyBH = bh
     700           2 :         }
     701             : 
     702           2 :         for name, fp := range r.opts.Filters {
     703           2 :                 types := []struct {
     704           2 :                         ftype  FilterType
     705           2 :                         prefix string
     706           2 :                 }{
     707           2 :                         {TableFilter, "fullfilter."},
     708           2 :                 }
     709           2 :                 var done bool
     710           2 :                 for _, t := range types {
     711           2 :                         if bh, ok := meta[t.prefix+name]; ok {
     712           2 :                                 r.filterBH = bh
     713           2 : 
     714           2 :                                 switch t.ftype {
     715           2 :                                 case TableFilter:
     716           2 :                                         r.tableFilter = newTableFilterReader(fp)
     717           0 :                                 default:
     718           0 :                                         return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
     719             :                                 }
     720             : 
     721           2 :                                 done = true
     722           2 :                                 break
     723             :                         }
     724             :                 }
     725           2 :                 if done {
     726           2 :                         break
     727             :                 }
     728             :         }
     729           2 :         return nil
     730             : }
     731             : 
     732             : // Layout returns the layout (block organization) for an sstable.
     733           2 : func (r *Reader) Layout() (*Layout, error) {
     734           2 :         if r.err != nil {
     735           0 :                 return nil, r.err
     736           0 :         }
     737             : 
     738           2 :         l := &Layout{
     739           2 :                 Data:       make([]BlockHandleWithProperties, 0, r.Properties.NumDataBlocks),
     740           2 :                 Filter:     r.filterBH,
     741           2 :                 RangeDel:   r.rangeDelBH,
     742           2 :                 RangeKey:   r.rangeKeyBH,
     743           2 :                 ValueIndex: r.valueBIH.h,
     744           2 :                 Properties: r.propertiesBH,
     745           2 :                 MetaIndex:  r.metaIndexBH,
     746           2 :                 Footer:     r.footerBH,
     747           2 :                 Format:     r.tableFormat,
     748           2 :         }
     749           2 : 
     750           2 :         indexH, err := r.readIndex(context.Background(), nil, nil, nil)
     751           2 :         if err != nil {
     752           1 :                 return nil, err
     753           1 :         }
     754           2 :         defer indexH.Release()
     755           2 : 
     756           2 :         var alloc bytealloc.A
     757           2 : 
     758           2 :         if r.Properties.IndexPartitions == 0 {
     759           2 :                 l.Index = append(l.Index, r.indexBH)
     760           2 :                 iter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     761           2 :                 for kv := iter.First(); kv != nil; kv = iter.Next() {
     762           2 :                         dataBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     763           2 :                         if err != nil {
     764           0 :                                 return nil, errCorruptIndexEntry(err)
     765           0 :                         }
     766           2 :                         if len(dataBH.Props) > 0 {
     767           2 :                                 alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     768           2 :                         }
     769           2 :                         l.Data = append(l.Data, dataBH)
     770             :                 }
     771           2 :         } else {
     772           2 :                 l.TopIndex = r.indexBH
     773           2 :                 topIter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     774           2 :                 iter := &rowblk.Iter{}
     775           2 :                 for kv := topIter.First(); kv != nil; kv = topIter.Next() {
     776           2 :                         indexBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     777           2 :                         if err != nil {
     778           0 :                                 return nil, errCorruptIndexEntry(err)
     779           0 :                         }
     780           2 :                         l.Index = append(l.Index, indexBH.Handle)
     781           2 : 
     782           2 :                         subIndex, err := r.readBlock(context.Background(), indexBH.Handle,
     783           2 :                                 nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
     784           2 :                         if err != nil {
     785           1 :                                 return nil, err
     786           1 :                         }
     787             :                         // TODO(msbutler): figure out how to pass virtualState to layout call.
     788           2 :                         if err := iter.Init(r.Compare, r.Split, subIndex.Get(), NoTransforms); err != nil {
     789           0 :                                 return nil, err
     790           0 :                         }
     791           2 :                         for kv := iter.First(); kv != nil; kv = iter.Next() {
     792           2 :                                 dataBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     793           2 :                                 if len(dataBH.Props) > 0 {
     794           2 :                                         alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     795           2 :                                 }
     796           2 :                                 if err != nil {
     797           0 :                                         return nil, errCorruptIndexEntry(err)
     798           0 :                                 }
     799           2 :                                 l.Data = append(l.Data, dataBH)
     800             :                         }
     801           2 :                         subIndex.Release()
     802           2 :                         *iter = iter.ResetForReuse()
     803             :                 }
     804             :         }
     805           2 :         if r.valueBIH.h.Length != 0 {
     806           2 :                 vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil, nil /* buffer pool */)
     807           2 :                 if err != nil {
     808           0 :                         return nil, err
     809           0 :                 }
     810           2 :                 defer vbiH.Release()
     811           2 :                 vbiBlock := vbiH.Get()
     812           2 :                 indexEntryLen := int(r.valueBIH.blockNumByteLength + r.valueBIH.blockOffsetByteLength +
     813           2 :                         r.valueBIH.blockLengthByteLength)
     814           2 :                 i := 0
     815           2 :                 for len(vbiBlock) != 0 {
     816           2 :                         if len(vbiBlock) < indexEntryLen {
     817           0 :                                 return nil, errors.Errorf(
     818           0 :                                         "remaining value index block %d does not contain a full entry of length %d",
     819           0 :                                         len(vbiBlock), indexEntryLen)
     820           0 :                         }
     821           2 :                         n := int(r.valueBIH.blockNumByteLength)
     822           2 :                         bn := int(littleEndianGet(vbiBlock, n))
     823           2 :                         if bn != i {
     824           0 :                                 return nil, errors.Errorf("unexpected block num %d, expected %d",
     825           0 :                                         bn, i)
     826           0 :                         }
     827           2 :                         i++
     828           2 :                         vbiBlock = vbiBlock[n:]
     829           2 :                         n = int(r.valueBIH.blockOffsetByteLength)
     830           2 :                         blockOffset := littleEndianGet(vbiBlock, n)
     831           2 :                         vbiBlock = vbiBlock[n:]
     832           2 :                         n = int(r.valueBIH.blockLengthByteLength)
     833           2 :                         blockLen := littleEndianGet(vbiBlock, n)
     834           2 :                         vbiBlock = vbiBlock[n:]
     835           2 :                         l.ValueBlock = append(l.ValueBlock, block.Handle{Offset: blockOffset, Length: blockLen})
     836             :                 }
     837             :         }
     838             : 
     839           2 :         return l, nil
     840             : }
     841             : 
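As a usage sketch built on Layout, a tool could summarize how a table's on-disk space splits between data blocks and value blocks; the package and helper names are hypothetical and only fields populated above are used:

package sstabledebug // hypothetical

import "github.com/cockroachdb/pebble/sstable"

// dataAndValueBlockBytes is a hypothetical sketch over Reader.Layout.
func dataAndValueBlockBytes(r *sstable.Reader) (dataBytes, valueBytes uint64, err error) {
        l, err := r.Layout()
        if err != nil {
                return 0, 0, err
        }
        for _, d := range l.Data {
                dataBytes += d.Handle.Length
        }
        for _, vb := range l.ValueBlock {
                valueBytes += vb.Length
        }
        return dataBytes, valueBytes, nil
}
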
     842             : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
     843           2 : func (r *Reader) ValidateBlockChecksums() error {
     844           2 :         // Pre-compute the BlockHandles for the underlying file.
     845           2 :         l, err := r.Layout()
     846           2 :         if err != nil {
     847           1 :                 return err
     848           1 :         }
     849             : 
     850             :         // Construct the set of blocks to check. Note that the footer is not checked
     851             :         // as it is not a block with a checksum.
     852           2 :         blocks := make([]block.Handle, len(l.Data))
     853           2 :         for i := range l.Data {
     854           2 :                 blocks[i] = l.Data[i].Handle
     855           2 :         }
     856           2 :         blocks = append(blocks, l.Index...)
     857           2 :         blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.RangeKey, l.Properties, l.MetaIndex)
     858           2 : 
     859           2 :         // Sorting by offset ensures we are performing a sequential scan of the
     860           2 :         // file.
     861           2 :         slices.SortFunc(blocks, func(a, b block.Handle) int {
     862           2 :                 return cmp.Compare(a.Offset, b.Offset)
     863           2 :         })
     864             : 
     865             :         // Check all blocks sequentially. Make use of read-ahead, given we are
     866             :         // scanning the entire file from start to end.
     867           2 :         rh := r.readable.NewReadHandle(context.TODO(), objstorage.NoReadBefore)
     868           2 :         defer rh.Close()
     869           2 : 
     870           2 :         for _, bh := range blocks {
     871           2 :                 // Certain blocks may not be present, in which case we skip them.
     872           2 :                 if bh.Length == 0 {
     873           2 :                         continue
     874             :                 }
     875             : 
     876             :                 // Read the block, which validates the checksum.
     877           2 :                 h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* iterStats */, nil /* buffer pool */)
     878           2 :                 if err != nil {
     879           1 :                         return err
     880           1 :                 }
     881           2 :                 h.Release()
     882             :         }
     883             : 
     884           2 :         return nil
     885             : }
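
A minimal caller sketch for the function above, assuming r is an already-open
*Reader; the surrounding error handling is illustrative, not taken from this
file.

        // Validate every block in the table. This reads the whole file
        // sequentially, so it is best suited to offline or background
        // integrity checks.
        if err := r.ValidateBlockChecksums(); err != nil {
                // Any checksum mismatch or read error surfaces here.
                return err
        }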
     886             : 
      887             : // CommonProperties implements the CommonReader interface.
     888           2 : func (r *Reader) CommonProperties() *CommonProperties {
     889           2 :         return &r.Properties.CommonProperties
     890           2 : }
     891             : 
     892             : // EstimateDiskUsage returns the total size of data blocks overlapping the range
     893             : // `[start, end]`. Even if a data block partially overlaps, or we cannot
     894             : // determine overlap due to abbreviated index keys, the full data block size is
     895             : // included in the estimation.
     896             : //
      897             : // This function does not account for any metablock space usage. It assumes
      898             : // there is at least partial overlap, i.e., `[start, end]` falls neither
      899             : // completely before nor completely after the file's range.
     900             : //
     901             : // Only blocks containing point keys are considered. Range deletion and range
     902             : // key blocks are not considered.
     903             : //
     904             : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
     905             : // data blocks overlapped and add that same fraction of the metadata blocks to the
     906             : // estimate.
     907           2 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
     908           2 :         if r.err != nil {
     909           0 :                 return 0, r.err
     910           0 :         }
     911             : 
     912           2 :         indexH, err := r.readIndex(context.Background(), nil, nil, nil)
     913           2 :         if err != nil {
     914           1 :                 return 0, err
     915           1 :         }
     916           2 :         defer indexH.Release()
     917           2 : 
     918           2 :         // Iterators over the bottom-level index blocks containing start and end.
      919           2 :         // These may differ in the case of a partitioned index, but will both point
      920           2 :         // to the same iterator over the single index block in the unpartitioned case.
     921           2 :         var startIdxIter, endIdxIter *rowblk.Iter
     922           2 :         if r.Properties.IndexPartitions == 0 {
     923           2 :                 iter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     924           2 :                 if err != nil {
     925           0 :                         return 0, err
     926           0 :                 }
     927           2 :                 startIdxIter = iter
     928           2 :                 endIdxIter = iter
     929           2 :         } else {
     930           2 :                 topIter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     931           2 :                 if err != nil {
     932           0 :                         return 0, err
     933           0 :                 }
     934             : 
     935           2 :                 kv := topIter.SeekGE(start, base.SeekGEFlagsNone)
     936           2 :                 if kv == nil {
     937           1 :                         // The range falls completely after this file, or an error occurred.
     938           1 :                         return 0, topIter.Error()
     939           1 :                 }
     940           2 :                 startIdxBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     941           2 :                 if err != nil {
     942           0 :                         return 0, errCorruptIndexEntry(err)
     943           0 :                 }
     944           2 :                 startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.Handle,
     945           2 :                         nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
     946           2 :                 if err != nil {
     947           1 :                         return 0, err
     948           1 :                 }
     949           2 :                 defer startIdxBlock.Release()
     950           2 :                 startIdxIter, err = rowblk.NewIter(r.Compare, r.Split, startIdxBlock.Get(), NoTransforms)
     951           2 :                 if err != nil {
     952           0 :                         return 0, err
     953           0 :                 }
     954             : 
     955           2 :                 kv = topIter.SeekGE(end, base.SeekGEFlagsNone)
     956           2 :                 if kv == nil {
     957           1 :                         if err := topIter.Error(); err != nil {
     958           0 :                                 return 0, err
     959           0 :                         }
     960           2 :                 } else {
     961           2 :                         endIdxBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     962           2 :                         if err != nil {
     963           0 :                                 return 0, errCorruptIndexEntry(err)
     964           0 :                         }
     965           2 :                         endIdxBlock, err := r.readBlock(context.Background(),
     966           2 :                                 endIdxBH.Handle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
     967           2 :                         if err != nil {
     968           1 :                                 return 0, err
     969           1 :                         }
     970           2 :                         defer endIdxBlock.Release()
     971           2 :                         endIdxIter, err = rowblk.NewIter(r.Compare, r.Split, endIdxBlock.Get(), NoTransforms)
     972           2 :                         if err != nil {
     973           0 :                                 return 0, err
     974           0 :                         }
     975             :                 }
     976             :         }
      977             :         // startIdxIter should not be nil at this point, while endIdxIter can be nil
      978             :         // if the range spans past the end of the file.
     979             : 
     980           2 :         kv := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
     981           2 :         if kv == nil {
     982           2 :                 // The range falls completely after this file, or an error occurred.
     983           2 :                 return 0, startIdxIter.Error()
     984           2 :         }
     985           2 :         startBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     986           2 :         if err != nil {
     987           0 :                 return 0, errCorruptIndexEntry(err)
     988           0 :         }
     989             : 
     990           2 :         includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
     991           2 :                 // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
     992           2 :                 // Linearly interpolate what is stored in value blocks.
     993           2 :                 //
     994           2 :                 // TODO(sumeer): if we need more accuracy, without loading any data blocks
     995           2 :                 // (which contain the value handles, and which may also be insufficient if
     996           2 :                 // the values are in separate files), we will need to accumulate the
     997           2 :                 // logical size of the key-value pairs and store the cumulative value for
     998           2 :                 // each data block in the index block entry. This increases the size of
     999           2 :                 // the BlockHandle, so wait until this becomes necessary.
    1000           2 :                 return dataBlockSize +
    1001           2 :                         uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
    1002           2 :                                 float64(r.Properties.ValueBlocksSize))
    1003           2 :         }
    1004           2 :         if endIdxIter == nil {
    1005           1 :                 // The range spans beyond this file. Include data blocks through the last.
    1006           1 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
    1007           1 :         }
    1008           2 :         kv = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
    1009           2 :         if kv == nil {
    1010           2 :                 if err := endIdxIter.Error(); err != nil {
    1011           0 :                         return 0, err
    1012           0 :                 }
    1013             :                 // The range spans beyond this file. Include data blocks through the last.
    1014           2 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
    1015             :         }
    1016           2 :         endBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
    1017           2 :         if err != nil {
    1018           0 :                 return 0, errCorruptIndexEntry(err)
    1019           0 :         }
    1020           2 :         return includeInterpolatedValueBlocksSize(
    1021           2 :                 endBH.Offset + endBH.Length + block.TrailerLen - startBH.Offset), nil
    1022             : }
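
As a worked example of the interpolation performed by
includeInterpolatedValueBlocksSize above (all numbers are illustrative, not
taken from a real table):

        // Suppose the overlapping data blocks span 10 MiB of a table whose
        // data blocks total 40 MiB and whose value blocks total 8 MiB.
        const (
                overlappingDataBytes = 10 << 20
                totalDataBytes       = 40 << 20
                totalValueBlockBytes = 8 << 20
        )
        // 25% of the data blocks overlap, so 25% of the value-block bytes are
        // attributed to the range as well: 10 MiB + 2 MiB = 12 MiB.
        estimate := uint64(overlappingDataBytes) +
                uint64(float64(overlappingDataBytes)/float64(totalDataBytes)*
                        float64(totalValueBlockBytes))
        _ = estimate // 12 << 20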
    1023             : 
    1024             : // TableFormat returns the format version for the table.
    1025           2 : func (r *Reader) TableFormat() (TableFormat, error) {
    1026           2 :         if r.err != nil {
    1027           0 :                 return TableFormatUnspecified, r.err
    1028           0 :         }
    1029           2 :         return r.tableFormat, nil
    1030             : }
    1031             : 
    1032             : // NewReader returns a new table reader for the file. Closing the reader will
    1033             : // close the file.
    1034           2 : func NewReader(f objstorage.Readable, o ReaderOptions, extraOpts ...ReaderOption) (*Reader, error) {
    1035           2 :         o = o.ensureDefaults()
    1036           2 :         r := &Reader{
    1037           2 :                 readable: f,
    1038           2 :                 opts:     o,
    1039           2 :         }
    1040           2 :         if r.opts.Cache == nil {
    1041           2 :                 r.opts.Cache = cache.New(0)
    1042           2 :         } else {
    1043           2 :                 r.opts.Cache.Ref()
    1044           2 :         }
    1045             : 
    1046           2 :         if f == nil {
    1047           1 :                 r.err = errors.New("pebble/table: nil file")
    1048           1 :                 return nil, r.Close()
    1049           1 :         }
    1050             : 
     1051             :         // Note that the extra options are applied in two passes: pre-apply options
     1052             :         // are applied here, and the remaining options are applied below, after the
     1053             :         // metaindex and properties blocks have been read.
    1054           2 :         type preApply interface{ preApply() }
    1055           2 :         for _, opt := range extraOpts {
    1056           2 :                 if _, ok := opt.(preApply); ok {
    1057           2 :                         opt.readerApply(r)
    1058           2 :                 }
    1059             :         }
    1060           2 :         if r.cacheID == 0 {
    1061           2 :                 r.cacheID = r.opts.Cache.NewID()
    1062           2 :         }
    1063             : 
    1064           2 :         var preallocRH objstorageprovider.PreallocatedReadHandle
    1065           2 :         ctx := context.TODO()
    1066           2 :         rh := objstorageprovider.UsePreallocatedReadHandle(
    1067           2 :                 ctx, r.readable, objstorage.ReadBeforeForNewReader, &preallocRH)
    1068           2 :         defer rh.Close()
    1069           2 : 
    1070           2 :         footer, err := readFooter(ctx, f, rh)
    1071           2 :         if err != nil {
    1072           1 :                 r.err = err
    1073           1 :                 return nil, r.Close()
    1074           1 :         }
    1075           2 :         r.checksumType = footer.checksum
    1076           2 :         r.tableFormat = footer.format
    1077           2 :         // Read the metaindex and properties blocks.
    1078           2 :         if err := r.readMetaindex(footer.metaindexBH, rh); err != nil {
    1079           1 :                 r.err = err
    1080           1 :                 return nil, r.Close()
    1081           1 :         }
    1082           2 :         r.indexBH = footer.indexBH
    1083           2 :         r.metaIndexBH = footer.metaindexBH
    1084           2 :         r.footerBH = footer.footerBH
    1085           2 : 
    1086           2 :         if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
    1087           2 :                 r.Compare = o.Comparer.Compare
    1088           2 :                 r.Equal = o.Comparer.Equal
    1089           2 :                 r.FormatKey = o.Comparer.FormatKey
    1090           2 :                 r.Split = o.Comparer.Split
    1091           2 :         }
    1092             : 
    1093           2 :         if o.MergerName == r.Properties.MergerName {
    1094           2 :                 r.mergerOK = true
    1095           2 :         }
    1096             : 
     1097             :         // Apply the remaining (post-apply) extra options now that the comparer and
     1098             :         // merger names are known.
    1099           2 :         for _, opt := range extraOpts {
    1100           2 :                 if _, ok := opt.(preApply); !ok {
    1101           2 :                         opt.readerApply(r)
    1102           2 :                 }
    1103             :         }
    1104             : 
    1105           2 :         if r.Compare == nil {
    1106           1 :                 r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
    1107           1 :                         errors.Safe(r.fileNum), errors.Safe(r.Properties.ComparerName))
    1108           1 :         }
    1109           2 :         if !r.mergerOK {
    1110           1 :                 if name := r.Properties.MergerName; name != "" && name != "nullptr" {
    1111           1 :                         r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
    1112           1 :                                 errors.Safe(r.fileNum), errors.Safe(r.Properties.MergerName))
    1113           1 :                 }
    1114             :         }
    1115           2 :         if r.err != nil {
    1116           1 :                 return nil, r.Close()
    1117           1 :         }
    1118             : 
    1119           2 :         return r, nil
    1120             : }
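
A minimal end-to-end sketch of opening a table with this constructor, assuming
a local file written with the default comparer and merger; the path is
illustrative and the zero-value ReaderOptions rely on ensureDefaults.

        f, err := os.Open("/path/to/table.sst") // illustrative path
        if err != nil {
                return err
        }
        readable, err := NewSimpleReadable(f) // defined below; no read-ahead
        if err != nil {
                _ = f.Close()
                return err
        }
        r, err := NewReader(readable, ReaderOptions{})
        if err != nil {
                // On failure NewReader has already called Close, releasing the
                // readable (and the underlying file).
                return err
        }
        defer r.Close()
        size, err := r.EstimateDiskUsage([]byte("a"), []byte("z")) // illustrative keys
        if err != nil {
                return err
        }
        _ = size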
    1121             : 
    1122             : // ReadableFile describes the smallest subset of vfs.File that is required for
    1123             : // reading SSTs.
    1124             : type ReadableFile interface {
    1125             :         io.ReaderAt
    1126             :         io.Closer
    1127             :         Stat() (os.FileInfo, error)
    1128             : }
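
*os.File provides ReadAt, Close and Stat, so it already satisfies this
interface; the compile-time assertion below is a sketch, not part of this
file.

        // Any locally opened file can therefore be passed to NewSimpleReadable.
        var _ ReadableFile = (*os.File)(nil)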
    1129             : 
     1130             : // NewSimpleReadable wraps a ReadableFile in an objstorage.Readable
     1131             : // implementation (which does not support read-ahead).
    1132           2 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
    1133           2 :         info, err := r.Stat()
    1134           2 :         if err != nil {
    1135           1 :                 return nil, err
    1136           1 :         }
    1137           2 :         res := &simpleReadable{
    1138           2 :                 f:    r,
    1139           2 :                 size: info.Size(),
    1140           2 :         }
    1141           2 :         res.rh = objstorage.MakeNoopReadHandle(res)
    1142           2 :         return res, nil
    1143             : }
    1144             : 
    1145             : // simpleReadable wraps a ReadableFile to implement objstorage.Readable.
    1146             : type simpleReadable struct {
    1147             :         f    ReadableFile
    1148             :         size int64
    1149             :         rh   objstorage.NoopReadHandle
    1150             : }
    1151             : 
    1152             : var _ objstorage.Readable = (*simpleReadable)(nil)
    1153             : 
    1154             : // ReadAt is part of the objstorage.Readable interface.
    1155           2 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
    1156           2 :         n, err := s.f.ReadAt(p, off)
    1157           2 :         if invariants.Enabled && err == nil && n != len(p) {
    1158           0 :                 panic("short read")
    1159             :         }
    1160           2 :         return err
    1161             : }
    1162             : 
    1163             : // Close is part of the objstorage.Readable interface.
    1164           2 : func (s *simpleReadable) Close() error {
    1165           2 :         return s.f.Close()
    1166           2 : }
    1167             : 
    1168             : // Size is part of the objstorage.Readable interface.
    1169           2 : func (s *simpleReadable) Size() int64 {
    1170           2 :         return s.size
    1171           2 : }
    1172             : 
     1173             : // NewReadHandle is part of the objstorage.Readable interface.
    1174             : func (s *simpleReadable) NewReadHandle(
    1175             :         ctx context.Context, readBeforeSize objstorage.ReadBeforeSize,
    1176           2 : ) objstorage.ReadHandle {
    1177           2 :         return &s.rh
    1178           2 : }
    1179             : 
    1180           0 : func errCorruptIndexEntry(err error) error {
    1181           0 :         err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
    1182           0 :         if invariants.Enabled {
    1183           0 :                 panic(err)
    1184             :         }
    1185           0 :         return err
    1186             : }

Generated by: LCOV version 1.14