LCOV - code coverage report
Current view: top level - pebble/sstable - reader.go (source / functions) Hit Total Coverage
Test: 2025-01-07 08:17Z 28edac9f - tests + meta.lcov Lines: 690 793 87.0 %
Date: 2025-01-07 08:18:18 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "context"
      10             :         "encoding/binary"
      11             :         "fmt"
      12             :         "io"
      13             :         "path/filepath"
      14             :         "runtime"
      15             :         "slices"
      16             :         "time"
      17             : 
      18             :         "github.com/cespare/xxhash/v2"
      19             :         "github.com/cockroachdb/crlib/crtime"
      20             :         "github.com/cockroachdb/crlib/fifo"
      21             :         "github.com/cockroachdb/errors"
      22             :         "github.com/cockroachdb/pebble/internal/base"
      23             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      24             :         "github.com/cockroachdb/pebble/internal/cache"
      25             :         "github.com/cockroachdb/pebble/internal/crc"
      26             :         "github.com/cockroachdb/pebble/internal/invariants"
      27             :         "github.com/cockroachdb/pebble/internal/keyspan"
      28             :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      29             :         "github.com/cockroachdb/pebble/objstorage"
      30             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      31             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      32             :         "github.com/cockroachdb/pebble/sstable/block"
      33             :         "github.com/cockroachdb/pebble/sstable/colblk"
      34             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      35             :         "github.com/cockroachdb/pebble/sstable/valblk"
      36             :         "github.com/cockroachdb/pebble/vfs"
      37             : )
      38             : 
      39             : var errReaderClosed = errors.New("pebble/table: reader is closed")
      40             : 
      41             : type loadBlockResult int8
      42             : 
      43             : const (
      44             :         loadBlockOK loadBlockResult = iota
      45             :         // Could be due to error or because no block left to load.
      46             :         loadBlockFailed
      47             :         loadBlockIrrelevant
      48             : )
      49             : 
// Reader is a table reader.
//
// Besides the options copied at construction time, a Reader records the
// handles of the table's blocks and reads blocks on demand through the
// read*Block methods. Close closes the underlying readable and, if no error
// was recorded earlier, leaves err set to errReaderClosed.
type Reader struct {
	// readable is the underlying object. Close closes it and sets it to nil.
	readable objstorage.Readable

	// The following fields are copied from the ReadOptions.
	cacheOpts            sstableinternal.CacheOptions
	keySchema            *colblk.KeySchema
	loadBlockSema        *fifo.Semaphore
	deniedUserProperties map[string]struct{}
	filterMetricsTracker *FilterMetricsTracker
	logger               base.LoggerAndTracer

	// Key comparison helpers. NOTE(review): presumably Compare/Equal/Split are
	// taken from Comparer — confirm where the Reader is constructed.
	Comparer *base.Comparer
	Compare  Compare
	Equal    Equal
	Split    Split

	tableFilter *tableFilterReader

	// err is a sticky error. Close folds any error from closing readable into
	// it (via firstError) and, if it is still nil afterwards, sets it to
	// errReaderClosed.
	err error

	// Handles of the table's blocks. A zero Length in rangeDelBH or rangeKeyBH
	// means the table has no range deletions or no range keys, respectively.
	indexBH      block.Handle
	filterBH     block.Handle
	rangeDelBH   block.Handle
	rangeKeyBH   block.Handle
	valueBIH     valblk.IndexHandle
	propertiesBH block.Handle
	metaindexBH  block.Handle
	footerBH     block.Handle

	// Properties.IndexType selects between single- and two-level index
	// iterators; tableFormat.BlockColumnar() selects between row-based and
	// columnar block encodings.
	Properties   Properties
	tableFormat  TableFormat
	checksumType block.ChecksumType

	// metaBufferPool is a buffer pool used exclusively when opening a table and
	// loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
	// the BufferPool.pool slice as a part of the Reader allocation. Its
	// capacity is 3 to accommodate the meta block (1), and both the compressed
	// properties block (1) and decompressed properties block (1)
	// simultaneously.
	metaBufferPool      block.BufferPool
	metaBufferPoolAlloc [3]block.AllocedBuffer
}

