LCOV - code coverage report
Current view: top level - pebble/sstable - reader.go (source / functions) Hit Total Coverage
Test: 2024-08-25 08:16Z 3d1f61aa - meta test only.lcov Lines: 496 685 72.4 %
Date: 2024-08-25 08:17:46 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "context"
      11             :         "encoding/binary"
      12             :         "io"
      13             :         "path/filepath"
      14             :         "runtime"
      15             :         "slices"
      16             :         "time"
      17             : 
      18             :         "github.com/cespare/xxhash/v2"
      19             :         "github.com/cockroachdb/errors"
      20             :         "github.com/cockroachdb/fifo"
      21             :         "github.com/cockroachdb/pebble/internal/base"
      22             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      23             :         "github.com/cockroachdb/pebble/internal/cache"
      24             :         "github.com/cockroachdb/pebble/internal/crc"
      25             :         "github.com/cockroachdb/pebble/internal/invariants"
      26             :         "github.com/cockroachdb/pebble/internal/keyspan"
      27             :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      28             :         "github.com/cockroachdb/pebble/objstorage"
      29             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      30             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      31             :         "github.com/cockroachdb/pebble/sstable/block"
      32             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      33             :         "github.com/cockroachdb/pebble/vfs"
      34             : )
      35             : 
// errReaderClosed is the sticky error installed on a Reader by Close so that
// any subsequent operation on the Reader fails.
var errReaderClosed = errors.New("pebble/table: reader is closed")

// loadBlockResult describes the outcome of an attempt to load a block.
type loadBlockResult int8

const (
	// loadBlockOK indicates the block was loaded successfully.
	loadBlockOK loadBlockResult = iota
	// Could be due to error or because no block left to load.
	loadBlockFailed
	// loadBlockIrrelevant indicates the block did not need to be loaded
	// (presumably skipped, e.g. by a filter — confirm against call sites).
	loadBlockIrrelevant
)

// blockTransform is an optional function applied to a block's contents after
// it has been read and decompressed (see readBlock).
type blockTransform func([]byte) ([]byte, error)
      48             : 
// Reader is a table reader.
type Reader struct {
	// readable provides access to the underlying object. It is set to nil by
	// Close.
	readable objstorage.Readable

	// The following fields are copied from the ReadOptions.
	cacheOpts            sstableinternal.CacheOptions
	loadBlockSema        *fifo.Semaphore
	deniedUserProperties map[string]struct{}
	filterMetricsTracker *FilterMetricsTracker
	logger               base.LoggerAndTracer

	// Comparer and the individual comparison functions used for keys in this
	// table.
	Comparer  *base.Comparer
	Compare   Compare
	SuffixCmp CompareSuffixes
	Equal     Equal
	Split     Split

	tableFilter *tableFilterReader

	// err is sticky: once non-nil (e.g. set to errReaderClosed by Close), it
	// is retained on the Reader.
	err error

	// Block handles for the table's metadata blocks.
	indexBH      block.Handle
	filterBH     block.Handle
	rangeDelBH   block.Handle
	rangeKeyBH   block.Handle
	valueBIH     valueBlocksIndexHandle
	propertiesBH block.Handle
	metaIndexBH  block.Handle
	footerBH     block.Handle

	Properties   Properties
	tableFormat  TableFormat
	checksumType block.ChecksumType

	// metaBufferPool is a buffer pool used exclusively when opening a table and
	// loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
	// the BufferPool.pool slice as a part of the Reader allocation. It's
	// capacity 3 to accommodate the meta block (1), and both the compressed
	// properties block (1) and decompressed properties block (1)
	// simultaneously.
	metaBufferPool      block.BufferPool
	metaBufferPoolAlloc [3]block.AllocedBuffer
}
      92             : 
