LCOV - code coverage report
Current view: top level - pebble/sstable - reader.go (source / functions) Hit Total Coverage
Test: 2024-12-30 08:17Z 87a5141c - tests only.lcov Lines: 684 793 86.3 %
Date: 2024-12-30 08:18:56 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "context"
      10             :         "encoding/binary"
      11             :         "fmt"
      12             :         "io"
      13             :         "path/filepath"
      14             :         "runtime"
      15             :         "slices"
      16             :         "time"
      17             : 
      18             :         "github.com/cespare/xxhash/v2"
      19             :         "github.com/cockroachdb/crlib/crtime"
      20             :         "github.com/cockroachdb/crlib/fifo"
      21             :         "github.com/cockroachdb/errors"
      22             :         "github.com/cockroachdb/pebble/internal/base"
      23             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      24             :         "github.com/cockroachdb/pebble/internal/cache"
      25             :         "github.com/cockroachdb/pebble/internal/crc"
      26             :         "github.com/cockroachdb/pebble/internal/invariants"
      27             :         "github.com/cockroachdb/pebble/internal/keyspan"
      28             :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      29             :         "github.com/cockroachdb/pebble/objstorage"
      30             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      31             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      32             :         "github.com/cockroachdb/pebble/sstable/block"
      33             :         "github.com/cockroachdb/pebble/sstable/colblk"
      34             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      35             :         "github.com/cockroachdb/pebble/sstable/valblk"
      36             :         "github.com/cockroachdb/pebble/vfs"
      37             : )
      38             : 
      39             : var errReaderClosed = errors.New("pebble/table: reader is closed")
      40             : 
      41             : type loadBlockResult int8
      42             : 
      43             : const (
      44             :         loadBlockOK loadBlockResult = iota
      45             :         // Could be due to error or because no block left to load.
      46             :         loadBlockFailed
      47             :         loadBlockIrrelevant
      48             : )
      49             : 
      50             : // Reader is a table reader.
      51             : type Reader struct {
      52             :         readable objstorage.Readable
      53             : 
      54             :         // The following fields are copied from the ReadOptions.
      55             :         cacheOpts            sstableinternal.CacheOptions
      56             :         keySchema            *colblk.KeySchema
      57             :         loadBlockSema        *fifo.Semaphore
      58             :         deniedUserProperties map[string]struct{}
      59             :         filterMetricsTracker *FilterMetricsTracker
      60             :         logger               base.LoggerAndTracer
      61             : 
      62             :         Comparer *base.Comparer
      63             :         Compare  Compare
      64             :         Equal    Equal
      65             :         Split    Split
      66             : 
      67             :         tableFilter *tableFilterReader
      68             : 
      69             :         err error
      70             : 
      71             :         indexBH      block.Handle
      72             :         filterBH     block.Handle
      73             :         rangeDelBH   block.Handle
      74             :         rangeKeyBH   block.Handle
      75             :         valueBIH     valblk.IndexHandle
      76             :         propertiesBH block.Handle
      77             :         metaindexBH  block.Handle
      78             :         footerBH     block.Handle
      79             : 
      80             :         Properties   Properties
      81             :         tableFormat  TableFormat
      82             :         checksumType block.ChecksumType
      83             : 
      84             :         // metaBufferPool is a buffer pool used exclusively when opening a table and
      85             :         // loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
      86             :         // the BufferPool.pool slice as a part of the Reader allocation. It has
      87             :         // capacity 3 to accommodate the meta block (1), and both the compressed
      88             :         // properties block (1) and decompressed properties block (1)
      89             :         // simultaneously.
      90             :         metaBufferPool      block.BufferPool
      91             :         metaBufferPoolAlloc [3]block.AllocedBuffer
      92             : }
      93             : 
      94             : var _ CommonReader = (*Reader)(nil)
      95             : 
      96             : // Close the reader and the underlying objstorage.Readable.
      97           1 : func (r *Reader) Close() error {
      98           1 :         r.cacheOpts.Cache.Unref()
      99           1 : 
     100           1 :         if r.readable != nil {
     101           1 :                 r.err = firstError(r.err, r.readable.Close())
     102           1 :                 r.readable = nil
     103           1 :         }
     104             : 
     105           1 :         if r.err != nil {
     106           1 :                 return r.err
     107           1 :         }
     108             :         // Make any future calls to Get, NewIter or Close return an error.
     109           1 :         r.err = errReaderClosed
     110           1 :         return nil
     111             : }
     112             : 
     113             : // NewPointIter returns an iterator for the point keys in the table.
     114             : //
     115             : // If transforms.HideObsoletePoints is set, the callee assumes that filterer
     116             : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
     117             : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
     118             : func (r *Reader) NewPointIter(
     119             :         ctx context.Context,
     120             :         transforms IterTransforms,
     121             :         lower, upper []byte,
     122             :         filterer *BlockPropertiesFilterer,
     123             :         filterBlockSizeLimit FilterBlockSizeLimit,
     124             :         stats *base.InternalIteratorStats,
     125             :         statsAccum IterStatsAccumulator,
     126             :         rp valblk.ReaderProvider,
     127           1 : ) (Iterator, error) {
     128           1 :         return r.newPointIter(
     129           1 :                 ctx, transforms, lower, upper, filterer, filterBlockSizeLimit,
     130           1 :                 stats, statsAccum, rp, nil)
     131           1 : }
     132             : 
     133             : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
     134             : // before the call to NewPointIter, to get the value of hideObsoletePoints and
     135             : // potentially add a block property filter.
     136             : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
     137             :         snapshotForHideObsoletePoints base.SeqNum,
     138             :         fileLargestSeqNum base.SeqNum,
     139             :         pointKeyFilters []BlockPropertyFilter,
     140           1 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
     141           1 :         hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
     142           1 :                 snapshotForHideObsoletePoints > fileLargestSeqNum
     143           1 :         if hideObsoletePoints {
     144           1 :                 pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
     145           1 :         }
     146           1 :         return hideObsoletePoints, pointKeyFilters
     147             : }
     148             : 
     149             : func (r *Reader) newPointIter(
     150             :         ctx context.Context,
     151             :         transforms IterTransforms,
     152             :         lower, upper []byte,
     153             :         filterer *BlockPropertiesFilterer,
     154             :         filterBlockSizeLimit FilterBlockSizeLimit,
     155             :         stats *base.InternalIteratorStats,
     156             :         statsAccum IterStatsAccumulator,
     157             :         rp valblk.ReaderProvider,
     158             :         vState *virtualState,
     159           1 : ) (Iterator, error) {
     160           1 :         // NB: pebble.fileCache wraps the returned iterator with one which performs
     161           1 :         // reference counting on the Reader, preventing the Reader from being closed
     162           1 :         // until the final iterator closes.
     163           1 :         var res Iterator
     164           1 :         var err error
     165           1 :         if r.Properties.IndexType == twoLevelIndex {
     166           1 :                 if r.tableFormat.BlockColumnar() {
     167           1 :                         res, err = newColumnBlockTwoLevelIterator(
     168           1 :                                 ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     169           1 :                                 stats, statsAccum, rp, nil /* bufferPool */)
     170           1 :                 } else {
     171           1 :                         res, err = newRowBlockTwoLevelIterator(
     172           1 :                                 ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     173           1 :                                 stats, statsAccum, rp, nil /* bufferPool */)
     174           1 :                 }
     175           1 :         } else {
     176           1 :                 if r.tableFormat.BlockColumnar() {
     177           1 :                         res, err = newColumnBlockSingleLevelIterator(
     178           1 :                                 ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     179           1 :                                 stats, statsAccum, rp, nil /* bufferPool */)
     180           1 :                 } else {
     181           1 :                         res, err = newRowBlockSingleLevelIterator(
     182           1 :                                 ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     183           1 :                                 stats, statsAccum, rp, nil /* bufferPool */)
     184           1 :                 }
     185             :         }
     186           1 :         if err != nil {
     187           1 :                 // Note: we don't want to return res here - it will be a nil
     188           1 :                 // single/twoLevelIterator, not a nil Iterator.
     189           1 :                 return nil, err
     190           1 :         }
     191           1 :         return res, nil
     192             : }
     193             : 
     194             : // NewIter returns an iterator for the point keys in the table. It is a
     195             : // simplified version of NewPointIter and should only be used for tests and
     196             : // tooling.
     197             : //
     198             : // NewIter must only be used when the Reader is guaranteed to outlive any
     199             : // LazyValues returned from the iter.
     200           1 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
     201           1 :         // TODO(radu): we should probably not use bloom filters in this case, as there
     202           1 :         // likely isn't a cache set up.
     203           1 :         return r.NewPointIter(
     204           1 :                 context.TODO(), transforms, lower, upper, nil, AlwaysUseFilterBlock,
     205           1 :                 nil /* stats */, nil /* statsAccum */, MakeTrivialReaderProvider(r))
     206           1 : }
     207             : 
     208             : // NewCompactionIter returns an iterator similar to NewIter but it also increments
     209             : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
     210             : // after itself and returns a nil iterator.
     211             : func (r *Reader) NewCompactionIter(
     212             :         transforms IterTransforms,
     213             :         statsAccum IterStatsAccumulator,
     214             :         rp valblk.ReaderProvider,
     215             :         bufferPool *block.BufferPool,
     216           1 : ) (Iterator, error) {
     217           1 :         return r.newCompactionIter(transforms, statsAccum, rp, nil, bufferPool)
     218           1 : }
     219             : 
     220             : func (r *Reader) newCompactionIter(
     221             :         transforms IterTransforms,
     222             :         statsAccum IterStatsAccumulator,
     223             :         rp valblk.ReaderProvider,
     224             :         vState *virtualState,
     225             :         bufferPool *block.BufferPool,
     226           1 : ) (Iterator, error) {
     227           1 :         if vState != nil && vState.isSharedIngested {
     228           1 :                 transforms.HideObsoletePoints = true
     229           1 :         }
     230           1 :         if r.Properties.IndexType == twoLevelIndex {
     231           1 :                 if !r.tableFormat.BlockColumnar() {
     232           1 :                         i, err := newRowBlockTwoLevelIterator(
     233           1 :                                 context.Background(),
     234           1 :                                 r, vState, transforms, nil /* lower */, nil /* upper */, nil,
     235           1 :                                 NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
     236           1 :                         if err != nil {
     237           0 :                                 return nil, err
     238           0 :                         }
     239           1 :                         i.SetupForCompaction()
     240           1 :                         return i, nil
     241             :                 }
     242           1 :                 i, err := newColumnBlockTwoLevelIterator(
     243           1 :                         context.Background(),
     244           1 :                         r, vState, transforms, nil /* lower */, nil /* upper */, nil,
     245           1 :                         NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
     246           1 :                 if err != nil {
     247           0 :                         return nil, err
     248           0 :                 }
     249           1 :                 i.SetupForCompaction()
     250           1 :                 return i, nil
     251             :         }
     252           1 :         if !r.tableFormat.BlockColumnar() {
     253           1 :                 i, err := newRowBlockSingleLevelIterator(
     254           1 :                         context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
     255           1 :                         nil, NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
     256           1 :                 if err != nil {
     257           0 :                         return nil, err
     258           0 :                 }
     259           1 :                 i.SetupForCompaction()
     260           1 :                 return i, nil
     261             :         }
     262           1 :         i, err := newColumnBlockSingleLevelIterator(
     263           1 :                 context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
     264           1 :                 nil, NeverUseFilterBlock, nil /* stats */, statsAccum, rp, bufferPool)
     265           1 :         if err != nil {
     266           0 :                 return nil, err
     267           0 :         }
     268           1 :         i.SetupForCompaction()
     269           1 :         return i, nil
     270             : }
     271             : 
     272             : // NewRawRangeDelIter returns an internal iterator for the contents of the
     273             : // range-del block for the table. Returns nil if the table does not contain
     274             : // any range deletions.
     275             : func (r *Reader) NewRawRangeDelIter(
     276             :         ctx context.Context, transforms FragmentIterTransforms,
     277           1 : ) (iter keyspan.FragmentIterator, err error) {
     278           1 :         if r.rangeDelBH.Length == 0 {
     279           1 :                 return nil, nil
     280           1 :         }
     281             :         // TODO(radu): plumb stats here.
     282           1 :         h, err := r.readRangeDelBlock(ctx, noEnv, noReadHandle, r.rangeDelBH)
     283           1 :         if err != nil {
     284           1 :                 return nil, err
     285           1 :         }
     286           1 :         if r.tableFormat.BlockColumnar() {
     287           1 :                 iter = colblk.NewKeyspanIter(r.Compare, h, transforms)
     288           1 :         } else {
     289           1 :                 iter, err = rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Comparer, h, transforms)
     290           1 :                 if err != nil {
     291           0 :                         return nil, err
     292           0 :                 }
     293             :         }
     294           1 :         return keyspan.MaybeAssert(iter, r.Compare), nil
     295             : }
     296             : 
     297             : // NewRawRangeKeyIter returns an internal iterator for the contents of the
     298             : // range-key block for the table. Returns nil if the table does not contain any
     299             : // range keys.
     300             : func (r *Reader) NewRawRangeKeyIter(
     301             :         ctx context.Context, transforms FragmentIterTransforms,
     302           1 : ) (iter keyspan.FragmentIterator, err error) {
     303           1 :         if r.rangeKeyBH.Length == 0 {
     304           1 :                 return nil, nil
     305           1 :         }
     306             :         // TODO(radu): plumb stats here.
     307           1 :         h, err := r.readRangeKeyBlock(ctx, noEnv, noReadHandle, r.rangeKeyBH)
     308           1 :         if err != nil {
     309           1 :                 return nil, err
     310           1 :         }
     311           1 :         if r.tableFormat.BlockColumnar() {
     312           1 :                 iter = colblk.NewKeyspanIter(r.Compare, h, transforms)
     313           1 :         } else {
     314           1 :                 iter, err = rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Comparer, h, transforms)
     315           1 :                 if err != nil {
     316           0 :                         return nil, err
     317           0 :                 }
     318             :         }
     319           1 :         return keyspan.MaybeAssert(iter, r.Compare), nil
     320             : }
     321             : 
     322             : // readBlockEnv contains arguments used when reading a block which apply to all
     323             : // the block reads performed by a higher-level operation.
     324             : type readBlockEnv struct {
     325             :         // Stats and IterStats are slightly different. Stats is a shared struct
     326             :         // supplied from the outside, and represents stats for the whole iterator
     327             :         // tree and can be reset from the outside (e.g. when the pebble.Iterator is
     328             :         // being reused). It is currently only provided when the iterator tree is
     329             :         // rooted at pebble.Iterator. IterStats contains an sstable iterator's
     330             :         // private stats that are reported to a CategoryStatsCollector when this
     331             :         // iterator is closed. In the important code paths, the CategoryStatsCollector
     332             :         // is managed by the fileCacheContainer.
     333             :         Stats     *base.InternalIteratorStats
     334             :         IterStats *iterStatsAccumulator
     335             : 
     336             :         // BufferPool is non-nil if we read blocks into a buffer pool and not into the
     337             :         // cache. This is used during compactions.
     338             :         BufferPool *block.BufferPool
     339             : }
     340             : 
     341             : // BlockServedFromCache updates the stats when a block was found in the cache.
     342           1 : func (env *readBlockEnv) BlockServedFromCache(blockLength uint64) {
     343           1 :         if env.Stats != nil {
     344           1 :                 env.Stats.BlockBytes += blockLength
     345           1 :                 env.Stats.BlockBytesInCache += blockLength
     346           1 :         }
     347           1 :         if env.IterStats != nil {
     348           1 :                 env.IterStats.reportStats(blockLength, blockLength, 0)
     349           1 :         }
     350             : }
     351             : 
     352             : // BlockRead updates the stats when a block had to be read.
     353           1 : func (env *readBlockEnv) BlockRead(blockLength uint64, readDuration time.Duration) {
     354           1 :         if env.Stats != nil {
     355           1 :                 env.Stats.BlockBytes += blockLength
     356           1 :                 env.Stats.BlockReadDuration += readDuration
     357           1 :         }
     358           1 :         if env.IterStats != nil {
     359           1 :                 env.IterStats.reportStats(blockLength, 0, readDuration)
     360           1 :         }
     361             : }
     362             : 
     363             : // noEnv is the empty readBlockEnv which reports no stats and does not use a
     364             : // buffer pool.
     365             : var noEnv = readBlockEnv{}
     366             : 
     367             : // noReadHandle is used when we don't want to pass a ReadHandle to one of the
     368             : // read block methods.
     369             : var noReadHandle objstorage.ReadHandle = nil
     370             : 
     371           1 : var noInitBlockMetadataFn = func(*block.Metadata, []byte) error { return nil }
     372             : 
     373             : // readMetaindexBlock reads the metaindex block.
     374             : func (r *Reader) readMetaindexBlock(
     375             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle,
     376           1 : ) (block.BufferHandle, error) {
     377           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     378           1 :         return r.readBlockInternal(ctx, env, readHandle, r.metaindexBH, noInitBlockMetadataFn)
     379           1 : }
     380             : 
     381             : // readTopLevelIndexBlock reads the top-level index block.
     382             : func (r *Reader) readTopLevelIndexBlock(
     383             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle,
     384           1 : ) (block.BufferHandle, error) {
     385           1 :         return r.readIndexBlock(ctx, env, readHandle, r.indexBH)
     386           1 : }
     387             : 
     388             : // readIndexBlock reads a top-level or second-level index block.
     389             : func (r *Reader) readIndexBlock(
     390             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     391           1 : ) (block.BufferHandle, error) {
     392           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     393           1 :         return r.readBlockInternal(ctx, env, readHandle, bh, r.initIndexBlockMetadata)
     394           1 : }
     395             : 
     396             : // initIndexBlockMetadata initializes the Metadata for an index block. This will
     397             : // later be used (and reused) when reading from the block.
     398           1 : func (r *Reader) initIndexBlockMetadata(metadata *block.Metadata, data []byte) error {
     399           1 :         if r.tableFormat.BlockColumnar() {
     400           1 :                 return colblk.InitIndexBlockMetadata(metadata, data)
     401           1 :         }
     402           1 :         return nil
     403             : }
     404             : 
     405             : func (r *Reader) readDataBlock(
     406             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     407           1 : ) (block.BufferHandle, error) {
     408           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.DataBlock)
     409           1 :         return r.readBlockInternal(ctx, env, readHandle, bh, r.initDataBlockMetadata)
     410           1 : }
     411             : 
     412             : // initDataBlockMetadata initializes the Metadata for a data block. This will
     413             : // later be used (and reused) when reading from the block.
     414           1 : func (r *Reader) initDataBlockMetadata(metadata *block.Metadata, data []byte) error {
     415           1 :         if r.tableFormat.BlockColumnar() {
     416           1 :                 return colblk.InitDataBlockMetadata(r.keySchema, metadata, data)
     417           1 :         }
     418           1 :         return nil
     419             : }
     420             : 
     421             : func (r *Reader) readFilterBlock(
     422             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     423           1 : ) (block.BufferHandle, error) {
     424           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
     425           1 :         return r.readBlockInternal(ctx, env, readHandle, bh, noInitBlockMetadataFn)
     426           1 : }
     427             : 
     428             : func (r *Reader) readRangeDelBlock(
     429             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     430           1 : ) (block.BufferHandle, error) {
     431           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     432           1 :         return r.readBlockInternal(ctx, env, readHandle, bh, r.initKeyspanBlockMetadata)
     433           1 : }
     434             : 
     435             : func (r *Reader) readRangeKeyBlock(
     436             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     437           1 : ) (block.BufferHandle, error) {
     438           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     439           1 :         return r.readBlockInternal(ctx, env, readHandle, bh, r.initKeyspanBlockMetadata)
     440           1 : }
     441             : 
     442             : // initKeyspanBlockMetadata initializes the Metadata for a rangedel or range key
     443             : // block. This will later be used (and reused) when reading from the block.
     444           1 : func (r *Reader) initKeyspanBlockMetadata(metadata *block.Metadata, data []byte) error {
     445           1 :         if r.tableFormat.BlockColumnar() {
     446           1 :                 return colblk.InitKeyspanBlockMetadata(metadata, data)
     447           1 :         }
     448           1 :         return nil
     449             : }
     450             : 
     451             : // ReadValueBlockExternal implements valblk.ExternalBlockReader, allowing a
     452             : // base.LazyValue to read a value block.
     453             : func (r *Reader) ReadValueBlockExternal(
     454             :         ctx context.Context, bh block.Handle,
     455           1 : ) (block.BufferHandle, error) {
     456           1 :         return r.readValueBlock(ctx, noEnv, noReadHandle, bh)
     457           1 : }
     458             : 
     459             : func (r *Reader) readValueBlock(
     460             :         ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
     461           1 : ) (block.BufferHandle, error) {
     462           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.ValueBlock)
     463           1 :         return r.readBlockInternal(ctx, env, readHandle, bh, noInitBlockMetadataFn)
     464           1 : }
     465             : 
     466             : func checkChecksum(
     467             :         checksumType block.ChecksumType, b []byte, bh block.Handle, fileNum base.DiskFileNum,
     468           1 : ) error {
     469           1 :         expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
     470           1 :         var computedChecksum uint32
     471           1 :         switch checksumType {
     472           1 :         case block.ChecksumTypeCRC32c:
     473           1 :                 computedChecksum = crc.New(b[:bh.Length+1]).Value()
     474           1 :         case block.ChecksumTypeXXHash64:
     475           1 :                 computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
     476           0 :         default:
     477           0 :                 return errors.Errorf("unsupported checksum type: %d", checksumType)
     478             :         }
     479             : 
     480           1 :         if expectedChecksum != computedChecksum {
     481           1 :                 return base.CorruptionErrorf(
     482           1 :                         "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
     483           1 :                         fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
     484           1 :         }
     485           1 :         return nil
     486             : }
     487             : 
     488             : // DeterministicReadBlockDurationForTesting is for tests that want a
     489             : // deterministic value of the time to read a block (that is not in the cache).
     490             : // The return value is a function that must be called before the test exits.
     491           1 : func DeterministicReadBlockDurationForTesting() func() {
     492           1 :         drbdForTesting := deterministicReadBlockDurationForTesting
     493           1 :         deterministicReadBlockDurationForTesting = true
     494           1 :         return func() {
     495           1 :                 deterministicReadBlockDurationForTesting = drbdForTesting
     496           1 :         }
     497             : }
     498             : 
     499             : var deterministicReadBlockDurationForTesting = false
     500             : 
     501             : // readBlockInternal should not be used directly; one of the read*Block methods
     502             : // should be used instead.
     503             : func (r *Reader) readBlockInternal(
     504             :         ctx context.Context,
     505             :         env readBlockEnv,
     506             :         readHandle objstorage.ReadHandle,
     507             :         bh block.Handle,
     508             :         initBlockMetadataFn func(*block.Metadata, []byte) error,
     509           1 : ) (handle block.BufferHandle, _ error) {
     510           1 :         var ch cache.Handle
     511           1 :         var crh cache.ReadHandle
     512           1 :         hit := true
     513           1 :         if env.BufferPool == nil {
     514           1 :                 var errorDuration time.Duration
     515           1 :                 var err error
     516           1 :                 ch, crh, errorDuration, hit, err = r.cacheOpts.Cache.GetWithReadHandle(
     517           1 :                         ctx, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
     518           1 :                 if errorDuration > 5*time.Millisecond && r.logger.IsTracingEnabled(ctx) {
     519           0 :                         r.logger.Eventf(
     520           0 :                                 ctx, "waited for turn when %s time wasted by failed reads", errorDuration.String())
     521           0 :                 }
     522             :                 // TODO(sumeer): consider tracing when waited longer than some duration
     523             :                 // for turn to do the read.
     524           1 :                 if err != nil {
     525           0 :                         return block.BufferHandle{}, err
     526           0 :                 }
     527           1 :         } else {
     528           1 :                 // The compaction path uses env.BufferPool, and does not coordinate read
     529           1 :                 // using a cache.ReadHandle. This is ok since only a single compaction is
     530           1 :                 // reading a block.
     531           1 :                 ch = r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
     532           1 :                 if ch.Valid() {
     533           1 :                         hit = true
     534           1 :                 }
     535             :         }
     536             :         // INVARIANT: hit => ch.Valid()
     537           1 :         if ch.Valid() {
     538           1 :                 if hit {
     539           1 :                         // Cache hit.
     540           1 :                         if readHandle != nil {
     541           1 :                                 readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
     542           1 :                         }
     543           1 :                         env.BlockServedFromCache(bh.Length)
     544             :                 }
     545           1 :                 if invariants.Enabled && crh.Valid() {
     546           0 :                         panic("cache.ReadHandle must not be valid")
     547             :                 }
     548           1 :                 return block.CacheBufferHandle(ch), nil
     549             :         }
     550             : 
     551             :         // Need to read. First acquire loadBlockSema, if needed.
     552           1 :         if sema := r.loadBlockSema; sema != nil {
     553           1 :                 if err := sema.Acquire(ctx, 1); err != nil {
     554           0 :                         // An error here can only come from the context.
     555           0 :                         return block.BufferHandle{}, err
     556           0 :                 }
     557           1 :                 defer sema.Release(1)
     558             :         }
     559           1 :         value, err := r.doRead(ctx, env, readHandle, bh, initBlockMetadataFn)
     560           1 :         if err != nil {
     561           1 :                 if crh.Valid() {
     562           1 :                         crh.SetReadError(err)
     563           1 :                 }
     564           1 :                 return block.BufferHandle{}, err
     565             :         }
     566           1 :         h := value.MakeHandle(crh, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
     567           1 :         return h, nil
     568             : }
     569             : 
     570             : // doRead is a helper for readBlockInternal that does the read, checksum
     571             : // check, decompression, and returns either a block.Value or an error.
     572             : func (r *Reader) doRead(
     573             :         ctx context.Context,
     574             :         env readBlockEnv,
     575             :         readHandle objstorage.ReadHandle,
     576             :         bh block.Handle,
     577             :         initBlockMetadataFn func(*block.Metadata, []byte) error,
     578           1 : ) (block.Value, error) {
     579           1 :         compressed := block.Alloc(int(bh.Length+block.TrailerLen), env.BufferPool)
     580           1 :         readStopwatch := makeStopwatch()
     581           1 :         var err error
     582           1 :         if readHandle != nil {
     583           1 :                 err = readHandle.ReadAt(ctx, compressed.BlockData(), int64(bh.Offset))
     584           1 :         } else {
     585           1 :                 err = r.readable.ReadAt(ctx, compressed.BlockData(), int64(bh.Offset))
     586           1 :         }
     587           1 :         readDuration := readStopwatch.stop()
     588           1 :         // Call IsTracingEnabled to avoid the allocations of boxing integers into an
     589           1 :         // interface{}, unless necessary.
     590           1 :         if readDuration >= slowReadTracingThreshold && r.logger.IsTracingEnabled(ctx) {
     591           1 :                 _, file1, line1, _ := runtime.Caller(1)
     592           1 :                 _, file2, line2, _ := runtime.Caller(2)
     593           1 :                 r.logger.Eventf(ctx, "reading block of %d bytes took %s (fileNum=%s; %s/%s:%d -> %s/%s:%d)",
     594           1 :                         int(bh.Length+block.TrailerLen), readDuration.String(),
     595           1 :                         r.cacheOpts.FileNum,
     596           1 :                         filepath.Base(filepath.Dir(file2)), filepath.Base(file2), line2,
     597           1 :                         filepath.Base(filepath.Dir(file1)), filepath.Base(file1), line1)
     598           1 :         }
     599           1 :         if err != nil {
     600           1 :                 compressed.Release()
     601           1 :                 return block.Value{}, err
     602           1 :         }
     603           1 :         env.BlockRead(bh.Length, readDuration)
     604           1 :         if err = checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil {
     605           1 :                 compressed.Release()
     606           1 :                 return block.Value{}, err
     607           1 :         }
     608           1 :         typ := block.CompressionIndicator(compressed.BlockData()[bh.Length])
     609           1 :         compressed.Truncate(int(bh.Length))
     610           1 :         var decompressed block.Value
     611           1 :         if typ == block.NoCompressionIndicator {
     612           1 :                 decompressed = compressed
     613           1 :         } else {
     614           1 :                 // Decode the length of the decompressed value.
     615           1 :                 decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.BlockData())
     616           1 :                 if err != nil {
     617           0 :                         compressed.Release()
     618           0 :                         return block.Value{}, err
     619           0 :                 }
     620           1 :                 decompressed = block.Alloc(decodedLen, env.BufferPool)
     621           1 :                 err = block.DecompressInto(typ, compressed.BlockData()[prefixLen:], decompressed.BlockData())
     622           1 :                 compressed.Release()
     623           1 :                 if err != nil {
     624           0 :                         decompressed.Release()
     625           0 :                         return block.Value{}, err
     626           0 :                 }
     627             :         }
     628           1 :         if err = initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil {
     629           0 :                 decompressed.Release()
     630           0 :                 return block.Value{}, err
     631           0 :         }
     632           1 :         return decompressed, nil
     633             : }
     634             : 
     635             : func (r *Reader) readMetaindex(
     636             :         ctx context.Context, readHandle objstorage.ReadHandle, filters map[string]FilterPolicy,
     637           1 : ) error {
     638           1 :         // We use a BufferPool when reading metaindex blocks in order to avoid
     639           1 :         // populating the block cache with these blocks. In heavy-write workloads,
     640           1 :         // especially with high compaction concurrency, new tables may be created
     641           1 :         // frequently. Populating the block cache with these metaindex blocks adds
     642           1 :         // additional contention on the block cache mutexes (see #1997).
     643           1 :         // Additionally, these blocks are exceedingly unlikely to be read again
     644           1 :         // while they're still in the block cache except in misconfigurations with
     645           1 :         // excessive sstables counts or a file cache that's far too small.
     646           1 :         r.metaBufferPool.InitPreallocated(r.metaBufferPoolAlloc[:0])
     647           1 :         // When we're finished, release the buffers we've allocated back to memory
     648           1 :         // allocator. We don't expect to use metaBufferPool again.
     649           1 :         defer r.metaBufferPool.Release()
     650           1 :         metaEnv := readBlockEnv{
     651           1 :                 BufferPool: &r.metaBufferPool,
     652           1 :         }
     653           1 : 
     654           1 :         b, err := r.readMetaindexBlock(ctx, metaEnv, readHandle)
     655           1 :         if err != nil {
     656           1 :                 return err
     657           1 :         }
     658           1 :         data := b.BlockData()
     659           1 :         defer b.Release()
     660           1 : 
     661           1 :         if uint64(len(data)) != r.metaindexBH.Length {
     662           0 :                 return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
     663           0 :                         errors.Safe(len(data)), errors.Safe(r.metaindexBH.Length))
     664           0 :         }
     665             : 
     666           1 :         var meta map[string]block.Handle
     667           1 :         meta, r.valueBIH, err = decodeMetaindex(data)
     668           1 :         if err != nil {
     669           0 :                 return err
     670           0 :         }
     671             : 
     672           1 :         if bh, ok := meta[metaPropertiesName]; ok {
     673           1 :                 b, err = r.readBlockInternal(ctx, metaEnv, readHandle, bh, noInitBlockMetadataFn)
     674           1 :                 if err != nil {
     675           1 :                         return err
     676           1 :                 }
     677           1 :                 r.propertiesBH = bh
     678           1 :                 err := r.Properties.load(b.BlockData(), r.deniedUserProperties)
     679           1 :                 b.Release()
     680           1 :                 if err != nil {
     681           0 :                         return err
     682           0 :                 }
     683             :         }
     684             : 
     685           1 :         if bh, ok := meta[metaRangeDelV2Name]; ok {
     686           1 :                 r.rangeDelBH = bh
     687           1 :         } else if _, ok := meta[metaRangeDelV1Name]; ok {
     688           0 :                 // This version of Pebble requires a format major version at least as
     689           0 :                 // high as FormatFlushableIngest (see pebble.FormatMinSupported). In
     690           0 :                 // this format major verison, we have a guarantee that we've compacted
     691           0 :                 // away all RocksDB sstables. It should not be possible to encounter an
     692           0 :                 // sstable with a v1 range deletion block but not a v2 range deletion
     693           0 :                 // block.
     694           0 :                 err := errors.Newf("pebble/table: unexpected range-del block type: %s", metaRangeDelV1Name)
     695           0 :                 return errors.Mark(err, base.ErrCorruption)
     696           0 :         }
     697             : 
     698           1 :         if bh, ok := meta[metaRangeKeyName]; ok {
     699           1 :                 r.rangeKeyBH = bh
     700           1 :         }
     701             : 
     702           1 :         for name, fp := range filters {
     703           1 :                 if bh, ok := meta["fullfilter."+name]; ok {
     704           1 :                         r.filterBH = bh
     705           1 :                         r.tableFilter = newTableFilterReader(fp, r.filterMetricsTracker)
     706           1 :                         break
     707             :                 }
     708             :         }
     709           1 :         return nil
     710             : }
     711             : 
     712             : // Layout returns the layout (block organization) for an sstable.
     713           1 : func (r *Reader) Layout() (*Layout, error) {
     714           1 :         if r.err != nil {
     715           0 :                 return nil, r.err
     716           0 :         }
     717             : 
     718           1 :         l := &Layout{
     719           1 :                 Data:       make([]block.HandleWithProperties, 0, r.Properties.NumDataBlocks),
     720           1 :                 RangeDel:   r.rangeDelBH,
     721           1 :                 RangeKey:   r.rangeKeyBH,
     722           1 :                 ValueIndex: r.valueBIH.Handle,
     723           1 :                 Properties: r.propertiesBH,
     724           1 :                 MetaIndex:  r.metaindexBH,
     725           1 :                 Footer:     r.footerBH,
     726           1 :                 Format:     r.tableFormat,
     727           1 :         }
     728           1 :         if r.filterBH.Length > 0 {
     729           1 :                 l.Filter = []NamedBlockHandle{{Name: "fullfilter." + r.tableFilter.policy.Name(), Handle: r.filterBH}}
     730           1 :         }
     731           1 :         ctx := context.TODO()
     732           1 : 
     733           1 :         indexH, err := r.readTopLevelIndexBlock(ctx, noEnv, noReadHandle)
     734           1 :         if err != nil {
     735           1 :                 return nil, err
     736           1 :         }
     737           1 :         defer indexH.Release()
     738           1 : 
     739           1 :         var alloc bytealloc.A
     740           1 : 
     741           1 :         if r.Properties.IndexPartitions == 0 {
     742           1 :                 l.Index = append(l.Index, r.indexBH)
     743           1 :                 iter := r.tableFormat.newIndexIter()
     744           1 :                 err := iter.Init(r.Comparer, indexH.BlockData(), NoTransforms)
     745           1 :                 if err != nil {
     746           0 :                         return nil, errors.Wrap(err, "reading index block")
     747           0 :                 }
     748           1 :                 for valid := iter.First(); valid; valid = iter.Next() {
     749           1 :                         dataBH, err := iter.BlockHandleWithProperties()
     750           1 :                         if err != nil {
     751           0 :                                 return nil, errCorruptIndexEntry(err)
     752           0 :                         }
     753           1 :                         if len(dataBH.Props) > 0 {
     754           1 :                                 alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     755           1 :                         }
     756           1 :                         l.Data = append(l.Data, dataBH)
     757             :                 }
     758           1 :         } else {
     759           1 :                 l.TopIndex = r.indexBH
     760           1 :                 topIter := r.tableFormat.newIndexIter()
     761           1 :                 err := topIter.Init(r.Comparer, indexH.BlockData(), NoTransforms)
     762           1 :                 if err != nil {
     763           0 :                         return nil, errors.Wrap(err, "reading index block")
     764           0 :                 }
     765           1 :                 iter := r.tableFormat.newIndexIter()
     766           1 :                 for valid := topIter.First(); valid; valid = topIter.Next() {
     767           1 :                         indexBH, err := topIter.BlockHandleWithProperties()
     768           1 :                         if err != nil {
     769           0 :                                 return nil, errCorruptIndexEntry(err)
     770           0 :                         }
     771           1 :                         l.Index = append(l.Index, indexBH.Handle)
     772           1 : 
     773           1 :                         subIndex, err := r.readIndexBlock(ctx, noEnv, noReadHandle, indexBH.Handle)
     774           1 :                         if err != nil {
     775           1 :                                 return nil, err
     776           1 :                         }
     777           1 :                         err = func() error {
     778           1 :                                 defer subIndex.Release()
     779           1 :                                 // TODO(msbutler): figure out how to pass virtualState to layout call.
     780           1 :                                 if err := iter.Init(r.Comparer, subIndex.BlockData(), NoTransforms); err != nil {
     781           0 :                                         return err
     782           0 :                                 }
     783           1 :                                 for valid := iter.First(); valid; valid = iter.Next() {
     784           1 :                                         dataBH, err := iter.BlockHandleWithProperties()
     785           1 :                                         if err != nil {
     786           0 :                                                 return errCorruptIndexEntry(err)
     787           0 :                                         }
     788           1 :                                         if len(dataBH.Props) > 0 {
     789           1 :                                                 alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     790           1 :                                         }
     791           1 :                                         l.Data = append(l.Data, dataBH)
     792             :                                 }
     793           1 :                                 return nil
     794             :                         }()
     795           1 :                         if err != nil {
     796           0 :                                 return nil, err
     797           0 :                         }
     798             :                 }
     799             :         }
     800           1 :         if r.valueBIH.Handle.Length != 0 {
     801           1 :                 vbiH, err := r.readValueBlock(context.Background(), noEnv, noReadHandle, r.valueBIH.Handle)
     802           1 :                 if err != nil {
     803           0 :                         return nil, err
     804           0 :                 }
     805           1 :                 defer vbiH.Release()
     806           1 :                 l.ValueBlock, err = valblk.DecodeIndex(vbiH.BlockData(), r.valueBIH)
     807           1 :                 if err != nil {
     808           0 :                         return nil, err
     809           0 :                 }
     810             :         }
     811             : 
     812           1 :         return l, nil
     813             : }
     814             : 
     815             : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
     816           1 : func (r *Reader) ValidateBlockChecksums() error {
     817           1 :         // Pre-compute the BlockHandles for the underlying file.
     818           1 :         l, err := r.Layout()
     819           1 :         if err != nil {
     820           1 :                 return err
     821           1 :         }
     822             : 
     823           1 :         type blk struct {
     824           1 :                 bh     block.Handle
     825           1 :                 readFn func(context.Context, readBlockEnv, objstorage.ReadHandle, block.Handle) (block.BufferHandle, error)
     826           1 :         }
     827           1 :         // Construct the set of blocks to check. Note that the footer is not checked
     828           1 :         // as it is not a block with a checksum.
     829           1 :         blocks := make([]blk, 0, len(l.Data)+6)
     830           1 :         for i := range l.Data {
     831           1 :                 blocks = append(blocks, blk{
     832           1 :                         bh:     l.Data[i].Handle,
     833           1 :                         readFn: r.readDataBlock,
     834           1 :                 })
     835           1 :         }
     836           1 :         for _, h := range l.Index {
     837           1 :                 blocks = append(blocks, blk{
     838           1 :                         bh:     h,
     839           1 :                         readFn: r.readIndexBlock,
     840           1 :                 })
     841           1 :         }
     842           1 :         blocks = append(blocks, blk{
     843           1 :                 bh:     l.TopIndex,
     844           1 :                 readFn: r.readIndexBlock,
     845           1 :         })
     846           1 :         for _, bh := range l.Filter {
     847           1 :                 blocks = append(blocks, blk{
     848           1 :                         bh:     bh.Handle,
     849           1 :                         readFn: r.readFilterBlock,
     850           1 :                 })
     851           1 :         }
     852           1 :         blocks = append(blocks, blk{
     853           1 :                 bh:     l.RangeDel,
     854           1 :                 readFn: r.readRangeDelBlock,
     855           1 :         })
     856           1 :         blocks = append(blocks, blk{
     857           1 :                 bh:     l.RangeKey,
     858           1 :                 readFn: r.readRangeKeyBlock,
     859           1 :         })
     860           1 :         readNoInit := func(ctx context.Context, env readBlockEnv, rh objstorage.ReadHandle, bh block.Handle) (block.BufferHandle, error) {
     861           1 :                 return r.readBlockInternal(ctx, env, rh, bh, noInitBlockMetadataFn)
     862           1 :         }
     863           1 :         blocks = append(blocks, blk{
     864           1 :                 bh:     l.Properties,
     865           1 :                 readFn: readNoInit,
     866           1 :         })
     867           1 :         blocks = append(blocks, blk{
     868           1 :                 bh:     l.MetaIndex,
     869           1 :                 readFn: readNoInit,
     870           1 :         })
     871           1 : 
     872           1 :         // Sorting by offset ensures we are performing a sequential scan of the
     873           1 :         // file.
     874           1 :         slices.SortFunc(blocks, func(a, b blk) int {
     875           1 :                 return cmp.Compare(a.bh.Offset, b.bh.Offset)
     876           1 :         })
     877             : 
     878           1 :         ctx := context.Background()
     879           1 :         for _, b := range blocks {
     880           1 :                 // Certain blocks may not be present, in which case we skip them.
     881           1 :                 if b.bh.Length == 0 {
     882           1 :                         continue
     883             :                 }
     884           1 :                 h, err := b.readFn(ctx, noEnv, noReadHandle, b.bh)
     885           1 :                 if err != nil {
     886           1 :                         return err
     887           1 :                 }
     888           1 :                 h.Release()
     889             :         }
     890             : 
     891           1 :         return nil
     892             : }
     893             : 
// CommonProperties implements the CommonReader interface. The returned pointer
// refers to the CommonProperties held inside r.Properties; it is not a copy.
func (r *Reader) CommonProperties() *CommonProperties {
	return &r.Properties.CommonProperties
}
     898             : 
     899             : // EstimateDiskUsage returns the total size of data blocks overlapping the range
     900             : // `[start, end]`. Even if a data block partially overlaps, or we cannot
     901             : // determine overlap due to abbreviated index keys, the full data block size is
     902             : // included in the estimation.
     903             : //
     904             : // This function does not account for any metablock space usage. Assumes there
     905             : // is at least partial overlap, i.e., `[start, end]` falls neither completely
     906             : // before nor completely after the file's range.
     907             : //
     908             : // Only blocks containing point keys are considered. Range deletion and range
     909             : // key blocks are not considered.
     910             : //
     911             : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
     912             : // data blocks overlapped and add that same fraction of the metadata blocks to the
     913             : // estimate.
     914           1 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
     915           1 :         if !r.tableFormat.BlockColumnar() {
     916           1 :                 return estimateDiskUsage[rowblk.IndexIter, *rowblk.IndexIter](r, start, end)
     917           1 :         }
     918           1 :         return estimateDiskUsage[colblk.IndexIter, *colblk.IndexIter](r, start, end)
     919             : }
     920             : 
     921             : func estimateDiskUsage[I any, PI indexBlockIterator[I]](
     922             :         r *Reader, start, end []byte,
     923           1 : ) (uint64, error) {
     924           1 :         if r.err != nil {
     925           0 :                 return 0, r.err
     926           0 :         }
     927           1 :         ctx := context.TODO()
     928           1 : 
     929           1 :         indexH, err := r.readTopLevelIndexBlock(ctx, noEnv, noReadHandle)
     930           1 :         if err != nil {
     931           1 :                 return 0, err
     932           1 :         }
     933             :         // We are using InitHandle below but we never Close those iterators, which
     934             :         // allows us to release the index handle ourselves.
     935             :         // TODO(radu): clean this up.
     936           1 :         defer indexH.Release()
     937           1 : 
     938           1 :         // Iterators over the bottom-level index blocks containing start and end.
     939           1 :         // These may be different in case of partitioned index but will both point
     940           1 :         // to the same blockIter over the single index in the unpartitioned case.
     941           1 :         var startIdxIter, endIdxIter PI
     942           1 :         if r.Properties.IndexPartitions == 0 {
     943           1 :                 startIdxIter = new(I)
     944           1 :                 if err := startIdxIter.InitHandle(r.Comparer, indexH, NoTransforms); err != nil {
     945           0 :                         return 0, err
     946           0 :                 }
     947           1 :                 endIdxIter = startIdxIter
     948           1 :         } else {
     949           1 :                 var topIter PI = new(I)
     950           1 :                 if err := topIter.InitHandle(r.Comparer, indexH, NoTransforms); err != nil {
     951           0 :                         return 0, err
     952           0 :                 }
     953           1 :                 if !topIter.SeekGE(start) {
     954           0 :                         // The range falls completely after this file.
     955           0 :                         return 0, nil
     956           0 :                 }
     957           1 :                 startIndexBH, err := topIter.BlockHandleWithProperties()
     958           1 :                 if err != nil {
     959           0 :                         return 0, errCorruptIndexEntry(err)
     960           0 :                 }
     961           1 :                 startIdxBlock, err := r.readIndexBlock(ctx, noEnv, noReadHandle, startIndexBH.Handle)
     962           1 :                 if err != nil {
     963           1 :                         return 0, err
     964           1 :                 }
     965           1 :                 defer startIdxBlock.Release()
     966           1 :                 startIdxIter = new(I)
     967           1 :                 err = startIdxIter.InitHandle(r.Comparer, startIdxBlock, NoTransforms)
     968           1 :                 if err != nil {
     969           0 :                         return 0, err
     970           0 :                 }
     971             : 
     972           1 :                 if topIter.SeekGE(end) {
     973           1 :                         endIndexBH, err := topIter.BlockHandleWithProperties()
     974           1 :                         if err != nil {
     975           0 :                                 return 0, errCorruptIndexEntry(err)
     976           0 :                         }
     977           1 :                         endIdxBlock, err := r.readIndexBlock(ctx, noEnv, noReadHandle, endIndexBH.Handle)
     978           1 :                         if err != nil {
     979           1 :                                 return 0, err
     980           1 :                         }
     981           1 :                         defer endIdxBlock.Release()
     982           1 :                         endIdxIter = new(I)
     983           1 :                         err = endIdxIter.InitHandle(r.Comparer, endIdxBlock, NoTransforms)
     984           1 :                         if err != nil {
     985           0 :                                 return 0, err
     986           0 :                         }
     987             :                 }
     988             :         }
     989             :         // startIdxIter should not be nil at this point, while endIdxIter can be if the
     990             :         // range spans past the end of the file.
     991             : 
     992           1 :         if !startIdxIter.SeekGE(start) {
     993           1 :                 // The range falls completely after this file.
     994           1 :                 return 0, nil
     995           1 :         }
     996           1 :         startBH, err := startIdxIter.BlockHandleWithProperties()
     997           1 :         if err != nil {
     998           0 :                 return 0, errCorruptIndexEntry(err)
     999           0 :         }
    1000             : 
    1001           1 :         includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
    1002           1 :                 // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
    1003           1 :                 // Linearly interpolate what is stored in value blocks.
    1004           1 :                 //
    1005           1 :                 // TODO(sumeer): if we need more accuracy, without loading any data blocks
    1006           1 :                 // (which contain the value handles, and which may also be insufficient if
    1007           1 :                 // the values are in separate files), we will need to accumulate the
    1008           1 :                 // logical size of the key-value pairs and store the cumulative value for
    1009           1 :                 // each data block in the index block entry. This increases the size of
    1010           1 :                 // the BlockHandle, so wait until this becomes necessary.
    1011           1 :                 return dataBlockSize +
    1012           1 :                         uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
    1013           1 :                                 float64(r.Properties.ValueBlocksSize))
    1014           1 :         }
    1015           1 :         if endIdxIter == nil {
    1016           0 :                 // The range spans beyond this file. Include data blocks through the last.
    1017           0 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
    1018           0 :         }
    1019           1 :         if !endIdxIter.SeekGE(end) {
    1020           1 :                 // The range spans beyond this file. Include data blocks through the last.
    1021           1 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
    1022           1 :         }
    1023           1 :         endBH, err := endIdxIter.BlockHandleWithProperties()
    1024           1 :         if err != nil {
    1025           0 :                 return 0, errCorruptIndexEntry(err)
    1026           0 :         }
    1027           1 :         return includeInterpolatedValueBlocksSize(
    1028           1 :                 endBH.Offset + endBH.Length + block.TrailerLen - startBH.Offset), nil
    1029             : }
    1030             : 
    1031             : // TableFormat returns the format version for the table.
    1032           1 : func (r *Reader) TableFormat() (TableFormat, error) {
    1033           1 :         if r.err != nil {
    1034           0 :                 return TableFormatUnspecified, r.err
    1035           0 :         }
    1036           1 :         return r.tableFormat, nil
    1037             : }
    1038             : 
    1039             : // NewReader returns a new table reader for the file. Closing the reader will
    1040             : // close the file.
    1041             : //
    1042             : // The context is used for tracing any operations performed by NewReader; it is
    1043             : // NOT stored for future use.
    1044           1 : func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Reader, error) {
    1045           1 :         if f == nil {
    1046           1 :                 return nil, errors.New("pebble/table: nil file")
    1047           1 :         }
    1048           1 :         o = o.ensureDefaults()
    1049           1 :         r := &Reader{
    1050           1 :                 readable:             f,
    1051           1 :                 cacheOpts:            o.internal.CacheOpts,
    1052           1 :                 loadBlockSema:        o.LoadBlockSema,
    1053           1 :                 deniedUserProperties: o.DeniedUserProperties,
    1054           1 :                 filterMetricsTracker: o.FilterMetricsTracker,
    1055           1 :                 logger:               o.LoggerAndTracer,
    1056           1 :         }
    1057           1 :         if r.cacheOpts.Cache == nil {
    1058           1 :                 r.cacheOpts.Cache = cache.New(0)
    1059           1 :         } else {
    1060           1 :                 r.cacheOpts.Cache.Ref()
    1061           1 :         }
    1062           1 :         if r.cacheOpts.CacheID == 0 {
    1063           1 :                 r.cacheOpts.CacheID = r.cacheOpts.Cache.NewID()
    1064           1 :         }
    1065             : 
    1066           1 :         var preallocRH objstorageprovider.PreallocatedReadHandle
    1067           1 :         rh := objstorageprovider.UsePreallocatedReadHandle(
    1068           1 :                 r.readable, objstorage.ReadBeforeForNewReader, &preallocRH)
    1069           1 :         defer rh.Close()
    1070           1 : 
    1071           1 :         footer, err := readFooter(ctx, f, rh, r.logger, r.cacheOpts.FileNum)
    1072           1 :         if err != nil {
    1073           1 :                 r.err = err
    1074           1 :                 return nil, r.Close()
    1075           1 :         }
    1076           1 :         r.checksumType = footer.checksum
    1077           1 :         r.tableFormat = footer.format
    1078           1 :         r.indexBH = footer.indexBH
    1079           1 :         r.metaindexBH = footer.metaindexBH
    1080           1 :         r.footerBH = footer.footerBH
    1081           1 :         // Read the metaindex and properties blocks.
    1082           1 :         if err := r.readMetaindex(ctx, rh, o.Filters); err != nil {
    1083           1 :                 r.err = err
    1084           1 :                 return nil, r.Close()
    1085           1 :         }
    1086             : 
    1087           1 :         if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
    1088           1 :                 r.Comparer = o.Comparer
    1089           1 :                 r.Compare = o.Comparer.Compare
    1090           1 :                 r.Equal = o.Comparer.Equal
    1091           1 :                 r.Split = o.Comparer.Split
    1092           1 :         } else if comparer, ok := o.Comparers[r.Properties.ComparerName]; ok {
    1093           1 :                 r.Comparer = o.Comparer
    1094           1 :                 r.Compare = comparer.Compare
    1095           1 :                 r.Equal = comparer.Equal
    1096           1 :                 r.Split = comparer.Split
    1097           1 :         } else {
    1098           1 :                 r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
    1099           1 :                         errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.ComparerName))
    1100           1 :         }
    1101             : 
    1102           1 :         if mergerName := r.Properties.MergerName; mergerName != "" && mergerName != "nullptr" {
    1103           1 :                 if o.Merger != nil && o.Merger.Name == mergerName {
    1104           1 :                         // opts.Merger matches.
    1105           1 :                 } else if _, ok := o.Mergers[mergerName]; ok {
    1106           1 :                         // Known merger.
    1107           1 :                 } else {
    1108           1 :                         r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
    1109           1 :                                 errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.MergerName))
    1110           1 :                 }
    1111             :         }
    1112             : 
    1113           1 :         if r.tableFormat.BlockColumnar() {
    1114           1 :                 if ks, ok := o.KeySchemas[r.Properties.KeySchemaName]; ok {
    1115           1 :                         r.keySchema = ks
    1116           1 :                 } else {
    1117           0 :                         var known []string
    1118           0 :                         for name := range o.KeySchemas {
    1119           0 :                                 known = append(known, fmt.Sprintf("%q", name))
    1120           0 :                         }
    1121           0 :                         slices.Sort(known)
    1122           0 : 
    1123           0 :                         r.err = errors.Newf("pebble/table: %d: unknown key schema %q; known key schemas: %s",
    1124           0 :                                 errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.KeySchemaName), errors.Safe(known))
    1125           0 :                         panic(r.err)
    1126             :                 }
    1127             :         }
    1128             : 
    1129           1 :         if r.err != nil {
    1130           1 :                 return nil, r.Close()
    1131           1 :         }
    1132             : 
    1133           1 :         return r, nil
    1134             : }
    1135             : 
    1136             : // ReadableFile describes the smallest subset of vfs.File that is required for
    1137             : // reading SSTs: positional reads, closing, and Stat. See NewSimpleReadable.
    1138             : type ReadableFile interface {
    1139             :         io.ReaderAt // positional reads; used by simpleReadable.ReadAt
    1140             :         io.Closer // invoked by simpleReadable.Close
    1141             :         Stat() (vfs.FileInfo, error) // NewSimpleReadable takes the file size from the result
    1142             : }
    1143             : 
    1144             : // NewSimpleReadable wraps a ReadableFile in a objstorage.Readable
    1145             : // implementation (which does not support read-ahead)
    1146           1 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
    1147           1 :         info, err := r.Stat()
    1148           1 :         if err != nil {
    1149           1 :                 return nil, err
    1150           1 :         }
    1151           1 :         res := &simpleReadable{
    1152           1 :                 f:    r,
    1153           1 :                 size: info.Size(),
    1154           1 :         }
    1155           1 :         res.rh = objstorage.MakeNoopReadHandle(res)
    1156           1 :         return res, nil
    1157             : }
    1158             : 
    1159             : // simpleReadable wraps a ReadableFile to implement objstorage.Readable.
    1160             : type simpleReadable struct {
    1161             :         f    ReadableFile // wrapped file; ReadAt and Close are forwarded to it
    1162             :         size int64 // file size reported by Stat in NewSimpleReadable; returned by Size
    1163             :         rh   objstorage.NoopReadHandle // handle whose address every NewReadHandle call returns
    1164             : }
    1165             : // Compile-time check that *simpleReadable satisfies objstorage.Readable.
    1166             : var _ objstorage.Readable = (*simpleReadable)(nil)
    1167             : 
    1168             : // ReadAt is part of the objstorage.Readable interface.
    1169           1 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
    1170           1 :         n, err := s.f.ReadAt(p, off)
    1171           1 :         if invariants.Enabled && err == nil && n != len(p) {
    1172           0 :                 panic("short read")
    1173             :         }
    1174           1 :         return err
    1175             : }
    1176             : 
    1177             : // Close is part of the objstorage.Readable interface.
    1178           1 : func (s *simpleReadable) Close() error {
    1179           1 :         return s.f.Close()
    1180           1 : }
    1181             : 
    1182             : // Size is part of the objstorage.Readable interface.
    1183           1 : func (s *simpleReadable) Size() int64 {
    1184           1 :         return s.size
    1185           1 : }
    1186             : 
    1187             : // NewReadHandle is part of the objstorage.Readable interface.
    1188             : func (s *simpleReadable) NewReadHandle(
    1189             :         readBeforeSize objstorage.ReadBeforeSize,
    1190           1 : ) objstorage.ReadHandle {
    1191           1 :         return &s.rh
    1192           1 : }
    1193             : 
    1194           0 : func errCorruptIndexEntry(err error) error {
    1195           0 :         err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
    1196           0 :         if invariants.Enabled {
    1197           0 :                 panic(err)
    1198             :         }
    1199           0 :         return err
    1200             : }
    1201             : 
    1202             : type deterministicStopwatchForTesting struct {
    1203             :         startTime crtime.Mono
    1204             : }
    1205             : 
    1206           1 : func makeStopwatch() deterministicStopwatchForTesting {
    1207           1 :         return deterministicStopwatchForTesting{startTime: crtime.NowMono()}
    1208           1 : }
    1209             : 
    1210           1 : func (w deterministicStopwatchForTesting) stop() time.Duration {
    1211           1 :         dur := w.startTime.Elapsed()
    1212           1 :         if deterministicReadBlockDurationForTesting {
    1213           1 :                 dur = slowReadTracingThreshold
    1214           1 :         }
    1215           1 :         return dur
    1216             : }
    1217             : 
    1218             : // MakeTrivialReaderProvider creates a valblk.ReaderProvider which always
    1219             : // returns the given reader. It should be used when the Reader will outlive the
    1220             : // iterator tree.
    1221           1 : func MakeTrivialReaderProvider(r *Reader) valblk.ReaderProvider {
    1222           1 :         return (*trivialReaderProvider)(r)
    1223           1 : }
    1224             : 
    1225             : // trivialReaderProvider implements valblk.ReaderProvider for a Reader that will
    1226             : // outlive the top-level iterator in the iterator tree.
    1227             : //
    1228             : // Defining the type in this manner (as opposed to a struct) avoids allocation: a *Reader is simply converted to a *trivialReaderProvider.
    1229             : type trivialReaderProvider Reader
    1230             : // Compile-time check that *trivialReaderProvider satisfies valblk.ReaderProvider.
    1231             : var _ valblk.ReaderProvider = (*trivialReaderProvider)(nil)
    1232             : 
    1233             : // GetReader implements ReaderProvider.
    1234             : func (trp *trivialReaderProvider) GetReader(
    1235             :         ctx context.Context,
    1236           1 : ) (valblk.ExternalBlockReader, error) {
    1237           1 :         return (*Reader)(trp), nil
    1238           1 : }
    1239             : 
    1240             : // Close implements ReaderProvider.
    1241           1 : func (trp *trivialReaderProvider) Close() {}

Generated by: LCOV version 1.14