// Compile-time check that *Reader implements CommonReader.
var _ CommonReader = (*Reader)(nil)
      95             : 
      96             : // Close the reader and the underlying objstorage.Readable.
      97           2 : func (r *Reader) Close() error {
      98           2 :         r.cacheOpts.Cache.Unref()
      99           2 : 
     100           2 :         if r.readable != nil {
     101           2 :                 r.err = firstError(r.err, r.readable.Close())
     102           2 :                 r.readable = nil
     103           2 :         }
     104             : 
     105           2 :         if r.err != nil {
     106           1 :                 return r.err
     107           1 :         }
     108             :         // Make any future calls to Get, NewIter or Close return an error.
     109           2 :         r.err = errReaderClosed
     110           2 :         return nil
     111             : }
     112             : 
     113             : // NewPointIter returns an iterator for the point keys in the table.
     114             : //
     115             : // If transform.HideObsoletePoints is set, the callee assumes that filterer
     116             : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
     117             : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
     118             : func (r *Reader) NewPointIter(
     119             :         ctx context.Context,
     120             :         transforms IterTransforms,
     121             :         lower, upper []byte,
     122             :         filterer *BlockPropertiesFilterer,
     123             :         filterBlockSizeLimit FilterBlockSizeLimit,
     124             :         stats *base.InternalIteratorStats,
     125             :         statsAccum IterStatsAccumulator,
     126             :         rp valblk.ReaderProvider,
     127           2 : ) (Iterator, error) {
     128           2 :         return r.newPointIter(
     129           2 :                 ctx, transforms, lower, upper, filterer, filterBlockSizeLimit,
     130           2 :                 stats, statsAccum, rp, nil)
     131           2 : }
     132             : 
     133             : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
     134             : // before the call to NewPointIter, to get the value of hideObsoletePoints and
     135             : // potentially add a block property filter.
     136             : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
     137             :         snapshotForHideObsoletePoints base.SeqNum,
     138             :         fileLargestSeqNum base.SeqNum,
     139             :         pointKeyFilters []BlockPropertyFilter,
     140           2 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
     141           2 :         hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
     142           2 :                 snapshotForHideObsoletePoints > fileLargestSeqNum
     143           2 :         if hideObsoletePoints {
     144           2 :                 pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
     145           2 :         }
     146           2 :         return hideObsoletePoints, pointKeyFilters
     147             : }
     148             : 
     149             : func (r *Reader) newPointIter(
     150             :         ctx context.Context,
     151             :         transforms IterTransforms,
     152             :         lower, upper []byte,
     153             :         filterer *BlockPropertiesFilterer,
     154             :         filterBlockSizeLimit FilterBlockSizeLimit,
     155             :         stats *base.InternalIteratorStats,
     156             :         statsAccum IterStatsAccumulator,
     157             :         rp valblk.ReaderProvider,
     158             :         vState *virtualState,
     159           2 : ) (Iterator, error) {
     160           2 :         // NB: pebble.fileCache wraps the returned iterator with one which performs
     161           2 :         // reference counting on the Reader, preventing the Reader from being closed
     162           2 :         // until the final iterator closes.
     163           2 :         var res Iterator
     164           2 :         var err error
     165           2 :         if r.Properties.IndexType == twoLevelIndex {
     166           2 :                 if r.tableFormat.BlockColumnar() {
     167           2 :                         res, err = newColumnBlockTwoLevelIterator(
     168           2 :                                 ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     169           2 :                                 stats, statsAccum, rp, nil /* bufferPool */)
     170           2 :                 } else {
     171           2 :                         res, err = newRowBlockTwoLevelIterator(
     172           2 :                                 ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     173           2 :                                 stats, statsAccum, rp, nil /* bufferPool */)
     174           2 :                 }
     175           2 :         } else {
     176           2 :                 if r.tableFormat.BlockColumnar() {
     177           2 :                         res, err = newColumnBlockSingleLevelIterator(
     178           2 :                                 ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     179           2 :                                 stats, statsAccum, rp, nil /* bufferPool */)
     180           2 :                 } else {
     181           2 :                         res, err = newRowBlockSingleLevelIterator(
     182           2 :                                 ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     183           2 :                                 stats, statsAccum, rp, nil /* bufferPool */)
     184           2 :                 }
     185             :         }
     186           2 :         if err != nil {
     187           1 :                 // Note: we don't want to return res here - it will be a nil
     188           1 :                 // single/twoLevelIterator, not a nil Iterator.
     189           1 :                 return nil, err
     190           1 :         }
     191           2 :         return res, nil
     192             : }
     193             : 
     194             : // NewIter returns an iterator for the point keys in the table. It is a
     195             : // simplified version of NewPointIter and should only be used for tests and
     196             : // tooling.
     197             : //
     198             : // NewIter must only be used when the Reader is guaranteed to outlive any
     199             : // LazyValues returned from the iter.
     200           2 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
     201           2 :         // TODO(radu): we should probably not use bloom filters in this case, as there
     202           2 :         // likely isn't a cache set up.
     203           2 :         return r.NewPointIter(
     204           2 :                 context.TODO(), transforms, lower, upper, nil, AlwaysUseFilterBlock,
     205           2 :                 nil /* stats */, nil /* statsAccum */, MakeTrivialReaderProvider(r))
     206           2 : }
     207             : 
     208             : // NewCompactionIter returns an iterator similar to NewIter but it also increments
     209             : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
     210             : // after itself and returns a nil iterator.
     211             : func (r *Reader) NewCompactionIter(
     212             :         transforms IterTransforms,
     213             :         statsAccum IterStatsAccumulator,
     214             :         rp valblk.ReaderProvider,
     215             :         bufferPool *block.BufferPool,
     216           2 : ) (Iterator, error) {
     217           2 :         return r.newCompactionIter(transforms, statsAccum, rp, nil, bufferPool)
     218           2 : }
     219             : 
     220             : func (r *Reader) newCompactionIter(
     221             :         transforms IterTransforms,
     222             :         statsAccum IterStatsAccumulator,
     223             :         rp valblk.ReaderProvider,
     224             :         vState *virtualState,
     225             :         bufferPool *block.BufferPool,
     226           2 : ) (Iterator, error) {
     227           2 :         if vState != nil && vState.isSharedIngested {
     228           2 :                 transforms.HideObsoletePoints = true
     229           2 :         }
     230           2 :         if r.Properties.IndexType == twoLevelIndex {
     231           2 :                 if !r.tableFormat.BlockColumnar() {
     232           2 :                         i, err := newRowBlockTwoLevelIterator(
     233           2 :                                 context.Background(),
     234           2 :                                 r, vState, transforms, nil /* lower */, nil /* upper */, nil,
     235           2 :                                 NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
     236           2 :                         if err != nil {
     237           0 :                                 return nil, err
     238           0 :                         }
     239           2 :                         i.SetupForCompaction()
     240           2 :                         return i, nil
     241             :                 }
     242           2 :                 i, err := newColumnBlockTwoLevelIterator(
     243           2 :                         context.Background(),
     244           2 :                         r, vState, transforms, nil /* lower */, nil /* upper */, nil,
     245           2 :                         NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
     246           2 :                 if err != nil {
     247           0 :                         return nil, err
     248           0 :                 }
     249           2 :                 i.SetupForCompaction()
     250           2 :                 return i, nil
     251             :         }
     252           2 :         if !r.tableFormat.BlockColumnar() {
     253           2 :                 i, err := newRowBlockSingleLevelIterator(
     254           2 :                         context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
     255           2 :                         nil, NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
     256           2 :                 if err != nil {
     257           0 :                         return nil, err
     258           0 :                 }
     259           2 :                 i.SetupForCompaction()
     260           2 :                 return i, nil
     261             :         }
     262           2 :         i, err := newColumnBlockSingleLevelIterator(
     263           2 :                 context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
     264           2 :                 nil, NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
     265           2 :         if err != nil {
     266           0 :                 return nil, err
     267           0 :         }
     268           2 :         i.SetupForCompaction()
     269           2 :         return i, nil
     270             : }
     271             : 
     272             : // NewRawRangeDelIter returns an internal iterator for the contents of the
     273             : // range-del block for the table. Returns nil if the table does not contain
     274             : // any range deletions.
     275             : func (r *Reader) NewRawRangeDelIter(
     276             :         ctx context.Context, transforms FragmentIterTransforms,
     277           2 : ) (iter keyspan.FragmentIterator, err error) {
     278           2 :         if r.rangeDelBH.Length == 0 {
     279           2 :                 return nil, nil
     280           2 :         }
     281             :         // TODO(radu): plumb stats here.
     282           2 :         h, err := r.readRangeDelBlock(ctx, noEnv, noReadHandle, r.rangeDelBH)
     283           2 :         if err != nil {
     284           1 :                 return nil, err
     285           1 :         }
     286           2 :         if r.tableFormat.BlockColumnar() {
     287           2 :                 iter = colblk.NewKeyspanIter(r.Compare, h, transforms)
     288           2 :         } else {
     289           2 :                 iter, err = rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Comparer, h, transforms)
     290           2 :                 if err != nil {
     291           0 :                         return nil, err
     292           0 :                 }
     293             :         }
     294           2 :         return keyspan.MaybeAssert(iter, r.Compare), nil
     295             : }
     296             : 
     297             : // NewRawRangeKeyIter returns an internal iterator for the contents of the
     298             : // range-key block for the table. Returns nil if the table does not contain any
     299             : // range keys.
     300             : func (r *Reader) NewRawRangeKeyIter(
     301             :         ctx context.Context, transforms FragmentIterTransforms,
     302           2 : ) (iter keyspan.FragmentIterator, err error) {
     303           2 :         if r.rangeKeyBH.Length == 0 {
     304           2 :                 return nil, nil
     305           2 :         }
     306             :         // TODO(radu): plumb stats here.
     307           2 :         h, err := r.readRangeKeyBlock(ctx, noEnv, noReadHandle, r.rangeKeyBH)
     308           2 :         if err != nil {
     309           1 :                 return nil, err
     310           1 :         }
     311           2 :         if r.tableFormat.BlockColumnar() {
     312           2 :                 iter = colblk.NewKeyspanIter(r.Compare, h, transforms)
     313           2 :         } else {
     314           2 :                 iter, err = rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Comparer, h, transforms)
     315           2 :                 if err != nil {
     316           0 :                         return nil, err
     317           0 :                 }
     318             :         }
     319           2 :         return keyspan.MaybeAssert(iter, r.Compare), nil
     320             : }
     321             : 
// readBlockEnv contains arguments used when reading a block which apply to all
// the block reads performed by a higher-level operation. The zero value (see
// noEnv) reports no stats and does not use a buffer pool.
type readBlockEnv struct {
	// Stats and IterStats are slightly different. Stats is a shared struct
	// supplied from the outside, and represents stats for the whole iterator
	// tree and can be reset from the outside (e.g. when the pebble.Iterator is
	// being reused). It is currently only provided when the iterator tree is
	// rooted at pebble.Iterator. IterStats contains an sstable iterator's
	// private stats that are reported to a CategoryStatsCollector when this
	// iterator is closed. In the important code paths, the CategoryStatsCollector
	// is managed by the fileCacheContainer.
	//
	// Either may be nil, in which case BlockServedFromCache and BlockRead skip
	// updating it.
	Stats     *base.InternalIteratorStats
	IterStats *iterStatsAccumulator

	// BufferPool is not-nil if we read blocks into a buffer pool and not into the
	// cache. This is used during compactions.
	BufferPool *block.BufferPool
}
     340             : 
     341             : // BlockServedFromCache updates the stats when a block was found in the cache.
     342           2 : func (env *readBlockEnv) BlockServedFromCache(blockLength uint64) {
     343           2 :         if env.Stats != nil {
     344           2 :                 env.Stats.BlockBytes += blockLength
     345           2 :                 env.Stats.BlockBytesInCache += blockLength
     346           2 :         }
     347           2 :         if env.IterStats != nil {
     348           2 :                 env.IterStats.reportStats(blockLength, blockLength, 0)
     349           2 :         }
     350             : }
     351             : 
     352             : // BlockRead updates the stats when a block had to be read.
     353           2 : func (env *readBlockEnv) BlockRead(blockLength uint64, readDuration time.Duration) {
     354           2 :         if env.Stats != nil {
     355           2 :                 env.Stats.BlockBytes += blockLength
     356           2 :                 env.Stats.BlockReadDuration += readDuration
     357           2 :         }
     358           2 :         if env.IterStats != nil {
     359           2 :                 env.IterStats.reportStats(blockLength, 0, readDuration)
     360           2 :         }
     361             : }
     362             : 
     363             : // noEnv is the empty readBlockEnv which reports no stats and does not use a
     364             : // buffer pool.
     365             : var noEnv = readBlockEnv{}
     366             : 
     367             : // noReadHandle is used when we don't want to pass a ReadHandle to one of the
     368             : // read block methods.
     369             : var noReadHandle objstorage.ReadHandle = nil
     370             : 
     371           2 : var noInitBlockMetadataFn = func(*block.Metadata, []byte) error { return nil }
     372             : 
     373             : // readMetaindexBlock reads the metaindex block.
     374             : func (r *Reader) readMetaindexBlock(
     375             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle,
     376           2 : ) (block.BufferHandle, error) {
     377           2 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     378           2 :         return r.readBlockInternal(ctx, env, readHandle, r.metaindexBH, noInitBlockMetadataFn)
     379           2 : }
     380             : 
     381             : // readTopLevelIndexBlock reads the top-level index block.
     382             : func (r *Reader) readTopLevelIndexBlock(
     383             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle,
     384           2 : ) (block.BufferHandle, error) {
     385           2 :         return r.readIndexBlock(ctx, env, readHandle, r.indexBH)
     386           2 : }
     387             : 
     388             : // readIndexBlock reads a top-level or second-level index block.
     389             : func (r *Reader) readIndexBlock(
     390             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     391           2 : ) (block.BufferHandle, error) {
     392           2 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     393           2 :         return r.readBlockInternal(ctx, env, readHandle, bh, r.initIndexBlockMetadata)
     394           2 : }
     395             : 
     396             : // initIndexBlockMetadata initializes the Metadata for a data block. This will
     397             : // later be used (and reused) when reading from the block.
     398           2 : func (r *Reader) initIndexBlockMetadata(metadata *block.Metadata, data []byte) error {
     399           2 :         if r.tableFormat.BlockColumnar() {
     400           2 :                 return colblk.InitIndexBlockMetadata(metadata, data)
     401           2 :         }
     402           2 :         return nil
     403             : }
     404             : 
     405             : func (r *Reader) readDataBlock(
     406             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     407           2 : ) (block.BufferHandle, error) {
     408           2 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.DataBlock)
     409           2 :         return r.readBlockInternal(ctx, env, readHandle, bh, r.initDataBlockMetadata)
     410           2 : }
     411             : 
     412             : // initDataBlockMetadata initializes the Metadata for a data block. This will
     413             : // later be used (and reused) when reading from the block.
     414           2 : func (r *Reader) initDataBlockMetadata(metadata *block.Metadata, data []byte) error {
     415           2 :         if r.tableFormat.BlockColumnar() {
     416           2 :                 return colblk.InitDataBlockMetadata(r.keySchema, metadata, data)
     417           2 :         }
     418           2 :         return nil
     419             : }
     420             : 
     421             : func (r *Reader) readFilterBlock(
     422             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     423           2 : ) (block.BufferHandle, error) {
     424           2 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
     425           2 :         return r.readBlockInternal(ctx, env, readHandle, bh, noInitBlockMetadataFn)
     426           2 : }
     427             : 
     428             : func (r *Reader) readRangeDelBlock(
     429             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     430           2 : ) (block.BufferHandle, error) {
     431           2 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     432           2 :         return r.readBlockInternal(ctx, env, readHandle, bh, r.initKeyspanBlockMetadata)
     433           2 : }
     434             : 
     435             : func (r *Reader) readRangeKeyBlock(
     436             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     437           2 : ) (block.BufferHandle, error) {
     438           2 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     439           2 :         return r.readBlockInternal(ctx, env, readHandle, bh, r.initKeyspanBlockMetadata)
     440           2 : }
     441             : 
     442             : // initKeyspanBlockMetadata initializes the Metadata for a rangedel or range key
     443             : // block. This will later be used (and reused) when reading from the block.
     444           2 : func (r *Reader) initKeyspanBlockMetadata(metadata *block.Metadata, data []byte) error {
     445           2 :         if r.tableFormat.BlockColumnar() {
     446           2 :                 return colblk.InitKeyspanBlockMetadata(metadata, data)
     447           2 :         }
     448           2 :         return nil
     449             : }
     450             : 
     451             : // ReadValueBlockExternal implements valblk.ExternalBlockReader, allowing a
     452             : // base.LazyValue to read a value block.
     453             : func (r *Reader) ReadValueBlockExternal(
     454             :         ctx context.Context, bh block.Handle,
     455           2 : ) (block.BufferHandle, error) {
     456           2 :         return r.readValueBlock(ctx, noEnv, noReadHandle, bh)
     457           2 : }
     458             : 
     459             : func (r *Reader) readValueBlock(
     460             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     461           2 : ) (block.BufferHandle, error) {
     462           2 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.ValueBlock)
     463           2 :         return r.readBlockInternal(ctx, env, readHandle, bh, noInitBlockMetadataFn)
     464           2 : }
     465             : 
     466             : func checkChecksum(
     467             :         checksumType block.ChecksumType, b []byte, bh block.Handle, fileNum base.DiskFileNum,
     468           2 : ) error {
     469           2 :         expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
     470           2 :         var computedChecksum uint32
     471           2 :         switch checksumType {
     472           2 :         case block.ChecksumTypeCRC32c:
     473           2 :                 computedChecksum = crc.New(b[:bh.Length+1]).Value()
     474           1 :         case block.ChecksumTypeXXHash64:
     475           1 :                 computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
     476           0 :         default:
     477           0 :                 return errors.Errorf("unsupported checksum type: %d", checksumType)
     478             :         }
     479             : 
     480           2 :         if expectedChecksum != computedChecksum {
     481           1 :                 return base.CorruptionErrorf(
     482           1 :                         "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
     483           1 :                         fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
     484           1 :         }
     485           2 :         return nil
     486             : }
     487             : 
     488             : // DeterministicReadBlockDurationForTesting is for tests that want a
     489             : // deterministic value of the time to read a block (that is not in the cache).
     490             : // The return value is a function that must be called before the test exits.
     491           1 : func DeterministicReadBlockDurationForTesting() func() {
     492           1 :         drbdForTesting := deterministicReadBlockDurationForTesting
     493           1 :         deterministicReadBlockDurationForTesting = true
     494           1 :         return func() {
     495           1 :                 deterministicReadBlockDurationForTesting = drbdForTesting
     496           1 :         }
     497             : }
     498             : 
     499             : var deterministicReadBlockDurationForTesting = false
     500             : 
     501             : // readBlockInternal should not be used directly; one of the read*Block methods
     502             : // should be used instead.
     503             : func (r *Reader) readBlockInternal(
     504             :         ctx context.Context,
     505             :         env readBlockEnv,
     506             :         readHandle objstorage.ReadHandle,
     507             :         bh block.Handle,
     508             :         initBlockMetadataFn func(*block.Metadata, []byte) error,
     509           2 : ) (handle block.BufferHandle, _ error) {
     510           2 :         var ch cache.Handle
     511           2 :         var crh cache.ReadHandle
     512           2 :         hit := true
     513           2 :         if env.BufferPool == nil {
     514           2 :                 var errorDuration time.Duration
     515           2 :                 var err error
     516           2 :                 ch, crh, errorDuration, hit, err = r.cacheOpts.Cache.GetWithReadHandle(
     517           2 :                         ctx, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
     518           2 :                 if errorDuration > 5*time.Millisecond && r.logger.IsTracingEnabled(ctx) {
     519           0 :                         r.logger.Eventf(
     520           0 :                                 ctx, "waited for turn when %s time wasted by failed reads", errorDuration.String())
     521           0 :                 }
     522             :                 // TODO(sumeer): consider tracing when waited longer than some duration
     523             :                 // for turn to do the read.
     524           2 :                 if err != nil {
     525           0 :                         return block.BufferHandle{}, err
     526           0 :                 }
     527           2 :         } else {
     528           2 :                 // The compaction path uses env.BufferPool, and does not coordinate read
     529           2 :                 // using a cache.ReadHandle. This is ok since only a single compaction is
     530           2 :                 // reading a block.
     531           2 :                 ch = r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
     532           2 :                 if ch.Valid() {
     533           2 :                         hit = true
     534           2 :                 }
     535             :         }
     536             :         // INVARIANT: hit => ch.Valid()
     537           2 :         if ch.Valid() {
     538           2 :                 if hit {
     539           2 :                         // Cache hit.
     540           2 :                         if readHandle != nil {
     541           2 :                                 readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
     542           2 :                         }
     543           2 :                         env.BlockServedFromCache(bh.Length)
     544             :                 }
     545           2 :                 if invariants.Enabled && crh.Valid() {
     546           0 :                         panic("cache.ReadHandle must not be valid")
     547             :                 }
     548           2 :                 return block.CacheBufferHandle(ch), nil
     549             :         }
     550             : 
     551             :         // Need to read. First acquire loadBlockSema, if needed.
     552           2 :         if sema := r.loadBlockSema; sema != nil {
     553           1 :                 if err := sema.Acquire(ctx, 1); err != nil {
     554           0 :                         // An error here can only come from the context.
     555           0 :                         return block.BufferHandle{}, err
     556           0 :                 }
     557           1 :                 defer sema.Release(1)
     558             :         }
     559           2 :         value, err := r.doRead(ctx, env, readHandle, bh, initBlockMetadataFn)
     560           2 :         if err != nil {
     561           1 :                 if crh.Valid() {
     562           1 :                         crh.SetReadError(err)
     563           1 :                 }
     564           1 :                 return block.BufferHandle{}, err
     565             :         }
     566           2 :         h := value.MakeHandle(crh, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
     567           2 :         return h, nil
     568             : }
     569             : 
     570             : // doRead is a helper for readBlockInternal that does the read, checksum
     571             : // check, decompression, and returns either a block.Value or an error.
     572             : func (r *Reader) doRead(
     573             :         ctx context.Context,
     574             :         env readBlockEnv,
     575             :         readHandle objstorage.ReadHandle,
     576             :         bh block.Handle,
     577             :         initBlockMetadataFn func(*block.Metadata, []byte) error,
     578           2 : ) (block.Value, error) {
     579           2 :         compressed := block.Alloc(int(bh.Length+block.TrailerLen), env.BufferPool)
     580           2 :         readStopwatch := makeStopwatch()
     581           2 :         var err error
     582           2 :         if readHandle != nil {
     583           2 :                 err = readHandle.ReadAt(ctx, compressed.BlockData(), int64(bh.Offset))
     584           2 :         } else {
     585           2 :                 err = r.readable.ReadAt(ctx, compressed.BlockData(), int64(bh.Offset))
     586           2 :         }
     587           2 :         readDuration := readStopwatch.stop()
     588           2 :         // Call IsTracingEnabled to avoid the allocations of boxing integers into an
     589           2 :         // interface{}, unless necessary.
     590           2 :         if readDuration >= slowReadTracingThreshold && r.logger.IsTracingEnabled(ctx) {
     591           1 :                 _, file1, line1, _ := runtime.Caller(1)
     592           1 :                 _, file2, line2, _ := runtime.Caller(2)
     593           1 :                 r.logger.Eventf(ctx, "reading block of %d bytes took %s (fileNum=%s; %s/%s:%d -> %s/%s:%d)",
     594           1 :                         int(bh.Length+block.TrailerLen), readDuration.String(),
     595           1 :                         r.cacheOpts.FileNum,
     596           1 :                         filepath.Base(filepath.Dir(file2)), filepath.Base(file2), line2,
     597           1 :                         filepath.Base(filepath.Dir(file1)), filepath.Base(file1), line1)
     598           1 :         }
     599           2 :         if err != nil {
     600           1 :                 compressed.Release()
     601           1 :                 return block.Value{}, err
     602           1 :         }
     603           2 :         env.BlockRead(bh.Length, readDuration)
     604           2 :         if err = checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil {
     605           1 :                 compressed.Release()
     606           1 :                 return block.Value{}, err
     607           1 :         }
     608           2 :         typ := block.CompressionIndicator(compressed.BlockData()[bh.Length])
     609           2 :         compressed.Truncate(int(bh.Length))
     610           2 :         var decompressed block.Value
     611           2 :         if typ == block.NoCompressionIndicator {
     612           2 :                 decompressed = compressed
     613           2 :         } else {
     614           2 :                 // Decode the length of the decompressed value.
     615           2 :                 decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.BlockData())
     616           2 :                 if err != nil {
     617           0 :                         compressed.Release()
     618           0 :                         return block.Value{}, err
     619           0 :                 }
     620           2 :                 decompressed = block.Alloc(decodedLen, env.BufferPool)
     621           2 :                 err = block.DecompressInto(typ, compressed.BlockData()[prefixLen:], decompressed.BlockData())
     622           2 :                 compressed.Release()
     623           2 :                 if err != nil {
     624           0 :                         decompressed.Release()
     625           0 :                         return block.Value{}, err
     626           0 :                 }
     627             :         }
     628           2 :         if err = initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil {
     629           0 :                 decompressed.Release()
     630           0 :                 return block.Value{}, err
     631           0 :         }
     632           2 :         return decompressed, nil
     633             : }
     634             : 
     635             : func (r *Reader) readMetaindex(
     636             :         ctx context.Context, readHandle objstorage.ReadHandle, filters map[string]FilterPolicy,
     637           2 : ) error {
     638           2 :         // We use a BufferPool when reading metaindex blocks in order to avoid
     639           2 :         // populating the block cache with these blocks. In heavy-write workloads,
     640           2 :         // especially with high compaction concurrency, new tables may be created
     641           2 :         // frequently. Populating the block cache with these metaindex blocks adds
     642           2 :         // additional contention on the block cache mutexes (see #1997).
     643           2 :         // Additionally, these blocks are exceedingly unlikely to be read again
     644           2 :         // while they're still in the block cache except in misconfigurations with
     645           2 :         // excessive sstables counts or a file cache that's far too small.
     646           2 :         r.metaBufferPool.InitPreallocated(r.metaBufferPoolAlloc[:0])
     647           2 :         // When we're finished, release the buffers we've allocated back to memory
     648           2 :         // allocator. We don't expect to use metaBufferPool again.
     649           2 :         defer r.metaBufferPool.Release()
     650           2 :         metaEnv := readBlockEnv{
     651           2 :                 BufferPool: &r.metaBufferPool,
     652           2 :         }
     653           2 : 
     654           2 :         b, err := r.readMetaindexBlock(ctx, metaEnv, readHandle)
     655           2 :         if err != nil {
     656           1 :                 return err
     657           1 :         }
     658           2 :         data := b.BlockData()
     659           2 :         defer b.Release()
     660           2 : 
     661           2 :         if uint64(len(data)) != r.metaindexBH.Length {
     662           0 :                 return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
     663           0 :                         errors.Safe(len(data)), errors.Safe(r.metaindexBH.Length))
     664           0 :         }
     665             : 
     666           2 :         var meta map[string]block.Handle
     667           2 :         meta, r.valueBIH, err = decodeMetaindex(data)
     668           2 :         if err != nil {
     669           0 :                 return err
     670           0 :         }
     671             : 
     672           2 :         if bh, ok := meta[metaPropertiesName]; ok {
     673           2 :                 b, err = r.readBlockInternal(ctx, metaEnv, readHandle, bh, noInitBlockMetadataFn)
     674           2 :                 if err != nil {
     675           1 :                         return err
     676           1 :                 }
     677           2 :                 r.propertiesBH = bh
     678           2 :                 err := r.Properties.load(b.BlockData(), r.deniedUserProperties)
     679           2 :                 b.Release()
     680           2 :                 if err != nil {
     681           0 :                         return err
     682           0 :                 }
     683             :         }
     684             : 
     685           2 :         if bh, ok := meta[metaRangeDelV2Name]; ok {
     686           2 :                 r.rangeDelBH = bh
     687           2 :         } else if _, ok := meta[metaRangeDelV1Name]; ok {
     688           0 :                 // This version of Pebble requires a format major version at least as
     689           0 :                 // high as FormatFlushableIngest (see pebble.FormatMinSupported). In
     690           0 :                 // this format major verison, we have a guarantee that we've compacted
     691           0 :                 // away all RocksDB sstables. It should not be possible to encounter an
     692           0 :                 // sstable with a v1 range deletion block but not a v2 range deletion
     693           0 :                 // block.
     694           0 :                 err := errors.Newf("pebble/table: unexpected range-del block type: %s", metaRangeDelV1Name)
     695           0 :                 return errors.Mark(err, base.ErrCorruption)
     696           0 :         }
     697             : 
     698           2 :         if bh, ok := meta[metaRangeKeyName]; ok {
     699           2 :                 r.rangeKeyBH = bh
     700           2 :         }
     701             : 
     702           2 :         for name, fp := range filters {
     703           2 :                 if bh, ok := meta["fullfilter."+name]; ok {
     704           2 :                         r.filterBH = bh
     705           2 :                         r.tableFilter = newTableFilterReader(fp, r.filterMetricsTracker)
     706           2 :                         break
     707             :                 }
     708             :         }
     709           2 :         return nil
     710             : }
     711             : 
     712             : // Layout returns the layout (block organization) for an sstable.
     713           2 : func (r *Reader) Layout() (*Layout, error) {
     714           2 :         if r.err != nil {
     715           0 :                 return nil, r.err
     716           0 :         }
     717             : 
     718           2 :         l := &Layout{
     719           2 :                 Data:       make([]block.HandleWithProperties, 0, r.Properties.NumDataBlocks),
     720           2 :                 RangeDel:   r.rangeDelBH,
     721           2 :                 RangeKey:   r.rangeKeyBH,
     722           2 :                 ValueIndex: r.valueBIH.Handle,
     723           2 :                 Properties: r.propertiesBH,
     724           2 :                 MetaIndex:  r.metaindexBH,
     725           2 :                 Footer:     r.footerBH,
     726           2 :                 Format:     r.tableFormat,
     727           2 :         }
     728           2 :         if r.filterBH.Length > 0 {
     729           2 :                 l.Filter = []NamedBlockHandle{{Name: "fullfilter." + r.tableFilter.policy.Name(), Handle: r.filterBH}}
     730           2 :         }
     731           2 :         ctx := context.TODO()
     732           2 : 
     733           2 :         indexH, err := r.readTopLevelIndexBlock(ctx, noEnv, noReadHandle)
     734           2 :         if err != nil {
     735           1 :                 return nil, err
     736           1 :         }
     737           2 :         defer indexH.Release()
     738           2 : 
     739           2 :         var alloc bytealloc.A
     740           2 : 
     741           2 :         if r.Properties.IndexPartitions == 0 {
     742           2 :                 l.Index = append(l.Index, r.indexBH)
     743           2 :                 iter := r.tableFormat.newIndexIter()
     744           2 :                 err := iter.Init(r.Comparer, indexH.BlockData(), NoTransforms)
     745           2 :                 if err != nil {
     746           0 :                         return nil, errors.Wrap(err, "reading index block")
     747           0 :                 }
     748           2 :                 for valid := iter.First(); valid; valid = iter.Next() {
     749           2 :                         dataBH, err := iter.BlockHandleWithProperties()
     750           2 :                         if err != nil {
     751           0 :                                 return nil, errCorruptIndexEntry(err)
     752           0 :                         }
     753           2 :                         if len(dataBH.Props) > 0 {
     754           2 :                                 alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     755           2 :                         }
     756           2 :                         l.Data = append(l.Data, dataBH)
     757             :                 }
     758           2 :         } else {
     759           2 :                 l.TopIndex = r.indexBH
     760           2 :                 topIter := r.tableFormat.newIndexIter()
     761           2 :                 err := topIter.Init(r.Comparer, indexH.BlockData(), NoTransforms)
     762           2 :                 if err != nil {
     763           0 :                         return nil, errors.Wrap(err, "reading index block")
     764           0 :                 }
     765           2 :                 iter := r.tableFormat.newIndexIter()
     766           2 :                 for valid := topIter.First(); valid; valid = topIter.Next() {
     767           2 :                         indexBH, err := topIter.BlockHandleWithProperties()
     768           2 :                         if err != nil {
     769           0 :                                 return nil, errCorruptIndexEntry(err)
     770           0 :                         }
     771           2 :                         l.Index = append(l.Index, indexBH.Handle)
     772           2 : 
     773           2 :                         subIndex, err := r.readIndexBlock(ctx, noEnv, noReadHandle, indexBH.Handle)
     774           2 :                         if err != nil {
     775           1 :                                 return nil, err
     776           1 :                         }
     777           2 :                         err = func() error {
     778           2 :                                 defer subIndex.Release()
     779           2 :                                 // TODO(msbutler): figure out how to pass virtualState to layout call.
     780           2 :                                 if err := iter.Init(r.Comparer, subIndex.BlockData(), NoTransforms); err != nil {
     781           0 :                                         return err
     782           0 :                                 }
     783           2 :                                 for valid := iter.First(); valid; valid = iter.Next() {
     784           2 :                                         dataBH, err := iter.BlockHandleWithProperties()
     785           2 :                                         if err != nil {
     786           0 :                                                 return errCorruptIndexEntry(err)
     787           0 :                                         }
     788           2 :                                         if len(dataBH.Props) > 0 {
     789           2 :                                                 alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     790           2 :                                         }
     791           2 :                                         l.Data = append(l.Data, dataBH)
     792             :                                 }
     793           2 :                                 return nil
     794             :                         }()
     795           2 :                         if err != nil {
     796           0 :                                 return nil, err
     797           0 :                         }
     798             :                 }
     799             :         }
     800           2 :         if r.valueBIH.Handle.Length != 0 {
     801           2 :                 vbiH, err := r.readValueBlock(context.Background(), noEnv, noReadHandle, r.valueBIH.Handle)
     802           2 :                 if err != nil {
     803           0 :                         return nil, err
     804           0 :                 }
     805           2 :                 defer vbiH.Release()
     806           2 :                 l.ValueBlock, err = valblk.DecodeIndex(vbiH.BlockData(), r.valueBIH)
     807           2 :                 if err != nil {
     808           0 :                         return nil, err
     809           0 :                 }
     810             :         }
     811             : 
     812           2 :         return l, nil
     813             : }
     814             : 
     815             : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
     816           2 : func (r *Reader) ValidateBlockChecksums() error {
     817           2 :         // Pre-compute the BlockHandles for the underlying file.
     818           2 :         l, err := r.Layout()
     819           2 :         if err != nil {
     820           1 :                 return err
     821           1 :         }
     822             : 
     823           2 :         type blk struct {
     824           2 :                 bh     block.Handle
     825           2 :                 readFn func(context.Context, readBlockEnv, objstorage.ReadHandle, block.Handle) (block.BufferHandle, error)
     826           2 :         }
     827           2 :         // Construct the set of blocks to check. Note that the footer is not checked
     828           2 :         // as it is not a block with a checksum.
     829           2 :         blocks := make([]blk, 0, len(l.Data)+6)
     830           2 :         for i := range l.Data {
     831           2 :                 blocks = append(blocks, blk{
     832           2 :                         bh:     l.Data[i].Handle,
     833           2 :                         readFn: r.readDataBlock,
     834           2 :                 })
     835           2 :         }
     836           2 :         for _, h := range l.Index {
     837           2 :                 blocks = append(blocks, blk{
     838           2 :                         bh:     h,
     839           2 :                         readFn: r.readIndexBlock,
     840           2 :                 })
     841           2 :         }
     842           2 :         blocks = append(blocks, blk{
     843           2 :                 bh:     l.TopIndex,
     844           2 :                 readFn: r.readIndexBlock,
     845           2 :         })
     846           2 :         for _, bh := range l.Filter {
     847           2 :                 blocks = append(blocks, blk{
     848           2 :                         bh:     bh.Handle,
     849           2 :                         readFn: r.readFilterBlock,
     850           2 :                 })
     851           2 :         }
     852           2 :         blocks = append(blocks, blk{
     853           2 :                 bh:     l.RangeDel,
     854           2 :                 readFn: r.readRangeDelBlock,
     855           2 :         })
     856           2 :         blocks = append(blocks, blk{
     857           2 :                 bh:     l.RangeKey,
     858           2 :                 readFn: r.readRangeKeyBlock,
     859           2 :         })
     860           2 :         readNoInit := func(ctx context.Context, env readBlockEnv, rh objstorage.ReadHandle, bh block.Handle) (block.BufferHandle, error) {
     861           2 :                 return r.readBlockInternal(ctx, env, rh, bh, noInitBlockMetadataFn)
     862           2 :         }
     863           2 :         blocks = append(blocks, blk{
     864           2 :                 bh:     l.Properties,
     865           2 :                 readFn: readNoInit,
     866           2 :         })
     867           2 :         blocks = append(blocks, blk{
     868           2 :                 bh:     l.MetaIndex,
     869           2 :                 readFn: readNoInit,
     870           2 :         })
     871           2 : 
     872           2 :         // Sorting by offset ensures we are performing a sequential scan of the
     873           2 :         // file.
     874           2 :         slices.SortFunc(blocks, func(a, b blk) int {
     875           2 :                 return cmp.Compare(a.bh.Offset, b.bh.Offset)
     876           2 :         })
     877             : 
     878           2 :         ctx := context.Background()
     879           2 :         for _, b := range blocks {
     880           2 :                 // Certain blocks may not be present, in which case we skip them.
     881           2 :                 if b.bh.Length == 0 {
     882           2 :                         continue
     883             :                 }
     884           2 :                 h, err := b.readFn(ctx, noEnv, noReadHandle, b.bh)
     885           2 :                 if err != nil {
     886           1 :                         return err
     887           1 :                 }
     888           2 :                 h.Release()
     889             :         }
     890             : 
     891           2 :         return nil
     892             : }
     893             : 
     894             : // CommonProperties implemented the CommonReader interface.
     895           2 : func (r *Reader) CommonProperties() *CommonProperties {
     896           2 :         return &r.Properties.CommonProperties
     897           2 : }
     898             : 
     899             : // EstimateDiskUsage returns the total size of data blocks overlapping the range
     900             : // `[start, end]`. Even if a data block partially overlaps, or we cannot
     901             : // determine overlap due to abbreviated index keys, the full data block size is
     902             : // included in the estimation.
     903             : //
     904             : // This function does not account for any metablock space usage. Assumes there
     905             : // is at least partial overlap, i.e., `[start, end]` falls neither completely
     906             : // before nor completely after the file's range.
     907             : //
     908             : // Only blocks containing point keys are considered. Range deletion and range
     909             : // key blocks are not considered.
     910             : //
     911             : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
     912             : // data blocks overlapped and add that same fraction of the metadata blocks to the
     913             : // estimate.
     914           2 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
     915           2 :         if !r.tableFormat.BlockColumnar() {
     916           2 :                 return estimateDiskUsage[rowblk.IndexIter, *rowblk.IndexIter](r, start, end)
     917           2 :         }
     918           2 :         return estimateDiskUsage[colblk.IndexIter, *colblk.IndexIter](r, start, end)
     919             : }
     920             : 
     921             : func estimateDiskUsage[I any, PI indexBlockIterator[I]](
     922             :         r *Reader, start, end []byte,
     923           2 : ) (uint64, error) {
     924           2 :         if r.err != nil {
     925           0 :                 return 0, r.err
     926           0 :         }
     927           2 :         ctx := context.TODO()
     928           2 : 
     929           2 :         indexH, err := r.readTopLevelIndexBlock(ctx, noEnv, noReadHandle)
     930           2 :         if err != nil {
     931           1 :                 return 0, err
     932           1 :         }
     933             :         // We are using InitHandle below but we never Close those iterators, which
     934             :         // allows us to release the index handle ourselves.
     935             :         // TODO(radu): clean this up.
     936           2 :         defer indexH.Release()
     937           2 : 
     938           2 :         // Iterators over the bottom-level index blocks containing start and end.
     939           2 :         // These may be different in case of partitioned index but will both point
     940           2 :         // to the same blockIter over the single index in the unpartitioned case.
     941           2 :         var startIdxIter, endIdxIter PI
     942           2 :         if r.Properties.IndexPartitions == 0 {
     943           2 :                 startIdxIter = new(I)
     944           2 :                 if err := startIdxIter.InitHandle(r.Comparer, indexH, NoTransforms); err != nil {
     945           0 :                         return 0, err
     946           0 :                 }
     947           2 :                 endIdxIter = startIdxIter
     948           2 :         } else {
     949           2 :                 var topIter PI = new(I)
     950           2 :                 if err := topIter.InitHandle(r.Comparer, indexH, NoTransforms); err != nil {
     951           0 :                         return 0, err
     952           0 :                 }
     953           2 :                 if !topIter.SeekGE(start) {
     954           1 :                         // The range falls completely after this file.
     955           1 :                         return 0, nil
     956           1 :                 }
     957           2 :                 startIndexBH, err := topIter.BlockHandleWithProperties()
     958           2 :                 if err != nil {
     959           0 :                         return 0, errCorruptIndexEntry(err)
     960           0 :                 }
     961           2 :                 startIdxBlock, err := r.readIndexBlock(ctx, noEnv, noReadHandle, startIndexBH.Handle)
     962           2 :                 if err != nil {
     963           1 :                         return 0, err
     964           1 :                 }
     965           2 :                 defer startIdxBlock.Release()
     966           2 :                 startIdxIter = new(I)
     967           2 :                 err = startIdxIter.InitHandle(r.Comparer, startIdxBlock, NoTransforms)
     968           2 :                 if err != nil {
     969           0 :                         return 0, err
     970           0 :                 }
     971             : 
     972           2 :                 if topIter.SeekGE(end) {
     973           2 :                         endIndexBH, err := topIter.BlockHandleWithProperties()
     974           2 :                         if err != nil {
     975           0 :                                 return 0, errCorruptIndexEntry(err)
     976           0 :                         }
     977           2 :                         endIdxBlock, err := r.readIndexBlock(ctx, noEnv, noReadHandle, endIndexBH.Handle)
     978           2 :                         if err != nil {
     979           1 :                                 return 0, err
     980           1 :                         }
     981           2 :                         defer endIdxBlock.Release()
     982           2 :                         endIdxIter = new(I)
     983           2 :                         err = endIdxIter.InitHandle(r.Comparer, endIdxBlock, NoTransforms)
     984           2 :                         if err != nil {
     985           0 :                                 return 0, err
     986           0 :                         }
     987             :                 }
     988             :         }
     989             :         // startIdxIter should not be nil at this point, while endIdxIter can be if the
     990             :         // range spans past the end of the file.
     991             : 
     992           2 :         if !startIdxIter.SeekGE(start) {
     993           2 :                 // The range falls completely after this file.
     994           2 :                 return 0, nil
     995           2 :         }
     996           2 :         startBH, err := startIdxIter.BlockHandleWithProperties()
     997           2 :         if err != nil {
     998           0 :                 return 0, errCorruptIndexEntry(err)
     999           0 :         }
    1000             : 
    1001           2 :         includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
    1002           2 :                 // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
    1003           2 :                 // Linearly interpolate what is stored in value blocks.
    1004           2 :                 //
    1005           2 :                 // TODO(sumeer): if we need more accuracy, without loading any data blocks
    1006           2 :                 // (which contain the value handles, and which may also be insufficient if
    1007           2 :                 // the values are in separate files), we will need to accumulate the
    1008           2 :                 // logical size of the key-value pairs and store the cumulative value for
    1009           2 :                 // each data block in the index block entry. This increases the size of
    1010           2 :                 // the BlockHandle, so wait until this becomes necessary.
    1011           2 :                 return dataBlockSize +
    1012           2 :                         uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
    1013           2 :                                 float64(r.Properties.ValueBlocksSize))
    1014           2 :         }
    1015           2 :         if endIdxIter == nil {
    1016           1 :                 // The range spans beyond this file. Include data blocks through the last.
    1017           1 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
    1018           1 :         }
    1019           2 :         if !endIdxIter.SeekGE(end) {
    1020           2 :                 // The range spans beyond this file. Include data blocks through the last.
    1021           2 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
    1022           2 :         }
    1023           2 :         endBH, err := endIdxIter.BlockHandleWithProperties()
    1024           2 :         if err != nil {
    1025           0 :                 return 0, errCorruptIndexEntry(err)
    1026           0 :         }
    1027           2 :         return includeInterpolatedValueBlocksSize(
    1028           2 :                 endBH.Offset + endBH.Length + block.TrailerLen - startBH.Offset), nil
    1029             : }
    1030             : 
    1031             : // TableFormat returns the format version for the table.
    1032           2 : func (r *Reader) TableFormat() (TableFormat, error) {
    1033           2 :         if r.err != nil {
    1034           0 :                 return TableFormatUnspecified, r.err
    1035           0 :         }
    1036           2 :         return r.tableFormat, nil
    1037             : }
    1038             : 
    1039             : // NewReader returns a new table reader for the file. Closing the reader will
    1040             : // close the file.
    1041             : //
    1042             : // The context is used for tracing any operations performed by NewReader; it is
    1043             : // NOT stored for future use.
    1044           2 : func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Reader, error) {
    1045           2 :         if f == nil {
    1046           1 :                 return nil, errors.New("pebble/table: nil file")
    1047           1 :         }
    1048           2 :         o = o.ensureDefaults()
    1049           2 :         r := &Reader{
    1050           2 :                 readable:             f,
    1051           2 :                 cacheOpts:            o.internal.CacheOpts,
    1052           2 :                 loadBlockSema:        o.LoadBlockSema,
    1053           2 :                 deniedUserProperties: o.DeniedUserProperties,
    1054           2 :                 filterMetricsTracker: o.FilterMetricsTracker,
    1055           2 :                 logger:               o.LoggerAndTracer,
    1056           2 :         }
    1057           2 :         if r.cacheOpts.Cache == nil {
    1058           2 :                 r.cacheOpts.Cache = cache.New(0)
    1059           2 :         } else {
    1060           2 :                 r.cacheOpts.Cache.Ref()
    1061           2 :         }
    1062           2 :         if r.cacheOpts.CacheID == 0 {
    1063           2 :                 r.cacheOpts.CacheID = r.cacheOpts.Cache.NewID()
    1064           2 :         }
    1065             : 
    1066           2 :         var preallocRH objstorageprovider.PreallocatedReadHandle
    1067           2 :         rh := objstorageprovider.UsePreallocatedReadHandle(
    1068           2 :                 r.readable, objstorage.ReadBeforeForNewReader, &preallocRH)
    1069           2 :         defer rh.Close()
    1070           2 : 
    1071           2 :         footer, err := readFooter(ctx, f, rh, r.logger, r.cacheOpts.FileNum)
    1072           2 :         if err != nil {
    1073           1 :                 r.err = err
    1074           1 :                 return nil, r.Close()
    1075           1 :         }
    1076           2 :         r.checksumType = footer.checksum
    1077           2 :         r.tableFormat = footer.format
    1078           2 :         r.indexBH = footer.indexBH
    1079           2 :         r.metaindexBH = footer.metaindexBH
    1080           2 :         r.footerBH = footer.footerBH
    1081           2 :         // Read the metaindex and properties blocks.
    1082           2 :         if err := r.readMetaindex(ctx, rh, o.Filters); err != nil {
    1083           1 :                 r.err = err
    1084           1 :                 return nil, r.Close()
    1085           1 :         }
    1086             : 
    1087           2 :         if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
    1088           2 :                 r.Comparer = o.Comparer
    1089           2 :                 r.Compare = o.Comparer.Compare
    1090           2 :                 r.Equal = o.Comparer.Equal
    1091           2 :                 r.Split = o.Comparer.Split
    1092           2 :         } else if comparer, ok := o.Comparers[r.Properties.ComparerName]; ok {
    1093           1 :                 r.Comparer = o.Comparer
    1094           1 :                 r.Compare = comparer.Compare
    1095           1 :                 r.Equal = comparer.Equal
    1096           1 :                 r.Split = comparer.Split
    1097           1 :         } else {
    1098           1 :                 r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
    1099           1 :                         errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.ComparerName))
    1100           1 :         }
    1101             : 
    1102           2 :         if mergerName := r.Properties.MergerName; mergerName != "" && mergerName != "nullptr" {
    1103           2 :                 if o.Merger != nil && o.Merger.Name == mergerName {
    1104           2 :                         // opts.Merger matches.
    1105           2 :                 } else if _, ok := o.Mergers[mergerName]; ok {
    1106           1 :                         // Known merger.
    1107           1 :                 } else {
    1108           1 :                         r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
    1109           1 :                                 errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.MergerName))
    1110           1 :                 }
    1111             :         }
    1112             : 
    1113           2 :         if r.tableFormat.BlockColumnar() {
    1114           2 :                 if ks, ok := o.KeySchemas[r.Properties.KeySchemaName]; ok {
    1115           2 :                         r.keySchema = ks
    1116           2 :                 } else {
    1117           0 :                         var known []string
    1118           0 :                         for name := range o.KeySchemas {
    1119           0 :                                 known = append(known, fmt.Sprintf("%q", name))
    1120           0 :                         }
    1121           0 :                         slices.Sort(known)
    1122           0 : 
    1123           0 :                         r.err = errors.Newf("pebble/table: %d: unknown key schema %q; known key schemas: %s",
    1124           0 :                                 errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.KeySchemaName), errors.Safe(known))
    1125           0 :                         panic(r.err)
    1126             :                 }
    1127             :         }
    1128             : 
    1129           2 :         if r.err != nil {
    1130           1 :                 return nil, r.Close()
    1131           1 :         }
    1132             : 
    1133           2 :         return r, nil
    1134             : }
    1135             : 
    1136             : // ReadableFile describes the smallest subset of vfs.File that is required for
    1137             : // reading SSTs.
    1138             : type ReadableFile interface {
    1139             :         io.ReaderAt
    1140             :         io.Closer
    1141             :         Stat() (vfs.FileInfo, error)
    1142             : }
    1143             : 
    1144             : // NewSimpleReadable wraps a ReadableFile in a objstorage.Readable
    1145             : // implementation (which does not support read-ahead)
    1146           2 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
    1147           2 :         info, err := r.Stat()
    1148           2 :         if err != nil {
    1149           1 :                 return nil, err
    1150           1 :         }
    1151           2 :         res := &simpleReadable{
    1152           2 :                 f:    r,
    1153           2 :                 size: info.Size(),
    1154           2 :         }
    1155           2 :         res.rh = objstorage.MakeNoopReadHandle(res)
    1156           2 :         return res, nil
    1157             : }
    1158             : 
    1159             : // simpleReadable wraps a ReadableFile to implement objstorage.Readable.
    1160             : type simpleReadable struct {
    1161             :         f    ReadableFile
    1162             :         size int64
    1163             :         rh   objstorage.NoopReadHandle
    1164             : }
    1165             : 
    1166             : var _ objstorage.Readable = (*simpleReadable)(nil)
    1167             : 
    1168             : // ReadAt is part of the objstorage.Readable interface.
    1169           2 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
    1170           2 :         n, err := s.f.ReadAt(p, off)
    1171           2 :         if invariants.Enabled && err == nil && n != len(p) {
    1172           0 :                 panic("short read")
    1173             :         }
    1174           2 :         return err
    1175             : }
    1176             : 
    1177             : // Close is part of the objstorage.Readable interface.
    1178           2 : func (s *simpleReadable) Close() error {
    1179           2 :         return s.f.Close()
    1180           2 : }
    1181             : 
    1182             : // Size is part of the objstorage.Readable interface.
    1183           2 : func (s *simpleReadable) Size() int64 {
    1184           2 :         return s.size
    1185           2 : }
    1186             : 
    1187             : // NewReadHandle is part of the objstorage.Readable interface.
    1188             : func (s *simpleReadable) NewReadHandle(
    1189             :         readBeforeSize objstorage.ReadBeforeSize,
    1190           2 : ) objstorage.ReadHandle {
    1191           2 :         return &s.rh
    1192           2 : }
    1193             : 
    1194           0 : func errCorruptIndexEntry(err error) error {
    1195           0 :         err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
    1196           0 :         if invariants.Enabled {
    1197           0 :                 panic(err)
    1198             :         }
    1199           0 :         return err
    1200             : }
    1201             : 
    1202             : type deterministicStopwatchForTesting struct {
    1203             :         startTime crtime.Mono
    1204             : }
    1205             : 
    1206           2 : func makeStopwatch() deterministicStopwatchForTesting {
    1207           2 :         return deterministicStopwatchForTesting{startTime: crtime.NowMono()}
    1208           2 : }
    1209             : 
    1210           2 : func (w deterministicStopwatchForTesting) stop() time.Duration {
    1211           2 :         dur := w.startTime.Elapsed()
    1212           2 :         if deterministicReadBlockDurationForTesting {
    1213           1 :                 dur = slowReadTracingThreshold
    1214           1 :         }
    1215           2 :         return dur
    1216             : }
    1217             : 
    1218             : // MakeTrivialReaderProvider creates a valblk.ReaderProvider which always
    1219             : // returns the given reader. It should be used when the Reader will outlive the
    1220             : // iterator tree.
    1221           2 : func MakeTrivialReaderProvider(r *Reader) valblk.ReaderProvider {
    1222           2 :         return (*trivialReaderProvider)(r)
    1223           2 : }
    1224             : 
    1225             : // trivialReaderProvider implements valblk.ReaderProvider for a Reader that will
    1226             : // outlive the top-level iterator in the iterator tree.
    1227             : //
    1228             : // Defining the type in this manner (as opposed to a struct) avoids allocation.
    1229             : type trivialReaderProvider Reader
    1230             : 
    1231             : var _ valblk.ReaderProvider = (*trivialReaderProvider)(nil)
    1232             : 
    1233             : // GetReader implements ReaderProvider.
    1234             : func (trp *trivialReaderProvider) GetReader(
    1235             :         ctx context.Context,
    1236           1 : ) (valblk.ExternalBlockReader, error) {
    1237           1 :         return (*Reader)(trp), nil
    1238           1 : }
    1239             : 
    1240             : // Close implements ReaderProvider.
    1241           1 : func (trp *trivialReaderProvider) Close() {}

Generated by: LCOV version 1.14