// Compile-time assertion that *Reader implements CommonReader.
var _ CommonReader = (*Reader)(nil)
      94             : 
      95             : // Close the reader and the underlying objstorage.Readable.
      96           1 : func (r *Reader) Close() error {
      97           1 :         r.cacheOpts.Cache.Unref()
      98           1 : 
      99           1 :         if r.readable != nil {
     100           1 :                 r.err = firstError(r.err, r.readable.Close())
     101           1 :                 r.readable = nil
     102           1 :         }
     103             : 
     104           1 :         if r.err != nil {
     105           0 :                 return r.err
     106           0 :         }
     107             :         // Make any future calls to Get, NewIter or Close return an error.
     108           1 :         r.err = errReaderClosed
     109           1 :         return nil
     110             : }
     111             : 
     112             : // NewPointIter returns an iterator for the point keys in the table.
     113             : //
     114             : // If transform.HideObsoletePoints is set, the callee assumes that filterer
     115             : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
     116             : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
     117             : func (r *Reader) NewPointIter(
     118             :         ctx context.Context,
     119             :         transforms IterTransforms,
     120             :         lower, upper []byte,
     121             :         filterer *BlockPropertiesFilterer,
     122             :         filterBlockSizeLimit FilterBlockSizeLimit,
     123             :         stats *base.InternalIteratorStats,
     124             :         categoryAndQoS CategoryAndQoS,
     125             :         statsCollector *CategoryStatsCollector,
     126             :         rp ReaderProvider,
     127           1 : ) (Iterator, error) {
     128           1 :         return r.newPointIter(
     129           1 :                 ctx, transforms, lower, upper, filterer, filterBlockSizeLimit,
     130           1 :                 stats, categoryAndQoS, statsCollector, rp, nil)
     131           1 : }
     132             : 
     133             : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
     134             : // before the call to NewPointIter, to get the value of hideObsoletePoints and
     135             : // potentially add a block property filter.
     136             : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
     137             :         snapshotForHideObsoletePoints base.SeqNum,
     138             :         fileLargestSeqNum base.SeqNum,
     139             :         pointKeyFilters []BlockPropertyFilter,
     140           1 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
     141           1 :         hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
     142           1 :                 snapshotForHideObsoletePoints > fileLargestSeqNum
     143           1 :         if hideObsoletePoints {
     144           1 :                 pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
     145           1 :         }
     146           1 :         return hideObsoletePoints, pointKeyFilters
     147             : }
     148             : 
     149             : func (r *Reader) newPointIter(
     150             :         ctx context.Context,
     151             :         transforms IterTransforms,
     152             :         lower, upper []byte,
     153             :         filterer *BlockPropertiesFilterer,
     154             :         filterBlockSizeLimit FilterBlockSizeLimit,
     155             :         stats *base.InternalIteratorStats,
     156             :         categoryAndQoS CategoryAndQoS,
     157             :         statsCollector *CategoryStatsCollector,
     158             :         rp ReaderProvider,
     159             :         vState *virtualState,
     160           1 : ) (Iterator, error) {
     161           1 :         // NB: pebble.tableCache wraps the returned iterator with one which performs
     162           1 :         // reference counting on the Reader, preventing the Reader from being closed
     163           1 :         // until the final iterator closes.
     164           1 :         var res Iterator
     165           1 :         var err error
     166           1 :         if r.Properties.IndexType == twoLevelIndex {
     167           1 :                 res, err = newRowBlockTwoLevelIterator(
     168           1 :                         ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     169           1 :                         stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
     170           1 :         } else {
     171           1 :                 res, err = newRowBlockSingleLevelIterator(
     172           1 :                         ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     173           1 :                         stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
     174           1 :         }
     175           1 :         if err != nil {
     176           0 :                 // Note: we don't want to return res here - it will be a nil
     177           0 :                 // single/twoLevelIterator, not a nil Iterator.
     178           0 :                 return nil, err
     179           0 :         }
     180           1 :         return res, nil
     181             : }
     182             : 
     183             : // NewIter returns an iterator for the point keys in the table. It is a
     184             : // simplified version of NewPointIter and should only be used for tests and
     185             : // tooling.
     186             : //
     187             : // NewIter must only be used when the Reader is guaranteed to outlive any
     188             : // LazyValues returned from the iter.
     189           1 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
     190           1 :         // TODO(radu): we should probably not use bloom filters in this case, as there
     191           1 :         // likely isn't a cache set up.
     192           1 :         return r.NewPointIter(
     193           1 :                 context.TODO(), transforms, lower, upper, nil, AlwaysUseFilterBlock,
     194           1 :                 nil /* stats */, CategoryAndQoS{}, nil /* statsCollector */, MakeTrivialReaderProvider(r))
     195           1 : }
     196             : 
     197             : // NewCompactionIter returns an iterator similar to NewIter but it also increments
     198             : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
     199             : // after itself and returns a nil iterator.
     200             : func (r *Reader) NewCompactionIter(
     201             :         transforms IterTransforms,
     202             :         categoryAndQoS CategoryAndQoS,
     203             :         statsCollector *CategoryStatsCollector,
     204             :         rp ReaderProvider,
     205             :         bufferPool *block.BufferPool,
     206           1 : ) (Iterator, error) {
     207           1 :         return r.newCompactionIter(transforms, categoryAndQoS, statsCollector, rp, nil, bufferPool)
     208           1 : }
     209             : 
     210             : func (r *Reader) newCompactionIter(
     211             :         transforms IterTransforms,
     212             :         categoryAndQoS CategoryAndQoS,
     213             :         statsCollector *CategoryStatsCollector,
     214             :         rp ReaderProvider,
     215             :         vState *virtualState,
     216             :         bufferPool *block.BufferPool,
     217           1 : ) (Iterator, error) {
     218           1 :         if vState != nil && vState.isSharedIngested {
     219           1 :                 transforms.HideObsoletePoints = true
     220           1 :         }
     221           1 :         if r.Properties.IndexType == twoLevelIndex {
     222           1 :                 i, err := newRowBlockTwoLevelIterator(
     223           1 :                         context.Background(),
     224           1 :                         r, vState, transforms, nil /* lower */, nil /* upper */, nil,
     225           1 :                         NeverUseFilterBlock, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
     226           1 :                 )
     227           1 :                 if err != nil {
     228           0 :                         return nil, err
     229           0 :                 }
     230           1 :                 i.SetupForCompaction()
     231           1 :                 return i, nil
     232             :         }
     233           1 :         i, err := newRowBlockSingleLevelIterator(
     234           1 :                 context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
     235           1 :                 nil, NeverUseFilterBlock, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
     236           1 :         )
     237           1 :         if err != nil {
     238           0 :                 return nil, err
     239           0 :         }
     240           1 :         i.SetupForCompaction()
     241           1 :         return i, nil
     242             : }
     243             : 
     244             : // NewRawRangeDelIter returns an internal iterator for the contents of the
     245             : // range-del block for the table. Returns nil if the table does not contain
     246             : // any range deletions.
     247             : func (r *Reader) NewRawRangeDelIter(
     248             :         ctx context.Context, transforms FragmentIterTransforms,
     249           1 : ) (keyspan.FragmentIterator, error) {
     250           1 :         if r.rangeDelBH.Length == 0 {
     251           1 :                 return nil, nil
     252           1 :         }
     253           1 :         h, err := r.readRangeDel(ctx, nil /* stats */, nil /* iterStats */)
     254           1 :         if err != nil {
     255           0 :                 return nil, err
     256           0 :         }
     257           1 :         transforms.ElideSameSeqNum = true
     258           1 :         i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareSuffixes, r.Split, h, transforms)
     259           1 :         if err != nil {
     260           0 :                 return nil, err
     261           0 :         }
     262           1 :         return keyspan.MaybeAssert(i, r.Compare), nil
     263             : }
     264             : 
     265             : // NewRawRangeKeyIter returns an internal iterator for the contents of the
     266             : // range-key block for the table. Returns nil if the table does not contain any
     267             : // range keys.
     268             : func (r *Reader) NewRawRangeKeyIter(
     269             :         ctx context.Context, transforms FragmentIterTransforms,
     270           1 : ) (keyspan.FragmentIterator, error) {
     271           1 :         if r.rangeKeyBH.Length == 0 {
     272           1 :                 return nil, nil
     273           1 :         }
     274           1 :         h, err := r.readRangeKey(ctx, nil /* stats */, nil /* iterStats */)
     275           1 :         if err != nil {
     276           0 :                 return nil, err
     277           0 :         }
     278           1 :         i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareSuffixes, r.Split, h, transforms)
     279           1 :         if err != nil {
     280           0 :                 return nil, err
     281           0 :         }
     282           1 :         return keyspan.MaybeAssert(i, r.Compare), nil
     283             : }
     284             : 
     285             : func (r *Reader) readIndex(
     286             :         ctx context.Context,
     287             :         readHandle objstorage.ReadHandle,
     288             :         stats *base.InternalIteratorStats,
     289             :         iterStats *iterStatsAccumulator,
     290           1 : ) (block.BufferHandle, error) {
     291           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     292           1 :         return r.readBlock(ctx, r.indexBH, nil, readHandle, stats, iterStats, nil /* buffer pool */)
     293           1 : }
     294             : 
     295             : func (r *Reader) readFilter(
     296             :         ctx context.Context,
     297             :         readHandle objstorage.ReadHandle,
     298             :         stats *base.InternalIteratorStats,
     299             :         iterStats *iterStatsAccumulator,
     300           1 : ) (block.BufferHandle, error) {
     301           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
     302           1 :         return r.readBlock(ctx, r.filterBH, nil /* transform */, readHandle, stats, iterStats, nil /* buffer pool */)
     303           1 : }
     304             : 
     305             : func (r *Reader) readRangeDel(
     306             :         ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
     307           1 : ) (block.BufferHandle, error) {
     308           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     309           1 :         return r.readBlock(ctx, r.rangeDelBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
     310           1 : }
     311             : 
     312             : func (r *Reader) readRangeKey(
     313             :         ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
     314           1 : ) (block.BufferHandle, error) {
     315           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     316           1 :         return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
     317           1 : }
     318             : 
     319             : func checkChecksum(
     320             :         checksumType block.ChecksumType, b []byte, bh block.Handle, fileNum base.DiskFileNum,
     321           1 : ) error {
     322           1 :         expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
     323           1 :         var computedChecksum uint32
     324           1 :         switch checksumType {
     325           1 :         case block.ChecksumTypeCRC32c:
     326           1 :                 computedChecksum = crc.New(b[:bh.Length+1]).Value()
     327           0 :         case block.ChecksumTypeXXHash64:
     328           0 :                 computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
     329           0 :         default:
     330           0 :                 return errors.Errorf("unsupported checksum type: %d", checksumType)
     331             :         }
     332             : 
     333           1 :         if expectedChecksum != computedChecksum {
     334           0 :                 return base.CorruptionErrorf(
     335           0 :                         "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
     336           0 :                         fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
     337           0 :         }
     338           1 :         return nil
     339             : }
     340             : 
     341             : // DeterministicReadBlockDurationForTesting is for tests that want a
     342             : // deterministic value of the time to read a block (that is not in the cache).
     343             : // The return value is a function that must be called before the test exits.
     344           0 : func DeterministicReadBlockDurationForTesting() func() {
     345           0 :         drbdForTesting := deterministicReadBlockDurationForTesting
     346           0 :         deterministicReadBlockDurationForTesting = true
     347           0 :         return func() {
     348           0 :                 deterministicReadBlockDurationForTesting = drbdForTesting
     349           0 :         }
     350             : }
     351             : 
// deterministicReadBlockDurationForTesting, when true, makes block-read
// timing deterministic; toggled via DeterministicReadBlockDurationForTesting.
var deterministicReadBlockDurationForTesting = false
     353             : 
// readBlock returns a handle to the block identified by bh. It first consults
// the block cache; on a miss it reads the raw block (via readHandle if
// non-nil, else the Reader's readable), verifies its checksum, decompresses
// it if needed, applies the optional transform, and wraps the result in a
// handle. If bufferPool is non-nil, buffers come from the pool rather than
// the block cache. stats/iterStats, when non-nil, are updated with bytes read
// and read duration.
func (r *Reader) readBlock(
	ctx context.Context,
	bh block.Handle,
	transform blockTransform,
	readHandle objstorage.ReadHandle,
	stats *base.InternalIteratorStats,
	iterStats *iterStatsAccumulator,
	bufferPool *block.BufferPool,
) (handle block.BufferHandle, _ error) {
	if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Get() != nil {
		// Cache hit.
		if readHandle != nil {
			readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
		}
		if stats != nil {
			stats.BlockBytes += bh.Length
			stats.BlockBytesInCache += bh.Length
		}
		if iterStats != nil {
			iterStats.reportStats(bh.Length, bh.Length, 0)
		}
		// This block is already in the cache; return a handle to existing value
		// in the cache.
		return block.CacheBufferHandle(h), nil
	}

	// Cache miss.

	// If a load-block semaphore is configured, bound concurrent block loads.
	if sema := r.loadBlockSema; sema != nil {
		if err := sema.Acquire(ctx, 1); err != nil {
			// An error here can only come from the context.
			return block.BufferHandle{}, err
		}
		defer sema.Release(1)
	}

	// Read the payload plus its trailer (compression indicator + checksum).
	compressed := block.Alloc(int(bh.Length+block.TrailerLen), bufferPool)
	readStopwatch := makeStopwatch()
	var err error
	if readHandle != nil {
		err = readHandle.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
	} else {
		err = r.readable.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
	}
	readDuration := readStopwatch.stop()
	// Call IsTracingEnabled to avoid the allocations of boxing integers into an
	// interface{}, unless necessary.
	if readDuration >= slowReadTracingThreshold && r.logger.IsTracingEnabled(ctx) {
		// Log the two nearest callers to identify who issued the slow read.
		_, file1, line1, _ := runtime.Caller(1)
		_, file2, line2, _ := runtime.Caller(2)
		r.logger.Eventf(ctx, "reading block of %d bytes took %s (fileNum=%s; %s/%s:%d -> %s/%s:%d)",
			int(bh.Length+block.TrailerLen), readDuration.String(),
			r.cacheOpts.FileNum,
			filepath.Base(filepath.Dir(file2)), filepath.Base(file2), line2,
			filepath.Base(filepath.Dir(file1)), filepath.Base(file1), line1)
	}
	if stats != nil {
		stats.BlockBytes += bh.Length
		stats.BlockReadDuration += readDuration
	}
	if err != nil {
		compressed.Release()
		return block.BufferHandle{}, err
	}
	if err := checkChecksum(r.checksumType, compressed.Get(), bh, r.cacheOpts.FileNum); err != nil {
		compressed.Release()
		return block.BufferHandle{}, err
	}

	// The byte following the payload records how the block was compressed;
	// drop the trailer before decompressing.
	typ := block.CompressionIndicator(compressed.Get()[bh.Length])
	compressed.Truncate(int(bh.Length))

	var decompressed block.Value
	if typ == block.NoCompressionIndicator {
		decompressed = compressed
	} else {
		// Decode the length of the decompressed value.
		decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.Get())
		if err != nil {
			compressed.Release()
			return block.BufferHandle{}, err
		}

		decompressed = block.Alloc(decodedLen, bufferPool)
		if err := block.DecompressInto(typ, compressed.Get()[prefixLen:], decompressed.Get()); err != nil {
			// NOTE(review): only compressed is released here; decompressed
			// appears not to be released on this path — confirm whether this
			// leaks a pooled buffer.
			compressed.Release()
			return block.BufferHandle{}, err
		}
		compressed.Release()
	}

	if transform != nil {
		// Transforming blocks is very rare, so the extra copy of the
		// transformed data is not problematic.
		tmpTransformed, err := transform(decompressed.Get())
		if err != nil {
			decompressed.Release()
			return block.BufferHandle{}, err
		}

		transformed := block.Alloc(len(tmpTransformed), bufferPool)
		copy(transformed.Get(), tmpTransformed)
		decompressed.Release()
		decompressed = transformed
	}

	if iterStats != nil {
		// 0 bytes-in-cache: this was a miss.
		iterStats.reportStats(bh.Length, 0, readDuration)
	}
	h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
	return h, nil
}
     466             : 
     467             : func (r *Reader) readMetaindex(
     468             :         ctx context.Context,
     469             :         metaindexBH block.Handle,
     470             :         readHandle objstorage.ReadHandle,
     471             :         filters map[string]FilterPolicy,
     472           1 : ) error {
     473           1 :         // We use a BufferPool when reading metaindex blocks in order to avoid
     474           1 :         // populating the block cache with these blocks. In heavy-write workloads,
     475           1 :         // especially with high compaction concurrency, new tables may be created
     476           1 :         // frequently. Populating the block cache with these metaindex blocks adds
     477           1 :         // additional contention on the block cache mutexes (see #1997).
     478           1 :         // Additionally, these blocks are exceedingly unlikely to be read again
     479           1 :         // while they're still in the block cache except in misconfigurations with
     480           1 :         // excessive sstables counts or a table cache that's far too small.
     481           1 :         r.metaBufferPool.InitPreallocated(r.metaBufferPoolAlloc[:0])
     482           1 :         // When we're finished, release the buffers we've allocated back to memory
     483           1 :         // allocator. We don't expect to use metaBufferPool again.
     484           1 :         defer r.metaBufferPool.Release()
     485           1 : 
     486           1 :         b, err := r.readBlock(
     487           1 :                 ctx, metaindexBH, nil /* transform */, readHandle, nil, /* stats */
     488           1 :                 nil /* iterStats */, &r.metaBufferPool)
     489           1 :         if err != nil {
     490           0 :                 return err
     491           0 :         }
     492           1 :         data := b.Get()
     493           1 :         defer b.Release()
     494           1 : 
     495           1 :         if uint64(len(data)) != metaindexBH.Length {
     496           0 :                 return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
     497           0 :                         errors.Safe(len(data)), errors.Safe(metaindexBH.Length))
     498           0 :         }
     499             : 
     500           1 :         i, err := rowblk.NewRawIter(bytes.Compare, data)
     501           1 :         if err != nil {
     502           0 :                 return err
     503           0 :         }
     504             : 
     505           1 :         meta := map[string]block.Handle{}
     506           1 :         for valid := i.First(); valid; valid = i.Next() {
     507           1 :                 value := i.Value()
     508           1 :                 if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
     509           1 :                         vbih, n, err := decodeValueBlocksIndexHandle(i.Value())
     510           1 :                         if err != nil {
     511           0 :                                 return err
     512           0 :                         }
     513           1 :                         if n == 0 || n != len(value) {
     514           0 :                                 return base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
     515           0 :                         }
     516           1 :                         r.valueBIH = vbih
     517           1 :                 } else {
     518           1 :                         bh, n := block.DecodeHandle(value)
     519           1 :                         if n == 0 || n != len(value) {
     520           0 :                                 return base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
     521           0 :                         }
     522           1 :                         meta[string(i.Key().UserKey)] = bh
     523             :                 }
     524             :         }
     525           1 :         if err := i.Close(); err != nil {
     526           0 :                 return err
     527           0 :         }
     528             : 
     529           1 :         if bh, ok := meta[metaPropertiesName]; ok {
     530           1 :                 b, err = r.readBlock(
     531           1 :                         ctx, bh, nil /* transform */, readHandle, nil, /* stats */
     532           1 :                         nil /* iterStats */, nil /* buffer pool */)
     533           1 :                 if err != nil {
     534           0 :                         return err
     535           0 :                 }
     536           1 :                 r.propertiesBH = bh
     537           1 :                 err := r.Properties.load(b.Get(), r.deniedUserProperties)
     538           1 :                 b.Release()
     539           1 :                 if err != nil {
     540           0 :                         return err
     541           0 :                 }
     542             :         }
     543             : 
     544           1 :         if bh, ok := meta[metaRangeDelV2Name]; ok {
     545           1 :                 r.rangeDelBH = bh
     546           1 :         } else if _, ok := meta[metaRangeDelV1Name]; ok {
     547           0 :                 // This version of Pebble requires a format major version at least as
     548           0 :                 // high as FormatFlushableIngest (see pebble.FormatMinSupported). In
     549           0 :                 // this format major verison, we have a guarantee that we've compacted
     550           0 :                 // away all RocksDB sstables. It should not be possible to encounter an
     551           0 :                 // sstable with a v1 range deletion block but not a v2 range deletion
     552           0 :                 // block.
     553           0 :                 err := errors.Newf("pebble/table: unexpected range-del block type: %s", metaRangeDelV1Name)
     554           0 :                 return errors.Mark(err, base.ErrCorruption)
     555           0 :         }
     556             : 
     557           1 :         if bh, ok := meta[metaRangeKeyName]; ok {
     558           1 :                 r.rangeKeyBH = bh
     559           1 :         }
     560             : 
     561           1 :         for name, fp := range filters {
     562           1 :                 types := []struct {
     563           1 :                         ftype  FilterType
     564           1 :                         prefix string
     565           1 :                 }{
     566           1 :                         {TableFilter, "fullfilter."},
     567           1 :                 }
     568           1 :                 var done bool
     569           1 :                 for _, t := range types {
     570           1 :                         if bh, ok := meta[t.prefix+name]; ok {
     571           1 :                                 r.filterBH = bh
     572           1 : 
     573           1 :                                 switch t.ftype {
     574           1 :                                 case TableFilter:
     575           1 :                                         r.tableFilter = newTableFilterReader(fp, r.filterMetricsTracker)
     576           0 :                                 default:
     577           0 :                                         return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
     578             :                                 }
     579             : 
     580           1 :                                 done = true
     581           1 :                                 break
     582             :                         }
     583             :                 }
     584           1 :                 if done {
     585           1 :                         break
     586             :                 }
     587             :         }
     588           1 :         return nil
     589             : }
     590             : 
     591             : // Layout returns the layout (block organization) for an sstable.
     592           1 : func (r *Reader) Layout() (*Layout, error) {
     593           1 :         if r.err != nil {
     594           0 :                 return nil, r.err
     595           0 :         }
     596             : 
     597           1 :         l := &Layout{
     598           1 :                 Data:       make([]block.HandleWithProperties, 0, r.Properties.NumDataBlocks),
     599           1 :                 Filter:     r.filterBH,
     600           1 :                 RangeDel:   r.rangeDelBH,
     601           1 :                 RangeKey:   r.rangeKeyBH,
     602           1 :                 ValueIndex: r.valueBIH.h,
     603           1 :                 Properties: r.propertiesBH,
     604           1 :                 MetaIndex:  r.metaIndexBH,
     605           1 :                 Footer:     r.footerBH,
     606           1 :                 Format:     r.tableFormat,
     607           1 :         }
     608           1 : 
     609           1 :         indexH, err := r.readIndex(context.Background(), nil, nil, nil)
     610           1 :         if err != nil {
     611           0 :                 return nil, err
     612           0 :         }
     613           1 :         defer indexH.Release()
     614           1 : 
     615           1 :         var alloc bytealloc.A
     616           1 : 
     617           1 :         if r.Properties.IndexPartitions == 0 {
     618           1 :                 l.Index = append(l.Index, r.indexBH)
     619           1 :                 iter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     620           1 :                 for kv := iter.First(); kv != nil; kv = iter.Next() {
     621           1 :                         dataBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
     622           1 :                         if err != nil {
     623           0 :                                 return nil, errCorruptIndexEntry(err)
     624           0 :                         }
     625           1 :                         if len(dataBH.Props) > 0 {
     626           1 :                                 alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     627           1 :                         }
     628           1 :                         l.Data = append(l.Data, dataBH)
     629             :                 }
     630           1 :         } else {
     631           1 :                 l.TopIndex = r.indexBH
     632           1 :                 topIter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     633           1 :                 iter := &rowblk.Iter{}
     634           1 :                 for kv := topIter.First(); kv != nil; kv = topIter.Next() {
     635           1 :                         indexBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
     636           1 :                         if err != nil {
     637           0 :                                 return nil, errCorruptIndexEntry(err)
     638           0 :                         }
     639           1 :                         l.Index = append(l.Index, indexBH.Handle)
     640           1 : 
     641           1 :                         subIndex, err := r.readBlock(context.Background(), indexBH.Handle,
     642           1 :                                 nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
     643           1 :                         if err != nil {
     644           0 :                                 return nil, err
     645           0 :                         }
     646             :                         // TODO(msbutler): figure out how to pass virtualState to layout call.
     647           1 :                         if err := iter.Init(r.Compare, r.Split, subIndex.Get(), NoTransforms); err != nil {
     648           0 :                                 return nil, err
     649           0 :                         }
     650           1 :                         for kv := iter.First(); kv != nil; kv = iter.Next() {
     651           1 :                                 dataBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
     652           1 :                                 if len(dataBH.Props) > 0 {
     653           1 :                                         alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     654           1 :                                 }
     655           1 :                                 if err != nil {
     656           0 :                                         return nil, errCorruptIndexEntry(err)
     657           0 :                                 }
     658           1 :                                 l.Data = append(l.Data, dataBH)
     659             :                         }
     660           1 :                         subIndex.Release()
     661           1 :                         *iter = iter.ResetForReuse()
     662             :                 }
     663             :         }
     664           1 :         if r.valueBIH.h.Length != 0 {
     665           1 :                 vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil, nil /* buffer pool */)
     666           1 :                 if err != nil {
     667           0 :                         return nil, err
     668           0 :                 }
     669           1 :                 defer vbiH.Release()
     670           1 :                 vbiBlock := vbiH.Get()
     671           1 :                 indexEntryLen := int(r.valueBIH.blockNumByteLength + r.valueBIH.blockOffsetByteLength +
     672           1 :                         r.valueBIH.blockLengthByteLength)
     673           1 :                 i := 0
     674           1 :                 for len(vbiBlock) != 0 {
     675           1 :                         if len(vbiBlock) < indexEntryLen {
     676           0 :                                 return nil, errors.Errorf(
     677           0 :                                         "remaining value index block %d does not contain a full entry of length %d",
     678           0 :                                         len(vbiBlock), indexEntryLen)
     679           0 :                         }
     680           1 :                         n := int(r.valueBIH.blockNumByteLength)
     681           1 :                         bn := int(littleEndianGet(vbiBlock, n))
     682           1 :                         if bn != i {
     683           0 :                                 return nil, errors.Errorf("unexpected block num %d, expected %d",
     684           0 :                                         bn, i)
     685           0 :                         }
     686           1 :                         i++
     687           1 :                         vbiBlock = vbiBlock[n:]
     688           1 :                         n = int(r.valueBIH.blockOffsetByteLength)
     689           1 :                         blockOffset := littleEndianGet(vbiBlock, n)
     690           1 :                         vbiBlock = vbiBlock[n:]
     691           1 :                         n = int(r.valueBIH.blockLengthByteLength)
     692           1 :                         blockLen := littleEndianGet(vbiBlock, n)
     693           1 :                         vbiBlock = vbiBlock[n:]
     694           1 :                         l.ValueBlock = append(l.ValueBlock, block.Handle{Offset: blockOffset, Length: blockLen})
     695             :                 }
     696             :         }
     697             : 
     698           1 :         return l, nil
     699             : }
     700             : 
     701             : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
     702           1 : func (r *Reader) ValidateBlockChecksums() error {
     703           1 :         // Pre-compute the BlockHandles for the underlying file.
     704           1 :         l, err := r.Layout()
     705           1 :         if err != nil {
     706           0 :                 return err
     707           0 :         }
     708             : 
     709             :         // Construct the set of blocks to check. Note that the footer is not checked
     710             :         // as it is not a block with a checksum.
     711           1 :         blocks := make([]block.Handle, len(l.Data))
     712           1 :         for i := range l.Data {
     713           1 :                 blocks[i] = l.Data[i].Handle
     714           1 :         }
     715           1 :         blocks = append(blocks, l.Index...)
     716           1 :         blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.RangeKey, l.Properties, l.MetaIndex)
     717           1 : 
     718           1 :         // Sorting by offset ensures we are performing a sequential scan of the
     719           1 :         // file.
     720           1 :         slices.SortFunc(blocks, func(a, b block.Handle) int {
     721           1 :                 return cmp.Compare(a.Offset, b.Offset)
     722           1 :         })
     723             : 
     724             :         // Check all blocks sequentially. Make use of read-ahead, given we are
     725             :         // scanning the entire file from start to end.
     726           1 :         rh := r.readable.NewReadHandle(objstorage.NoReadBefore)
     727           1 :         defer rh.Close()
     728           1 : 
     729           1 :         for _, bh := range blocks {
     730           1 :                 // Certain blocks may not be present, in which case we skip them.
     731           1 :                 if bh.Length == 0 {
     732           1 :                         continue
     733             :                 }
     734             : 
     735             :                 // Read the block, which validates the checksum.
     736           1 :                 h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* iterStats */, nil /* buffer pool */)
     737           1 :                 if err != nil {
     738           0 :                         return err
     739           0 :                 }
     740           1 :                 h.Release()
     741             :         }
     742             : 
     743           1 :         return nil
     744             : }
     745             : 
     746             : // CommonProperties implemented the CommonReader interface.
     747           1 : func (r *Reader) CommonProperties() *CommonProperties {
     748           1 :         return &r.Properties.CommonProperties
     749           1 : }
     750             : 
