LCOV - code coverage report
Current view: top level - pebble/sstable - reader.go (source / functions) Hit Total Coverage
Test: 2024-11-18 08:17Z 9ed54bc4 - tests only.lcov Lines: 647 750 86.3 %
Date: 2024-11-18 08:17:44 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "context"
      10             :         "encoding/binary"
      11             :         "fmt"
      12             :         "io"
      13             :         "path/filepath"
      14             :         "runtime"
      15             :         "slices"
      16             :         "time"
      17             : 
      18             :         "github.com/cespare/xxhash/v2"
      19             :         "github.com/cockroachdb/crlib/crtime"
      20             :         "github.com/cockroachdb/crlib/fifo"
      21             :         "github.com/cockroachdb/errors"
      22             :         "github.com/cockroachdb/pebble/internal/base"
      23             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      24             :         "github.com/cockroachdb/pebble/internal/cache"
      25             :         "github.com/cockroachdb/pebble/internal/crc"
      26             :         "github.com/cockroachdb/pebble/internal/invariants"
      27             :         "github.com/cockroachdb/pebble/internal/keyspan"
      28             :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      29             :         "github.com/cockroachdb/pebble/objstorage"
      30             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      31             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      32             :         "github.com/cockroachdb/pebble/sstable/block"
      33             :         "github.com/cockroachdb/pebble/sstable/colblk"
      34             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      35             :         "github.com/cockroachdb/pebble/sstable/valblk"
      36             :         "github.com/cockroachdb/pebble/vfs"
      37             : )
      38             : 
      39             : var errReaderClosed = errors.New("pebble/table: reader is closed")
      40             : 
      41             : type loadBlockResult int8
      42             : 
      43             : const (
      44             :         loadBlockOK loadBlockResult = iota
      45             :         // Could be due to error or because no block left to load.
      46             :         loadBlockFailed
      47             :         loadBlockIrrelevant
      48             : )
      49             : 
      50             : // Reader is a table reader.
      51             : type Reader struct {
      52             :         readable objstorage.Readable
      53             : 
      54             :         // The following fields are copied from the ReadOptions.
      55             :         cacheOpts            sstableinternal.CacheOptions
      56             :         keySchema            *colblk.KeySchema
      57             :         loadBlockSema        *fifo.Semaphore
      58             :         deniedUserProperties map[string]struct{}
      59             :         filterMetricsTracker *FilterMetricsTracker
      60             :         logger               base.LoggerAndTracer
      61             : 
      62             :         Comparer *base.Comparer
      63             :         Compare  Compare
      64             :         Equal    Equal
      65             :         Split    Split
      66             : 
      67             :         tableFilter *tableFilterReader
      68             : 
      69             :         err error
      70             : 
      71             :         indexBH      block.Handle
      72             :         filterBH     block.Handle
      73             :         rangeDelBH   block.Handle
      74             :         rangeKeyBH   block.Handle
      75             :         valueBIH     valblk.IndexHandle
      76             :         propertiesBH block.Handle
      77             :         metaindexBH  block.Handle
      78             :         footerBH     block.Handle
      79             : 
      80             :         Properties   Properties
      81             :         tableFormat  TableFormat
      82             :         checksumType block.ChecksumType
      83             : 
      84             :         // metaBufferPool is a buffer pool used exclusively when opening a table and
      85             :         // loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
      86             :         // the BufferPool.pool slice as a part of the Reader allocation. It has
      87             :         // capacity 3 to accommodate the meta block (1), and both the compressed
      88             :         // properties block (1) and decompressed properties block (1)
      89             :         // simultaneously.
      90             :         metaBufferPool      block.BufferPool
      91             :         metaBufferPoolAlloc [3]block.AllocedBuffer
      92             : }
      93             : 
      94             : var _ CommonReader = (*Reader)(nil)
      95             : 
      96             : // Close the reader and the underlying objstorage.Readable.
      97           1 : func (r *Reader) Close() error {
      98           1 :         r.cacheOpts.Cache.Unref()
      99           1 : 
     100           1 :         if r.readable != nil {
     101           1 :                 r.err = firstError(r.err, r.readable.Close())
     102           1 :                 r.readable = nil
     103           1 :         }
     104             : 
     105           1 :         if r.err != nil {
     106           1 :                 return r.err
     107           1 :         }
     108             :         // Make any future calls to Get, NewIter or Close return an error.
     109           1 :         r.err = errReaderClosed
     110           1 :         return nil
     111             : }
     112             : 
     113             : // NewPointIter returns an iterator for the point keys in the table.
     114             : //
     115             : // If transforms.HideObsoletePoints is set, the callee assumes that filterer
     116             : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
     117             : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
     118             : func (r *Reader) NewPointIter(
     119             :         ctx context.Context,
     120             :         transforms IterTransforms,
     121             :         lower, upper []byte,
     122             :         filterer *BlockPropertiesFilterer,
     123             :         filterBlockSizeLimit FilterBlockSizeLimit,
     124             :         stats *base.InternalIteratorStats,
     125             :         statsAccum IterStatsAccumulator,
     126             :         rp ReaderProvider,
     127           1 : ) (Iterator, error) {
     128           1 :         return r.newPointIter(
     129           1 :                 ctx, transforms, lower, upper, filterer, filterBlockSizeLimit,
     130           1 :                 stats, statsAccum, rp, nil)
     131           1 : }
     132             : 
     133             : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
     134             : // before the call to NewPointIter, to get the value of hideObsoletePoints and
     135             : // potentially add a block property filter.
     136             : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
     137             :         snapshotForHideObsoletePoints base.SeqNum,
     138             :         fileLargestSeqNum base.SeqNum,
     139             :         pointKeyFilters []BlockPropertyFilter,
     140           1 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
     141           1 :         hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
     142           1 :                 snapshotForHideObsoletePoints > fileLargestSeqNum
     143           1 :         if hideObsoletePoints {
     144           1 :                 pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
     145           1 :         }
     146           1 :         return hideObsoletePoints, pointKeyFilters
     147             : }
     148             : 
     149             : func (r *Reader) newPointIter(
     150             :         ctx context.Context,
     151             :         transforms IterTransforms,
     152             :         lower, upper []byte,
     153             :         filterer *BlockPropertiesFilterer,
     154             :         filterBlockSizeLimit FilterBlockSizeLimit,
     155             :         stats *base.InternalIteratorStats,
     156             :         statsAccum IterStatsAccumulator,
     157             :         rp ReaderProvider,
     158             :         vState *virtualState,
     159           1 : ) (Iterator, error) {
     160           1 :         // NB: pebble.tableCache wraps the returned iterator with one which performs
     161           1 :         // reference counting on the Reader, preventing the Reader from being closed
     162           1 :         // until the final iterator closes.
     163           1 :         var res Iterator
     164           1 :         var err error
     165           1 :         if r.Properties.IndexType == twoLevelIndex {
     166           1 :                 if r.tableFormat.BlockColumnar() {
     167           1 :                         res, err = newColumnBlockTwoLevelIterator(
     168           1 :                                 ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     169           1 :                                 stats, statsAccum, rp, nil /* bufferPool */)
     170           1 :                 } else {
     171           1 :                         res, err = newRowBlockTwoLevelIterator(
     172           1 :                                 ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     173           1 :                                 stats, statsAccum, rp, nil /* bufferPool */)
     174           1 :                 }
     175           1 :         } else {
     176           1 :                 if r.tableFormat.BlockColumnar() {
     177           1 :                         res, err = newColumnBlockSingleLevelIterator(
     178           1 :                                 ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     179           1 :                                 stats, statsAccum, rp, nil /* bufferPool */)
     180           1 :                 } else {
     181           1 :                         res, err = newRowBlockSingleLevelIterator(
     182           1 :                                 ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     183           1 :                                 stats, statsAccum, rp, nil /* bufferPool */)
     184           1 :                 }
     185             :         }
     186           1 :         if err != nil {
     187           1 :                 // Note: we don't want to return res here - it will be a nil
     188           1 :                 // single/twoLevelIterator, not a nil Iterator.
     189           1 :                 return nil, err
     190           1 :         }
     191           1 :         return res, nil
     192             : }
     193             : 
     194             : // NewIter returns an iterator for the point keys in the table. It is a
     195             : // simplified version of NewPointIter and should only be used for tests and
     196             : // tooling.
     197             : //
     198             : // NewIter must only be used when the Reader is guaranteed to outlive any
     199             : // LazyValues returned from the iter.
     200           1 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
     201           1 :         // TODO(radu): we should probably not use bloom filters in this case, as there
     202           1 :         // likely isn't a cache set up.
     203           1 :         return r.NewPointIter(
     204           1 :                 context.TODO(), transforms, lower, upper, nil, AlwaysUseFilterBlock,
     205           1 :                 nil /* stats */, nil /* statsAccum */, MakeTrivialReaderProvider(r))
     206           1 : }
     207             : 
     208             : // NewCompactionIter returns an iterator similar to NewIter but it also increments
     209             : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
     210             : // after itself and returns a nil iterator.
     211             : func (r *Reader) NewCompactionIter(
     212             :         transforms IterTransforms,
     213             :         statsAccum IterStatsAccumulator,
     214             :         rp ReaderProvider,
     215             :         bufferPool *block.BufferPool,
     216           1 : ) (Iterator, error) {
     217           1 :         return r.newCompactionIter(transforms, statsAccum, rp, nil, bufferPool)
     218           1 : }
     219             : 
     220             : func (r *Reader) newCompactionIter(
     221             :         transforms IterTransforms,
     222             :         statsAccum IterStatsAccumulator,
     223             :         rp ReaderProvider,
     224             :         vState *virtualState,
     225             :         bufferPool *block.BufferPool,
     226           1 : ) (Iterator, error) {
     227           1 :         if vState != nil && vState.isSharedIngested {
     228           1 :                 transforms.HideObsoletePoints = true
     229           1 :         }
     230           1 :         if r.Properties.IndexType == twoLevelIndex {
     231           1 :                 if !r.tableFormat.BlockColumnar() {
     232           1 :                         i, err := newRowBlockTwoLevelIterator(
     233           1 :                                 context.Background(),
     234           1 :                                 r, vState, transforms, nil /* lower */, nil /* upper */, nil,
     235           1 :                                 NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
     236           1 :                         if err != nil {
     237           0 :                                 return nil, err
     238           0 :                         }
     239           1 :                         i.SetupForCompaction()
     240           1 :                         return i, nil
     241             :                 }
     242           1 :                 i, err := newColumnBlockTwoLevelIterator(
     243           1 :                         context.Background(),
     244           1 :                         r, vState, transforms, nil /* lower */, nil /* upper */, nil,
     245           1 :                         NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
     246           1 :                 if err != nil {
     247           0 :                         return nil, err
     248           0 :                 }
     249           1 :                 i.SetupForCompaction()
     250           1 :                 return i, nil
     251             :         }
     252           1 :         if !r.tableFormat.BlockColumnar() {
     253           1 :                 i, err := newRowBlockSingleLevelIterator(
     254           1 :                         context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
     255           1 :                         nil, NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
     256           1 :                 if err != nil {
     257           0 :                         return nil, err
     258           0 :                 }
     259           1 :                 i.SetupForCompaction()
     260           1 :                 return i, nil
     261             :         }
     262           1 :         i, err := newColumnBlockSingleLevelIterator(
     263           1 :                 context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
     264           1 :                 nil, NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
     265           1 :         if err != nil {
     266           0 :                 return nil, err
     267           0 :         }
     268           1 :         i.SetupForCompaction()
     269           1 :         return i, nil
     270             : }
     271             : 
     272             : // NewRawRangeDelIter returns an internal iterator for the contents of the
     273             : // range-del block for the table. Returns nil if the table does not contain
     274             : // any range deletions.
     275             : func (r *Reader) NewRawRangeDelIter(
     276             :         ctx context.Context, transforms FragmentIterTransforms,
     277           1 : ) (iter keyspan.FragmentIterator, err error) {
     278           1 :         if r.rangeDelBH.Length == 0 {
     279           1 :                 return nil, nil
     280           1 :         }
     281             :         // TODO(radu): plumb stats here.
     282           1 :         h, err := r.readRangeDelBlock(ctx, noEnv, noReadHandle, r.rangeDelBH)
     283           1 :         if err != nil {
     284           1 :                 return nil, err
     285           1 :         }
     286           1 :         if r.tableFormat.BlockColumnar() {
     287           1 :                 iter = colblk.NewKeyspanIter(r.Compare, h, transforms)
     288           1 :         } else {
     289           1 :                 iter, err = rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareRangeSuffixes, r.Split, h, transforms)
     290           1 :                 if err != nil {
     291           0 :                         return nil, err
     292           0 :                 }
     293             :         }
     294           1 :         return keyspan.MaybeAssert(iter, r.Compare), nil
     295             : }
     296             : 
     297             : // NewRawRangeKeyIter returns an internal iterator for the contents of the
     298             : // range-key block for the table. Returns nil if the table does not contain any
     299             : // range keys.
     300             : func (r *Reader) NewRawRangeKeyIter(
     301             :         ctx context.Context, transforms FragmentIterTransforms,
     302           1 : ) (iter keyspan.FragmentIterator, err error) {
     303           1 :         if r.rangeKeyBH.Length == 0 {
     304           1 :                 return nil, nil
     305           1 :         }
     306             :         // TODO(radu): plumb stats here.
     307           1 :         h, err := r.readRangeKeyBlock(ctx, noEnv, noReadHandle, r.rangeKeyBH)
     308           1 :         if err != nil {
     309           1 :                 return nil, err
     310           1 :         }
     311           1 :         if r.tableFormat.BlockColumnar() {
     312           1 :                 iter = colblk.NewKeyspanIter(r.Compare, h, transforms)
     313           1 :         } else {
     314           1 :                 iter, err = rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareRangeSuffixes, r.Split, h, transforms)
     315           1 :                 if err != nil {
     316           0 :                         return nil, err
     317           0 :                 }
     318             :         }
     319           1 :         return keyspan.MaybeAssert(iter, r.Compare), nil
     320             : }
     321             : 
     322             : // readBlockEnv contains arguments used when reading a block which apply to all
     323             : // the block reads performed by a higher-level operation.
     324             : type readBlockEnv struct {
     325             :         // Stats and IterStats are slightly different. Stats is a shared struct
     326             :         // supplied from the outside, and represents stats for the whole iterator
     327             :         // tree and can be reset from the outside (e.g. when the pebble.Iterator is
     328             :         // being reused). It is currently only provided when the iterator tree is
     329             :         // rooted at pebble.Iterator. IterStats contains an sstable iterator's
     330             :         // private stats that are reported to a CategoryStatsCollector when this
     331             :         // iterator is closed. In the important code paths, the CategoryStatsCollector
     332             :         // is managed by the tableCacheContainer.
     333             :         Stats     *base.InternalIteratorStats
     334             :         IterStats *iterStatsAccumulator
     335             : 
     336             :         // BufferPool is non-nil if we read blocks into a buffer pool and not into the
     337             :         // cache. This is used during compactions.
     338             :         BufferPool *block.BufferPool
     339             : }
     340             : 
     341             : // BlockServedFromCache updates the stats when a block was found in the cache.
     342           1 : func (env *readBlockEnv) BlockServedFromCache(blockLength uint64) {
     343           1 :         if env.Stats != nil {
     344           1 :                 env.Stats.BlockBytes += blockLength
     345           1 :                 env.Stats.BlockBytesInCache += blockLength
     346           1 :         }
     347           1 :         if env.IterStats != nil {
     348           1 :                 env.IterStats.reportStats(blockLength, blockLength, 0)
     349           1 :         }
     350             : }
     351             : 
     352             : // BlockRead updates the stats when a block had to be read.
     353           1 : func (env *readBlockEnv) BlockRead(blockLength uint64, readDuration time.Duration) {
     354           1 :         if env.Stats != nil {
     355           1 :                 env.Stats.BlockBytes += blockLength
     356           1 :                 env.Stats.BlockReadDuration += readDuration
     357           1 :         }
     358           1 :         if env.IterStats != nil {
     359           1 :                 env.IterStats.reportStats(blockLength, 0, readDuration)
     360           1 :         }
     361             : }
     362             : 
     363             : // noEnv is the empty readBlockEnv which reports no stats and does not use a
     364             : // buffer pool.
     365             : var noEnv = readBlockEnv{}
     366             : 
     367             : // noReadHandle is used when we don't want to pass a ReadHandle to one of the
     368             : // read block methods.
     369             : var noReadHandle objstorage.ReadHandle = nil
     370             : 
     371           1 : var noInitBlockMetadataFn = func(*block.Metadata, []byte) error { return nil }
     372             : 
     373             : // readMetaindexBlock reads the metaindex block.
     374             : func (r *Reader) readMetaindexBlock(
     375             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle,
     376           1 : ) (block.BufferHandle, error) {
     377           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     378           1 :         return r.readBlockInternal(ctx, env, readHandle, r.metaindexBH, noInitBlockMetadataFn)
     379           1 : }
     380             : 
     381             : // readTopLevelIndexBlock reads the top-level index block.
     382             : func (r *Reader) readTopLevelIndexBlock(
     383             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle,
     384           1 : ) (block.BufferHandle, error) {
     385           1 :         return r.readIndexBlock(ctx, env, readHandle, r.indexBH)
     386           1 : }
     387             : 
     388             : // readIndexBlock reads a top-level or second-level index block.
     389             : func (r *Reader) readIndexBlock(
     390             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     391           1 : ) (block.BufferHandle, error) {
     392           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     393           1 :         return r.readBlockInternal(ctx, env, readHandle, bh, r.initIndexBlockMetadata)
     394           1 : }
     395             : 
     396             : // initIndexBlockMetadata initializes the Metadata for an index block. This will
     397             : // later be used (and reused) when reading from the block.
     398           1 : func (r *Reader) initIndexBlockMetadata(metadata *block.Metadata, data []byte) error {
     399           1 :         if r.tableFormat.BlockColumnar() {
     400           1 :                 return colblk.InitIndexBlockMetadata(metadata, data)
     401           1 :         }
     402           1 :         return nil
     403             : }
     404             : 
     405             : func (r *Reader) readDataBlock(
     406             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     407           1 : ) (block.BufferHandle, error) {
     408           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.DataBlock)
     409           1 :         return r.readBlockInternal(ctx, env, readHandle, bh, r.initDataBlockMetadata)
     410           1 : }
     411             : 
     412             : // initDataBlockMetadata initializes the Metadata for a data block. This will
     413             : // later be used (and reused) when reading from the block.
     414           1 : func (r *Reader) initDataBlockMetadata(metadata *block.Metadata, data []byte) error {
     415           1 :         if r.tableFormat.BlockColumnar() {
     416           1 :                 return colblk.InitDataBlockMetadata(r.keySchema, metadata, data)
     417           1 :         }
     418           1 :         return nil
     419             : }
     420             : 
     421             : func (r *Reader) readFilterBlock(
     422             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     423           1 : ) (block.BufferHandle, error) {
     424           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
     425           1 :         return r.readBlockInternal(ctx, env, readHandle, bh, noInitBlockMetadataFn)
     426           1 : }
     427             : 
     428             : func (r *Reader) readRangeDelBlock(
     429             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     430           1 : ) (block.BufferHandle, error) {
     431           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     432           1 :         return r.readBlockInternal(ctx, env, readHandle, bh, r.initKeyspanBlockMetadata)
     433           1 : }
     434             : 
     435             : func (r *Reader) readRangeKeyBlock(
     436             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     437           1 : ) (block.BufferHandle, error) {
     438           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     439           1 :         return r.readBlockInternal(ctx, env, readHandle, bh, r.initKeyspanBlockMetadata)
     440           1 : }
     441             : 
     442             : // initKeyspanBlockMetadata initializes the Metadata for a rangedel or range key
     443             : // block. This will later be used (and reused) when reading from the block.
     444           1 : func (r *Reader) initKeyspanBlockMetadata(metadata *block.Metadata, data []byte) error {
     445           1 :         if r.tableFormat.BlockColumnar() {
     446           1 :                 return colblk.InitKeyspanBlockMetadata(metadata, data)
     447           1 :         }
     448           1 :         return nil
     449             : }
     450             : 
     451             : func (r *Reader) readValueBlock(
     452             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     453           1 : ) (block.BufferHandle, error) {
     454           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.ValueBlock)
     455           1 :         return r.readBlockInternal(ctx, env, readHandle, bh, noInitBlockMetadataFn)
     456           1 : }
     457             : 
     458             : func checkChecksum(
     459             :         checksumType block.ChecksumType, b []byte, bh block.Handle, fileNum base.DiskFileNum,
     460           1 : ) error {
     461           1 :         expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
     462           1 :         var computedChecksum uint32
     463           1 :         switch checksumType {
     464           1 :         case block.ChecksumTypeCRC32c:
     465           1 :                 computedChecksum = crc.New(b[:bh.Length+1]).Value()
     466           1 :         case block.ChecksumTypeXXHash64:
     467           1 :                 computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
     468           0 :         default:
     469           0 :                 return errors.Errorf("unsupported checksum type: %d", checksumType)
     470             :         }
     471             : 
     472           1 :         if expectedChecksum != computedChecksum {
     473           1 :                 return base.CorruptionErrorf(
     474           1 :                         "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
     475           1 :                         fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
     476           1 :         }
     477           1 :         return nil
     478             : }
     479             : 
     480             : // DeterministicReadBlockDurationForTesting is for tests that want a
     481             : // deterministic value of the time to read a block (that is not in the cache).
     482             : // The return value is a function that must be called before the test exits.
     483           1 : func DeterministicReadBlockDurationForTesting() func() {
     484           1 :         drbdForTesting := deterministicReadBlockDurationForTesting
     485           1 :         deterministicReadBlockDurationForTesting = true
     486           1 :         return func() {
     487           1 :                 deterministicReadBlockDurationForTesting = drbdForTesting
     488           1 :         }
     489             : }
     490             : 
     491             : var deterministicReadBlockDurationForTesting = false
     492             : 
     493             : // readBlockInternal should not be used directly; one of the read*Block methods
     494             : // should be used instead.
     495             : func (r *Reader) readBlockInternal(
     496             :         ctx context.Context,
     497             :         env readBlockEnv,
     498             :         readHandle objstorage.ReadHandle,
     499             :         bh block.Handle,
     500             :         initBlockMetadataFn func(*block.Metadata, []byte) error,
     501           1 : ) (handle block.BufferHandle, _ error) {
     502           1 :         if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Valid() {
     503           1 :                 // Cache hit.
     504           1 :                 if readHandle != nil {
     505           1 :                         readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
     506           1 :                 }
     507           1 :                 env.BlockServedFromCache(bh.Length)
     508           1 :                 return block.CacheBufferHandle(h), nil
     509             :         }
     510             : 
     511             :         // Cache miss.
     512             : 
     513           1 :         if sema := r.loadBlockSema; sema != nil {
     514           1 :                 if err := sema.Acquire(ctx, 1); err != nil {
     515           0 :                         // An error here can only come from the context.
     516           0 :                         return block.BufferHandle{}, err
     517           0 :                 }
     518           1 :                 defer sema.Release(1)
     519             :         }
     520             : 
     521           1 :         compressed := block.Alloc(int(bh.Length+block.TrailerLen), env.BufferPool)
     522           1 :         readStopwatch := makeStopwatch()
     523           1 :         var err error
     524           1 :         if readHandle != nil {
     525           1 :                 err = readHandle.ReadAt(ctx, compressed.BlockData(), int64(bh.Offset))
     526           1 :         } else {
     527           1 :                 err = r.readable.ReadAt(ctx, compressed.BlockData(), int64(bh.Offset))
     528           1 :         }
     529           1 :         readDuration := readStopwatch.stop()
     530           1 :         // Call IsTracingEnabled to avoid the allocations of boxing integers into an
     531           1 :         // interface{}, unless necessary.
     532           1 :         if readDuration >= slowReadTracingThreshold && r.logger.IsTracingEnabled(ctx) {
     533           1 :                 _, file1, line1, _ := runtime.Caller(1)
     534           1 :                 _, file2, line2, _ := runtime.Caller(2)
     535           1 :                 r.logger.Eventf(ctx, "reading block of %d bytes took %s (fileNum=%s; %s/%s:%d -> %s/%s:%d)",
     536           1 :                         int(bh.Length+block.TrailerLen), readDuration.String(),
     537           1 :                         r.cacheOpts.FileNum,
     538           1 :                         filepath.Base(filepath.Dir(file2)), filepath.Base(file2), line2,
     539           1 :                         filepath.Base(filepath.Dir(file1)), filepath.Base(file1), line1)
     540           1 :         }
     541           1 :         if err != nil {
     542           1 :                 compressed.Release()
     543           1 :                 return block.BufferHandle{}, err
     544           1 :         }
     545           1 :         env.BlockRead(bh.Length, readDuration)
     546           1 :         if err := checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil {
     547           1 :                 compressed.Release()
     548           1 :                 return block.BufferHandle{}, err
     549           1 :         }
     550             : 
     551           1 :         typ := block.CompressionIndicator(compressed.BlockData()[bh.Length])
     552           1 :         compressed.Truncate(int(bh.Length))
     553           1 : 
     554           1 :         var decompressed block.Value
     555           1 :         if typ == block.NoCompressionIndicator {
     556           1 :                 decompressed = compressed
     557           1 :         } else {
     558           1 :                 // Decode the length of the decompressed value.
     559           1 :                 decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.BlockData())
     560           1 :                 if err != nil {
     561           0 :                         compressed.Release()
     562           0 :                         return block.BufferHandle{}, err
     563           0 :                 }
     564             : 
     565           1 :                 decompressed = block.Alloc(decodedLen, env.BufferPool)
     566           1 :                 err = block.DecompressInto(typ, compressed.BlockData()[prefixLen:], decompressed.BlockData())
     567           1 :                 compressed.Release()
     568           1 :                 if err != nil {
     569           0 :                         decompressed.Release()
     570           0 :                         return block.BufferHandle{}, err
     571           0 :                 }
     572             :         }
     573           1 :         if err := initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil {
     574           0 :                 decompressed.Release()
     575           0 :                 return block.BufferHandle{}, err
     576           0 :         }
     577           1 :         h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
     578           1 :         return h, nil
     579             : }
     580             : 
     581             : func (r *Reader) readMetaindex(
     582             :         ctx context.Context, readHandle objstorage.ReadHandle, filters map[string]FilterPolicy,
     583           1 : ) error {
     584           1 :         // We use a BufferPool when reading metaindex blocks in order to avoid
     585           1 :         // populating the block cache with these blocks. In heavy-write workloads,
     586           1 :         // especially with high compaction concurrency, new tables may be created
     587           1 :         // frequently. Populating the block cache with these metaindex blocks adds
     588           1 :         // additional contention on the block cache mutexes (see #1997).
     589           1 :         // Additionally, these blocks are exceedingly unlikely to be read again
     590           1 :         // while they're still in the block cache except in misconfigurations with
     591           1 :         // excessive sstables counts or a table cache that's far too small.
     592           1 :         r.metaBufferPool.InitPreallocated(r.metaBufferPoolAlloc[:0])
     593           1 :         // When we're finished, release the buffers we've allocated back to memory
     594           1 :         // allocator. We don't expect to use metaBufferPool again.
     595           1 :         defer r.metaBufferPool.Release()
     596           1 :         metaEnv := readBlockEnv{
     597           1 :                 BufferPool: &r.metaBufferPool,
     598           1 :         }
     599           1 : 
     600           1 :         b, err := r.readMetaindexBlock(ctx, metaEnv, readHandle)
     601           1 :         if err != nil {
     602           1 :                 return err
     603           1 :         }
     604           1 :         data := b.BlockData()
     605           1 :         defer b.Release()
     606           1 : 
     607           1 :         if uint64(len(data)) != r.metaindexBH.Length {
     608           0 :                 return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
     609           0 :                         errors.Safe(len(data)), errors.Safe(r.metaindexBH.Length))
     610           0 :         }
     611             : 
     612           1 :         var meta map[string]block.Handle
     613           1 :         meta, r.valueBIH, err = decodeMetaindex(data)
     614           1 :         if err != nil {
     615           0 :                 return err
     616           0 :         }
     617             : 
     618           1 :         if bh, ok := meta[metaPropertiesName]; ok {
     619           1 :                 b, err = r.readBlockInternal(ctx, metaEnv, readHandle, bh, noInitBlockMetadataFn)
     620           1 :                 if err != nil {
     621           1 :                         return err
     622           1 :                 }
     623           1 :                 r.propertiesBH = bh
     624           1 :                 err := r.Properties.load(b.BlockData(), r.deniedUserProperties)
     625           1 :                 b.Release()
     626           1 :                 if err != nil {
     627           0 :                         return err
     628           0 :                 }
     629             :         }
     630             : 
     631           1 :         if bh, ok := meta[metaRangeDelV2Name]; ok {
     632           1 :                 r.rangeDelBH = bh
     633           1 :         } else if _, ok := meta[metaRangeDelV1Name]; ok {
     634           0 :                 // This version of Pebble requires a format major version at least as
     635           0 :                 // high as FormatFlushableIngest (see pebble.FormatMinSupported). In
     636           0 :                 // this format major verison, we have a guarantee that we've compacted
     637           0 :                 // away all RocksDB sstables. It should not be possible to encounter an
     638           0 :                 // sstable with a v1 range deletion block but not a v2 range deletion
     639           0 :                 // block.
     640           0 :                 err := errors.Newf("pebble/table: unexpected range-del block type: %s", metaRangeDelV1Name)
     641           0 :                 return errors.Mark(err, base.ErrCorruption)
     642           0 :         }
     643             : 
     644           1 :         if bh, ok := meta[metaRangeKeyName]; ok {
     645           1 :                 r.rangeKeyBH = bh
     646           1 :         }
     647             : 
     648           1 :         for name, fp := range filters {
     649           1 :                 if bh, ok := meta["fullfilter."+name]; ok {
     650           1 :                         r.filterBH = bh
     651           1 :                         r.tableFilter = newTableFilterReader(fp, r.filterMetricsTracker)
     652           1 :                         break
     653             :                 }
     654             :         }
     655           1 :         return nil
     656             : }
     657             : 
     658             : // Layout returns the layout (block organization) for an sstable.
     659           1 : func (r *Reader) Layout() (*Layout, error) {
     660           1 :         if r.err != nil {
     661           0 :                 return nil, r.err
     662           0 :         }
     663             : 
     664           1 :         l := &Layout{
     665           1 :                 Data:       make([]block.HandleWithProperties, 0, r.Properties.NumDataBlocks),
     666           1 :                 RangeDel:   r.rangeDelBH,
     667           1 :                 RangeKey:   r.rangeKeyBH,
     668           1 :                 ValueIndex: r.valueBIH.Handle,
     669           1 :                 Properties: r.propertiesBH,
     670           1 :                 MetaIndex:  r.metaindexBH,
     671           1 :                 Footer:     r.footerBH,
     672           1 :                 Format:     r.tableFormat,
     673           1 :         }
     674           1 :         if r.filterBH.Length > 0 {
     675           1 :                 l.Filter = []NamedBlockHandle{{Name: "fullfilter." + r.tableFilter.policy.Name(), Handle: r.filterBH}}
     676           1 :         }
     677           1 :         ctx := context.TODO()
     678           1 : 
     679           1 :         indexH, err := r.readTopLevelIndexBlock(ctx, noEnv, noReadHandle)
     680           1 :         if err != nil {
     681           1 :                 return nil, err
     682           1 :         }
     683           1 :         defer indexH.Release()
     684           1 : 
     685           1 :         var alloc bytealloc.A
     686           1 : 
     687           1 :         if r.Properties.IndexPartitions == 0 {
     688           1 :                 l.Index = append(l.Index, r.indexBH)
     689           1 :                 iter := r.tableFormat.newIndexIter()
     690           1 :                 err := iter.Init(r.Compare, r.Split, indexH.BlockData(), NoTransforms)
     691           1 :                 if err != nil {
     692           0 :                         return nil, errors.Wrap(err, "reading index block")
     693           0 :                 }
     694           1 :                 for valid := iter.First(); valid; valid = iter.Next() {
     695           1 :                         dataBH, err := iter.BlockHandleWithProperties()
     696           1 :                         if err != nil {
     697           0 :                                 return nil, errCorruptIndexEntry(err)
     698           0 :                         }
     699           1 :                         if len(dataBH.Props) > 0 {
     700           1 :                                 alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     701           1 :                         }
     702           1 :                         l.Data = append(l.Data, dataBH)
     703             :                 }
     704           1 :         } else {
     705           1 :                 l.TopIndex = r.indexBH
     706           1 :                 topIter := r.tableFormat.newIndexIter()
     707           1 :                 err := topIter.Init(r.Compare, r.Split, indexH.BlockData(), NoTransforms)
     708           1 :                 if err != nil {
     709           0 :                         return nil, errors.Wrap(err, "reading index block")
     710           0 :                 }
     711           1 :                 iter := r.tableFormat.newIndexIter()
     712           1 :                 for valid := topIter.First(); valid; valid = topIter.Next() {
     713           1 :                         indexBH, err := topIter.BlockHandleWithProperties()
     714           1 :                         if err != nil {
     715           0 :                                 return nil, errCorruptIndexEntry(err)
     716           0 :                         }
     717           1 :                         l.Index = append(l.Index, indexBH.Handle)
     718           1 : 
     719           1 :                         subIndex, err := r.readIndexBlock(ctx, noEnv, noReadHandle, indexBH.Handle)
     720           1 :                         if err != nil {
     721           1 :                                 return nil, err
     722           1 :                         }
     723           1 :                         err = func() error {
     724           1 :                                 defer subIndex.Release()
     725           1 :                                 // TODO(msbutler): figure out how to pass virtualState to layout call.
     726           1 :                                 if err := iter.Init(r.Compare, r.Split, subIndex.BlockData(), NoTransforms); err != nil {
     727           0 :                                         return err
     728           0 :                                 }
     729           1 :                                 for valid := iter.First(); valid; valid = iter.Next() {
     730           1 :                                         dataBH, err := iter.BlockHandleWithProperties()
     731           1 :                                         if err != nil {
     732           0 :                                                 return errCorruptIndexEntry(err)
     733           0 :                                         }
     734           1 :                                         if len(dataBH.Props) > 0 {
     735           1 :                                                 alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     736           1 :                                         }
     737           1 :                                         l.Data = append(l.Data, dataBH)
     738             :                                 }
     739           1 :                                 return nil
     740             :                         }()
     741           1 :                         if err != nil {
     742           0 :                                 return nil, err
     743           0 :                         }
     744             :                 }
     745             :         }
     746           1 :         if r.valueBIH.Handle.Length != 0 {
     747           1 :                 vbiH, err := r.readValueBlock(context.Background(), noEnv, noReadHandle, r.valueBIH.Handle)
     748           1 :                 if err != nil {
     749           0 :                         return nil, err
     750           0 :                 }
     751           1 :                 defer vbiH.Release()
     752           1 :                 l.ValueBlock, err = decodeValueBlockIndex(vbiH.BlockData(), r.valueBIH)
     753           1 :                 if err != nil {
     754           0 :                         return nil, err
     755           0 :                 }
     756             :         }
     757             : 
     758           1 :         return l, nil
     759             : }
     760             : 
     761             : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
     762           1 : func (r *Reader) ValidateBlockChecksums() error {
     763           1 :         // Pre-compute the BlockHandles for the underlying file.
     764           1 :         l, err := r.Layout()
     765           1 :         if err != nil {
     766           1 :                 return err
     767           1 :         }
     768             : 
     769           1 :         type blk struct {
     770           1 :                 bh     block.Handle
     771           1 :                 readFn func(context.Context, readBlockEnv, objstorage.ReadHandle, block.Handle) (block.BufferHandle, error)
     772           1 :         }
     773           1 :         // Construct the set of blocks to check. Note that the footer is not checked
     774           1 :         // as it is not a block with a checksum.
     775           1 :         blocks := make([]blk, 0, len(l.Data)+6)
     776           1 :         for i := range l.Data {
     777           1 :                 blocks = append(blocks, blk{
     778           1 :                         bh:     l.Data[i].Handle,
     779           1 :                         readFn: r.readDataBlock,
     780           1 :                 })
     781           1 :         }
     782           1 :         for _, h := range l.Index {
     783           1 :                 blocks = append(blocks, blk{
     784           1 :                         bh:     h,
     785           1 :                         readFn: r.readIndexBlock,
     786           1 :                 })
     787           1 :         }
     788           1 :         blocks = append(blocks, blk{
     789           1 :                 bh:     l.TopIndex,
     790           1 :                 readFn: r.readIndexBlock,
     791           1 :         })
     792           1 :         for _, bh := range l.Filter {
     793           1 :                 blocks = append(blocks, blk{
     794           1 :                         bh:     bh.Handle,
     795           1 :                         readFn: r.readFilterBlock,
     796           1 :                 })
     797           1 :         }
     798           1 :         blocks = append(blocks, blk{
     799           1 :                 bh:     l.RangeDel,
     800           1 :                 readFn: r.readRangeDelBlock,
     801           1 :         })
     802           1 :         blocks = append(blocks, blk{
     803           1 :                 bh:     l.RangeKey,
     804           1 :                 readFn: r.readRangeKeyBlock,
     805           1 :         })
     806           1 :         readNoInit := func(ctx context.Context, env readBlockEnv, rh objstorage.ReadHandle, bh block.Handle) (block.BufferHandle, error) {
     807           1 :                 return r.readBlockInternal(ctx, env, rh, bh, noInitBlockMetadataFn)
     808           1 :         }
     809           1 :         blocks = append(blocks, blk{
     810           1 :                 bh:     l.Properties,
     811           1 :                 readFn: readNoInit,
     812           1 :         })
     813           1 :         blocks = append(blocks, blk{
     814           1 :                 bh:     l.MetaIndex,
     815           1 :                 readFn: readNoInit,
     816           1 :         })
     817           1 : 
     818           1 :         // Sorting by offset ensures we are performing a sequential scan of the
     819           1 :         // file.
     820           1 :         slices.SortFunc(blocks, func(a, b blk) int {
     821           1 :                 return cmp.Compare(a.bh.Offset, b.bh.Offset)
     822           1 :         })
     823             : 
     824           1 :         ctx := context.Background()
     825           1 :         for _, b := range blocks {
     826           1 :                 // Certain blocks may not be present, in which case we skip them.
     827           1 :                 if b.bh.Length == 0 {
     828           1 :                         continue
     829             :                 }
     830           1 :                 h, err := b.readFn(ctx, noEnv, noReadHandle, b.bh)
     831           1 :                 if err != nil {
     832           1 :                         return err
     833           1 :                 }
     834           1 :                 h.Release()
     835             :         }
     836             : 
     837           1 :         return nil
     838             : }
     839             : 
     840             : // CommonProperties implemented the CommonReader interface.
     841           1 : func (r *Reader) CommonProperties() *CommonProperties {
     842           1 :         return &r.Properties.CommonProperties
     843           1 : }
     844             : 
     845             : // EstimateDiskUsage returns the total size of data blocks overlapping the range
     846             : // `[start, end]`. Even if a data block partially overlaps, or we cannot
     847             : // determine overlap due to abbreviated index keys, the full data block size is
     848             : // included in the estimation.
     849             : //
     850             : // This function does not account for any metablock space usage. Assumes there
     851             : // is at least partial overlap, i.e., `[start, end]` falls neither completely
     852             : // before nor completely after the file's range.
     853             : //
     854             : // Only blocks containing point keys are considered. Range deletion and range
     855             : // key blocks are not considered.
     856             : //
     857             : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
     858             : // data blocks overlapped and add that same fraction of the metadata blocks to the
     859             : // estimate.
     860           1 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
     861           1 :         if !r.tableFormat.BlockColumnar() {
     862           1 :                 return estimateDiskUsage[rowblk.IndexIter, *rowblk.IndexIter](r, start, end)
     863           1 :         }
     864           1 :         return estimateDiskUsage[colblk.IndexIter, *colblk.IndexIter](r, start, end)
     865             : }
     866             : 
     867             : func estimateDiskUsage[I any, PI indexBlockIterator[I]](
     868             :         r *Reader, start, end []byte,
     869           1 : ) (uint64, error) {
     870           1 :         if r.err != nil {
     871           0 :                 return 0, r.err
     872           0 :         }
     873           1 :         ctx := context.TODO()
     874           1 : 
     875           1 :         indexH, err := r.readTopLevelIndexBlock(ctx, noEnv, noReadHandle)
     876           1 :         if err != nil {
     877           1 :                 return 0, err
     878           1 :         }
     879             :         // We are using InitHandle below but we never Close those iterators, which
     880             :         // allows us to release the index handle ourselves.
     881             :         // TODO(radu): clean this up.
     882           1 :         defer indexH.Release()
     883           1 : 
     884           1 :         // Iterators over the bottom-level index blocks containing start and end.
     885           1 :         // These may be different in case of partitioned index but will both point
     886           1 :         // to the same blockIter over the single index in the unpartitioned case.
     887           1 :         var startIdxIter, endIdxIter PI
     888           1 :         if r.Properties.IndexPartitions == 0 {
     889           1 :                 startIdxIter = new(I)
     890           1 :                 if err := startIdxIter.InitHandle(r.Compare, r.Split, indexH, NoTransforms); err != nil {
     891           0 :                         return 0, err
     892           0 :                 }
     893           1 :                 endIdxIter = startIdxIter
     894           1 :         } else {
     895           1 :                 var topIter PI = new(I)
     896           1 :                 if err := topIter.InitHandle(r.Compare, r.Split, indexH, NoTransforms); err != nil {
     897           0 :                         return 0, err
     898           0 :                 }
     899           1 :                 if !topIter.SeekGE(start) {
     900           0 :                         // The range falls completely after this file.
     901           0 :                         return 0, nil
     902           0 :                 }
     903           1 :                 startIndexBH, err := topIter.BlockHandleWithProperties()
     904           1 :                 if err != nil {
     905           0 :                         return 0, errCorruptIndexEntry(err)
     906           0 :                 }
     907           1 :                 startIdxBlock, err := r.readIndexBlock(ctx, noEnv, noReadHandle, startIndexBH.Handle)
     908           1 :                 if err != nil {
     909           1 :                         return 0, err
     910           1 :                 }
     911           1 :                 defer startIdxBlock.Release()
     912           1 :                 startIdxIter = new(I)
     913           1 :                 err = startIdxIter.InitHandle(r.Compare, r.Split, startIdxBlock, NoTransforms)
     914           1 :                 if err != nil {
     915           0 :                         return 0, err
     916           0 :                 }
     917             : 
     918           1 :                 if topIter.SeekGE(end) {
     919           1 :                         endIndexBH, err := topIter.BlockHandleWithProperties()
     920           1 :                         if err != nil {
     921           0 :                                 return 0, errCorruptIndexEntry(err)
     922           0 :                         }
     923           1 :                         endIdxBlock, err := r.readIndexBlock(ctx, noEnv, noReadHandle, endIndexBH.Handle)
     924           1 :                         if err != nil {
     925           1 :                                 return 0, err
     926           1 :                         }
     927           1 :                         defer endIdxBlock.Release()
     928           1 :                         endIdxIter = new(I)
     929           1 :                         err = endIdxIter.InitHandle(r.Compare, r.Split, endIdxBlock, NoTransforms)
     930           1 :                         if err != nil {
     931           0 :                                 return 0, err
     932           0 :                         }
     933             :                 }
     934             :         }
     935             :         // startIdxIter should not be nil at this point, while endIdxIter can be if the
     936             :         // range spans past the end of the file.
     937             : 
     938           1 :         if !startIdxIter.SeekGE(start) {
     939           1 :                 // The range falls completely after this file.
     940           1 :                 return 0, nil
     941           1 :         }
     942           1 :         startBH, err := startIdxIter.BlockHandleWithProperties()
     943           1 :         if err != nil {
     944           0 :                 return 0, errCorruptIndexEntry(err)
     945           0 :         }
     946             : 
     947           1 :         includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
     948           1 :                 // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
     949           1 :                 // Linearly interpolate what is stored in value blocks.
     950           1 :                 //
     951           1 :                 // TODO(sumeer): if we need more accuracy, without loading any data blocks
     952           1 :                 // (which contain the value handles, and which may also be insufficient if
     953           1 :                 // the values are in separate files), we will need to accumulate the
     954           1 :                 // logical size of the key-value pairs and store the cumulative value for
     955           1 :                 // each data block in the index block entry. This increases the size of
     956           1 :                 // the BlockHandle, so wait until this becomes necessary.
     957           1 :                 return dataBlockSize +
     958           1 :                         uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
     959           1 :                                 float64(r.Properties.ValueBlocksSize))
     960           1 :         }
     961           1 :         if endIdxIter == nil {
     962           0 :                 // The range spans beyond this file. Include data blocks through the last.
     963           0 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
     964           0 :         }
     965           1 :         if !endIdxIter.SeekGE(end) {
     966           1 :                 // The range spans beyond this file. Include data blocks through the last.
     967           1 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
     968           1 :         }
     969           1 :         endBH, err := endIdxIter.BlockHandleWithProperties()
     970           1 :         if err != nil {
     971           0 :                 return 0, errCorruptIndexEntry(err)
     972           0 :         }
     973           1 :         return includeInterpolatedValueBlocksSize(
     974           1 :                 endBH.Offset + endBH.Length + block.TrailerLen - startBH.Offset), nil
     975             : }
     976             : 
     977             : // TableFormat returns the format version for the table.
     978           1 : func (r *Reader) TableFormat() (TableFormat, error) {
     979           1 :         if r.err != nil {
     980           0 :                 return TableFormatUnspecified, r.err
     981           0 :         }
     982           1 :         return r.tableFormat, nil
     983             : }
     984             : 
     985             : // NewReader returns a new table reader for the file. Closing the reader will
     986             : // close the file.
     987             : //
     988             : // The context is used for tracing any operations performed by NewReader; it is
     989             : // NOT stored for future use.
     990           1 : func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Reader, error) {
     991           1 :         if f == nil {
     992           1 :                 return nil, errors.New("pebble/table: nil file")
     993           1 :         }
     994           1 :         o = o.ensureDefaults()
     995           1 :         r := &Reader{
     996           1 :                 readable:             f,
     997           1 :                 cacheOpts:            o.internal.CacheOpts,
     998           1 :                 loadBlockSema:        o.LoadBlockSema,
     999           1 :                 deniedUserProperties: o.DeniedUserProperties,
    1000           1 :                 filterMetricsTracker: o.FilterMetricsTracker,
    1001           1 :                 logger:               o.LoggerAndTracer,
    1002           1 :         }
    1003           1 :         if r.cacheOpts.Cache == nil {
    1004           1 :                 r.cacheOpts.Cache = cache.New(0)
    1005           1 :         } else {
    1006           1 :                 r.cacheOpts.Cache.Ref()
    1007           1 :         }
    1008           1 :         if r.cacheOpts.CacheID == 0 {
    1009           1 :                 r.cacheOpts.CacheID = r.cacheOpts.Cache.NewID()
    1010           1 :         }
    1011             : 
    1012           1 :         var preallocRH objstorageprovider.PreallocatedReadHandle
    1013           1 :         rh := objstorageprovider.UsePreallocatedReadHandle(
    1014           1 :                 r.readable, objstorage.ReadBeforeForNewReader, &preallocRH)
    1015           1 :         defer rh.Close()
    1016           1 : 
    1017           1 :         footer, err := readFooter(ctx, f, rh, r.logger, r.cacheOpts.FileNum)
    1018           1 :         if err != nil {
    1019           1 :                 r.err = err
    1020           1 :                 return nil, r.Close()
    1021           1 :         }
    1022           1 :         r.checksumType = footer.checksum
    1023           1 :         r.tableFormat = footer.format
    1024           1 :         r.indexBH = footer.indexBH
    1025           1 :         r.metaindexBH = footer.metaindexBH
    1026           1 :         r.footerBH = footer.footerBH
    1027           1 :         // Read the metaindex and properties blocks.
    1028           1 :         if err := r.readMetaindex(ctx, rh, o.Filters); err != nil {
    1029           1 :                 r.err = err
    1030           1 :                 return nil, r.Close()
    1031           1 :         }
    1032             : 
    1033           1 :         if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
    1034           1 :                 r.Comparer = o.Comparer
    1035           1 :                 r.Compare = o.Comparer.Compare
    1036           1 :                 r.Equal = o.Comparer.Equal
    1037           1 :                 r.Split = o.Comparer.Split
    1038           1 :         } else if comparer, ok := o.Comparers[r.Properties.ComparerName]; ok {
    1039           1 :                 r.Comparer = o.Comparer
    1040           1 :                 r.Compare = comparer.Compare
    1041           1 :                 r.Equal = comparer.Equal
    1042           1 :                 r.Split = comparer.Split
    1043           1 :         } else {
    1044           1 :                 r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
    1045           1 :                         errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.ComparerName))
    1046           1 :         }
    1047             : 
    1048           1 :         if mergerName := r.Properties.MergerName; mergerName != "" && mergerName != "nullptr" {
    1049           1 :                 if o.Merger != nil && o.Merger.Name == mergerName {
    1050           1 :                         // opts.Merger matches.
    1051           1 :                 } else if _, ok := o.Mergers[mergerName]; ok {
    1052           1 :                         // Known merger.
    1053           1 :                 } else {
    1054           1 :                         r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
    1055           1 :                                 errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.MergerName))
    1056           1 :                 }
    1057             :         }
    1058             : 
    1059           1 :         if r.tableFormat.BlockColumnar() {
    1060           1 :                 if ks, ok := o.KeySchemas[r.Properties.KeySchemaName]; ok {
    1061           1 :                         r.keySchema = ks
    1062           1 :                 } else {
    1063           0 :                         var known []string
    1064           0 :                         for name := range o.KeySchemas {
    1065           0 :                                 known = append(known, fmt.Sprintf("%q", name))
    1066           0 :                         }
    1067           0 :                         slices.Sort(known)
    1068           0 : 
    1069           0 :                         r.err = errors.Newf("pebble/table: %d: unknown key schema %q; known key schemas: %s",
    1070           0 :                                 errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.KeySchemaName), errors.Safe(known))
    1071           0 :                         panic(r.err)
    1072             :                 }
    1073             :         }
    1074             : 
    1075           1 :         if r.err != nil {
    1076           1 :                 return nil, r.Close()
    1077           1 :         }
    1078             : 
    1079           1 :         return r, nil
    1080             : }
    1081             : 
    1082             : // ReadableFile describes the smallest subset of vfs.File that is required for
    1083             : // reading SSTs.
    1084             : type ReadableFile interface {
    1085             :         io.ReaderAt
    1086             :         io.Closer
    1087             :         Stat() (vfs.FileInfo, error)
    1088             : }
    1089             : 
    1090             : // NewSimpleReadable wraps a ReadableFile in a objstorage.Readable
    1091             : // implementation (which does not support read-ahead)
    1092           1 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
    1093           1 :         info, err := r.Stat()
    1094           1 :         if err != nil {
    1095           1 :                 return nil, err
    1096           1 :         }
    1097           1 :         res := &simpleReadable{
    1098           1 :                 f:    r,
    1099           1 :                 size: info.Size(),
    1100           1 :         }
    1101           1 :         res.rh = objstorage.MakeNoopReadHandle(res)
    1102           1 :         return res, nil
    1103             : }
    1104             : 
    1105             : // simpleReadable wraps a ReadableFile to implement objstorage.Readable.
    1106             : type simpleReadable struct {
    1107             :         f    ReadableFile
    1108             :         size int64
    1109             :         rh   objstorage.NoopReadHandle
    1110             : }
    1111             : 
    1112             : var _ objstorage.Readable = (*simpleReadable)(nil)
    1113             : 
    1114             : // ReadAt is part of the objstorage.Readable interface.
    1115           1 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
    1116           1 :         n, err := s.f.ReadAt(p, off)
    1117           1 :         if invariants.Enabled && err == nil && n != len(p) {
    1118           0 :                 panic("short read")
    1119             :         }
    1120           1 :         return err
    1121             : }
    1122             : 
    1123             : // Close is part of the objstorage.Readable interface.
    1124           1 : func (s *simpleReadable) Close() error {
    1125           1 :         return s.f.Close()
    1126           1 : }
    1127             : 
    1128             : // Size is part of the objstorage.Readable interface.
    1129           1 : func (s *simpleReadable) Size() int64 {
    1130           1 :         return s.size
    1131           1 : }
    1132             : 
    1133             : // NewReadHandle is part of the objstorage.Readable interface.
    1134             : func (s *simpleReadable) NewReadHandle(
    1135             :         readBeforeSize objstorage.ReadBeforeSize,
    1136           1 : ) objstorage.ReadHandle {
    1137           1 :         return &s.rh
    1138           1 : }
    1139             : 
    1140           0 : func errCorruptIndexEntry(err error) error {
    1141           0 :         err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
    1142           0 :         if invariants.Enabled {
    1143           0 :                 panic(err)
    1144             :         }
    1145           0 :         return err
    1146             : }
    1147             : 
    1148             : type deterministicStopwatchForTesting struct {
    1149             :         startTime crtime.Mono
    1150             : }
    1151             : 
    1152           1 : func makeStopwatch() deterministicStopwatchForTesting {
    1153           1 :         return deterministicStopwatchForTesting{startTime: crtime.NowMono()}
    1154           1 : }
    1155             : 
    1156           1 : func (w deterministicStopwatchForTesting) stop() time.Duration {
    1157           1 :         dur := w.startTime.Elapsed()
    1158           1 :         if deterministicReadBlockDurationForTesting {
    1159           1 :                 dur = slowReadTracingThreshold
    1160           1 :         }
    1161           1 :         return dur
    1162             : }

Generated by: LCOV version 1.14