// EstimateDiskUsage returns the total size of data blocks overlapping the range
// `[start, end]`. Even if a data block partially overlaps, or we cannot
// determine overlap due to abbreviated index keys, the full data block size is
// included in the estimation.
//
// This function does not account for any metablock space usage. Assumes there
// is at least partial overlap, i.e., `[start, end]` falls neither completely
// before nor completely after the file's range.
//
// Only blocks containing point keys are considered. Range deletion and range
// key blocks are not considered.
//
// TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
// data blocks overlapped and add that same fraction of the metadata blocks to the
// estimate.
func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
	if r.err != nil {
		return 0, r.err
	}

	indexH, err := r.readIndex(context.Background(), nil, nil, nil)
	if err != nil {
		return 0, err
	}
	defer indexH.Release()

	// Iterators over the bottom-level index blocks containing start and end.
	// These may be different in case of partitioned index but will both point
	// to the same blockIter over the single index in the unpartitioned case.
	var startIdxIter, endIdxIter *rowblk.Iter
	if r.Properties.IndexPartitions == 0 {
		// Single-level index: both bounds are resolved against the same
		// index block iterator.
		iter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
		if err != nil {
			return 0, err
		}
		startIdxIter = iter
		endIdxIter = iter
	} else {
		// Two-level index: locate the second-level index block covering each
		// bound via the top-level index.
		topIter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
		if err != nil {
			return 0, err
		}

		kv := topIter.SeekGE(start, base.SeekGEFlagsNone)
		if kv == nil {
			// The range falls completely after this file, or an error occurred.
			return 0, topIter.Error()
		}
		startIdxBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
		if err != nil {
			return 0, errCorruptIndexEntry(err)
		}
		startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.Handle,
			nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
		if err != nil {
			return 0, err
		}
		defer startIdxBlock.Release()
		startIdxIter, err = rowblk.NewIter(r.Compare, r.Split, startIdxBlock.Get(), NoTransforms)
		if err != nil {
			return 0, err
		}

		kv = topIter.SeekGE(end, base.SeekGEFlagsNone)
		if kv == nil {
			// end is past the last partition; endIdxIter stays nil, which the
			// code below treats as "range extends beyond this file".
			if err := topIter.Error(); err != nil {
				return 0, err
			}
		} else {
			endIdxBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
			if err != nil {
				return 0, errCorruptIndexEntry(err)
			}
			endIdxBlock, err := r.readBlock(context.Background(),
				endIdxBH.Handle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
			if err != nil {
				return 0, err
			}
			defer endIdxBlock.Release()
			endIdxIter, err = rowblk.NewIter(r.Compare, r.Split, endIdxBlock.Get(), NoTransforms)
			if err != nil {
				return 0, err
			}
		}
	}
	// startIdxIter should not be nil at this point, while endIdxIter can be if the
	// range spans past the end of the file.

	kv := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
	if kv == nil {
		// The range falls completely after this file, or an error occurred.
		return 0, startIdxIter.Error()
	}
	startBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
	if err != nil {
		return 0, errCorruptIndexEntry(err)
	}

	includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
		// INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
		// Linearly interpolate what is stored in value blocks.
		//
		// TODO(sumeer): if we need more accuracy, without loading any data blocks
		// (which contain the value handles, and which may also be insufficient if
		// the values are in separate files), we will need to accumulate the
		// logical size of the key-value pairs and store the cumulative value for
		// each data block in the index block entry. This increases the size of
		// the BlockHandle, so wait until this becomes necessary.
		return dataBlockSize +
			uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
				float64(r.Properties.ValueBlocksSize))
	}
	if endIdxIter == nil {
		// The range spans beyond this file. Include data blocks through the last.
		return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
	}
	kv = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
	if kv == nil {
		if err := endIdxIter.Error(); err != nil {
			return 0, err
		}
		// The range spans beyond this file. Include data blocks through the last.
		return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
	}
	endBH, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
	if err != nil {
		return 0, errCorruptIndexEntry(err)
	}
	// The estimate spans from the start of the first overlapping data block
	// through the end (including trailer) of the last overlapping one.
	return includeInterpolatedValueBlocksSize(
		endBH.Offset + endBH.Length + block.TrailerLen - startBH.Offset), nil
}
     882             : 
     883             : // TableFormat returns the format version for the table.
     884           1 : func (r *Reader) TableFormat() (TableFormat, error) {
     885           1 :         if r.err != nil {
     886           0 :                 return TableFormatUnspecified, r.err
     887           0 :         }
     888           1 :         return r.tableFormat, nil
     889             : }
     890             : 
     891             : // NewReader returns a new table reader for the file. Closing the reader will
     892             : // close the file.
     893             : //
     894             : // The context is used for tracing any operations performed by NewReader; it is
     895             : // NOT stored for future use.
     896           1 : func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Reader, error) {
     897           1 :         if f == nil {
     898           0 :                 return nil, errors.New("pebble/table: nil file")
     899           0 :         }
     900           1 :         o = o.ensureDefaults()
     901           1 :         r := &Reader{
     902           1 :                 readable:             f,
     903           1 :                 cacheOpts:            o.internal.CacheOpts,
     904           1 :                 loadBlockSema:        o.LoadBlockSema,
     905           1 :                 deniedUserProperties: o.DeniedUserProperties,
     906           1 :                 filterMetricsTracker: o.FilterMetricsTracker,
     907           1 :                 logger:               o.LoggerAndTracer,
     908           1 :         }
     909           1 :         if r.cacheOpts.Cache == nil {
     910           1 :                 r.cacheOpts.Cache = cache.New(0)
     911           1 :         } else {
     912           1 :                 r.cacheOpts.Cache.Ref()
     913           1 :         }
     914           1 :         if r.cacheOpts.CacheID == 0 {
     915           1 :                 r.cacheOpts.CacheID = r.cacheOpts.Cache.NewID()
     916           1 :         }
     917             : 
     918           1 :         var preallocRH objstorageprovider.PreallocatedReadHandle
     919           1 :         rh := objstorageprovider.UsePreallocatedReadHandle(
     920           1 :                 r.readable, objstorage.ReadBeforeForNewReader, &preallocRH)
     921           1 :         defer rh.Close()
     922           1 : 
     923           1 :         footer, err := readFooter(ctx, f, rh, r.logger, r.cacheOpts.FileNum)
     924           1 :         if err != nil {
     925           0 :                 r.err = err
     926           0 :                 return nil, r.Close()
     927           0 :         }
     928           1 :         r.checksumType = footer.checksum
     929           1 :         r.tableFormat = footer.format
     930           1 :         // Read the metaindex and properties blocks.
     931           1 :         if err := r.readMetaindex(ctx, footer.metaindexBH, rh, o.Filters); err != nil {
     932           0 :                 r.err = err
     933           0 :                 return nil, r.Close()
     934           0 :         }
     935           1 :         r.indexBH = footer.indexBH
     936           1 :         r.metaIndexBH = footer.metaindexBH
     937           1 :         r.footerBH = footer.footerBH
     938           1 : 
     939           1 :         if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
     940           1 :                 r.Comparer = o.Comparer
     941           1 :                 r.Compare = o.Comparer.Compare
     942           1 :                 r.SuffixCmp = o.Comparer.CompareSuffixes
     943           1 :                 r.Equal = o.Comparer.Equal
     944           1 :                 r.Split = o.Comparer.Split
     945           1 :         } else if comparer, ok := o.Comparers[r.Properties.ComparerName]; ok {
     946           0 :                 r.Comparer = o.Comparer
     947           0 :                 r.Compare = comparer.Compare
     948           0 :                 r.SuffixCmp = comparer.CompareSuffixes
     949           0 :                 r.Equal = comparer.Equal
     950           0 :                 r.Split = comparer.Split
     951           0 :         } else {
     952           0 :                 r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
     953           0 :                         errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.ComparerName))
     954           0 :         }
     955             : 
     956           1 :         if mergerName := r.Properties.MergerName; mergerName != "" && mergerName != "nullptr" {
     957           1 :                 if o.Merger != nil && o.Merger.Name == mergerName {
     958           1 :                         // opts.Merger matches.
     959           1 :                 } else if _, ok := o.Mergers[mergerName]; ok {
     960           0 :                         // Known merger.
     961           0 :                 } else {
     962           0 :                         r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
     963           0 :                                 errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.MergerName))
     964           0 :                 }
     965             :         }
     966             : 
     967           1 :         if r.err != nil {
     968           0 :                 return nil, r.Close()
     969           0 :         }
     970             : 
     971           1 :         return r, nil
     972             : }
     973             : 
// ReadableFile describes the smallest subset of vfs.File that is required for
// reading SSTs.
type ReadableFile interface {
	io.ReaderAt
	io.Closer
	// Stat returns file metadata; callers use it to determine the file size.
	Stat() (vfs.FileInfo, error)
}
     981             : 
     982             : // NewSimpleReadable wraps a ReadableFile in a objstorage.Readable
     983             : // implementation (which does not support read-ahead)
     984           1 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
     985           1 :         info, err := r.Stat()
     986           1 :         if err != nil {
     987           0 :                 return nil, err
     988           0 :         }
     989           1 :         res := &simpleReadable{
     990           1 :                 f:    r,
     991           1 :                 size: info.Size(),
     992           1 :         }
     993           1 :         res.rh = objstorage.MakeNoopReadHandle(res)
     994           1 :         return res, nil
     995             : }
     996             : 
// simpleReadable wraps a ReadableFile to implement objstorage.Readable.
type simpleReadable struct {
	// f is the underlying file; all reads are delegated to it.
	f ReadableFile
	// size is the file size captured from Stat at construction time.
	size int64
	// rh is the shared no-op read handle returned by NewReadHandle.
	rh objstorage.NoopReadHandle
}
    1003             : 
// Compile-time check that *simpleReadable satisfies objstorage.Readable.
var _ objstorage.Readable = (*simpleReadable)(nil)
    1005             : 
    1006             : // ReadAt is part of the objstorage.Readable interface.
    1007           1 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
    1008           1 :         n, err := s.f.ReadAt(p, off)
    1009           1 :         if invariants.Enabled && err == nil && n != len(p) {
    1010           0 :                 panic("short read")
    1011             :         }
    1012           1 :         return err
    1013             : }
    1014             : 
    1015             : // Close is part of the objstorage.Readable interface.
    1016           1 : func (s *simpleReadable) Close() error {
    1017           1 :         return s.f.Close()
    1018           1 : }
    1019             : 
    1020             : // Size is part of the objstorage.Readable interface.
    1021           1 : func (s *simpleReadable) Size() int64 {
    1022           1 :         return s.size
    1023           1 : }
    1024             : 
    1025             : // NewReaddHandle is part of the objstorage.Readable interface.
    1026             : func (s *simpleReadable) NewReadHandle(
    1027             :         readBeforeSize objstorage.ReadBeforeSize,
    1028           1 : ) objstorage.ReadHandle {
    1029           1 :         return &s.rh
    1030           1 : }
    1031             : 
    1032           0 : func errCorruptIndexEntry(err error) error {
    1033           0 :         err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
    1034           0 :         if invariants.Enabled {
    1035           0 :                 panic(err)
    1036             :         }
    1037           0 :         return err
    1038             : }
    1039             : 
// deterministicStopwatchForTesting measures elapsed wall-clock time from its
// creation, but can be made to report a fixed duration under a testing flag
// (see the stop method).
type deterministicStopwatchForTesting struct {
	// startTime is the instant the stopwatch was created.
	startTime time.Time
}
    1043             : 
    1044           1 : func makeStopwatch() deterministicStopwatchForTesting {
    1045           1 :         return deterministicStopwatchForTesting{startTime: time.Now()}
    1046           1 : }
    1047             : 
    1048           1 : func (w deterministicStopwatchForTesting) stop() time.Duration {
    1049           1 :         dur := time.Since(w.startTime)
    1050           1 :         if deterministicReadBlockDurationForTesting {
    1051           0 :                 dur = slowReadTracingThreshold
    1052           0 :         }
    1053           1 :         return dur
    1054             : }

Generated by: LCOV version 1.14