LCOV - code coverage report
Current view: top level - pebble/sstable - reader.go (source / functions)
Test:         2024-08-16 08:16Z 91a64c7d - meta test only.lcov
Date:         2024-08-16 08:17:41

                 Hit    Total    Coverage
Lines:           518      711      72.9 %
Functions:         0        0         -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "context"
      11             :         "encoding/binary"
      12             :         "io"
      13             :         "os"
      14             :         "path/filepath"
      15             :         "runtime"
      16             :         "slices"
      17             :         "time"
      18             : 
      19             :         "github.com/cespare/xxhash/v2"
      20             :         "github.com/cockroachdb/errors"
      21             :         "github.com/cockroachdb/fifo"
      22             :         "github.com/cockroachdb/pebble/internal/base"
      23             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      24             :         "github.com/cockroachdb/pebble/internal/cache"
      25             :         "github.com/cockroachdb/pebble/internal/crc"
      26             :         "github.com/cockroachdb/pebble/internal/invariants"
      27             :         "github.com/cockroachdb/pebble/internal/keyspan"
      28             :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      29             :         "github.com/cockroachdb/pebble/objstorage"
      30             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      31             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      32             :         "github.com/cockroachdb/pebble/sstable/block"
      33             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      34             : )
      35             : 
      36             : var errReaderClosed = errors.New("pebble/table: reader is closed")
      37             : 
       38             : // decodeBlockHandle returns the block handle encoded at the start of src, as
       39             : // well as the number of bytes it occupies. It returns a zero handle and zero
       40             : // length if given invalid input. Block handles for data blocks and for
       41             : // first/lower level index blocks should not be decoded with decodeBlockHandle,
       42             : // since the caller may validate that the number of bytes decoded equals the
       43             : // length of src, which will not hold when the trailing properties are left
       44             : // undecoded. In those cases the caller should use decodeBlockHandleWithProperties.
      45           1 : func decodeBlockHandle(src []byte) (block.Handle, int) {
      46           1 :         offset, n := binary.Uvarint(src)
      47           1 :         length, m := binary.Uvarint(src[n:])
      48           1 :         if n == 0 || m == 0 {
      49           0 :                 return block.Handle{}, 0
      50           0 :         }
      51           1 :         return block.Handle{Offset: offset, Length: length}, n + m
      52             : }
      53             : 
      54             : // decodeBlockHandleWithProperties returns the block handle and properties
      55             : // encoded in src. src needs to be exactly the length that was encoded. This
      56             : // method must be used for data block and first/lower level index blocks. The
      57             : // properties in the block handle point to the bytes in src.
      58           1 : func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) {
      59           1 :         bh, n := decodeBlockHandle(src)
      60           1 :         if n == 0 {
      61           0 :                 return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle")
      62           0 :         }
      63           1 :         return BlockHandleWithProperties{
      64           1 :                 Handle: bh,
      65           1 :                 Props:  src[n:],
      66           1 :         }, nil
      67             : }
      68             : 
      69           1 : func encodeBlockHandle(dst []byte, b block.Handle) int {
      70           1 :         n := binary.PutUvarint(dst, b.Offset)
      71           1 :         m := binary.PutUvarint(dst[n:], b.Length)
      72           1 :         return n + m
      73           1 : }
      74             : 
      75           1 : func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte {
      76           1 :         n := encodeBlockHandle(dst, b.Handle)
      77           1 :         dst = append(dst[:n], b.Props...)
      78           1 :         return dst
      79           1 : }
      80             : 
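// An illustrative round-trip of the helpers above (a minimal sketch; the
// buffer size and handle values are arbitrary examples, not values taken from
// any real table):
//
//	var buf [2 * binary.MaxVarintLen64]byte
//	n := encodeBlockHandle(buf[:], block.Handle{Offset: 1000, Length: 4096})
//	bh, m := decodeBlockHandle(buf[:n])
//	// bh.Offset == 1000, bh.Length == 4096, and m == n.
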
      81             : type loadBlockResult int8
      82             : 
      83             : const (
      84             :         loadBlockOK loadBlockResult = iota
       85             :         // Could be due to an error or because there is no block left to load.
      86             :         loadBlockFailed
      87             :         loadBlockIrrelevant
      88             : )
      89             : 
      90             : type blockTransform func([]byte) ([]byte, error)
      91             : 
      92             : // Reader is a table reader.
      93             : type Reader struct {
      94             :         readable objstorage.Readable
      95             : 
      96             :         // The following fields are copied from the ReadOptions.
      97             :         cacheOpts            sstableinternal.CacheOptions
      98             :         loadBlockSema        *fifo.Semaphore
      99             :         deniedUserProperties map[string]struct{}
     100             :         filterMetricsTracker *FilterMetricsTracker
     101             :         logger               base.LoggerAndTracer
     102             : 
     103             :         Comparer  *base.Comparer
     104             :         Compare   Compare
     105             :         SuffixCmp CompareSuffixes
     106             :         Equal     Equal
     107             :         Split     Split
     108             : 
     109             :         tableFilter *tableFilterReader
     110             : 
     111             :         err error
     112             : 
     113             :         indexBH      block.Handle
     114             :         filterBH     block.Handle
     115             :         rangeDelBH   block.Handle
     116             :         rangeKeyBH   block.Handle
     117             :         valueBIH     valueBlocksIndexHandle
     118             :         propertiesBH block.Handle
     119             :         metaIndexBH  block.Handle
     120             :         footerBH     block.Handle
     121             : 
     122             :         Properties   Properties
     123             :         tableFormat  TableFormat
     124             :         checksumType block.ChecksumType
     125             : 
     126             :         // metaBufferPool is a buffer pool used exclusively when opening a table and
     127             :         // loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
     128             :         // the BufferPool.pool slice as a part of the Reader allocation. It's
     129             :         // capacity 3 to accommodate the meta block (1), and both the compressed
     130             :         // properties block (1) and decompressed properties block (1)
     131             :         // simultaneously.
     132             :         metaBufferPool      block.BufferPool
     133             :         metaBufferPoolAlloc [3]block.AllocedBuffer
     134             : }
     135             : 
     136             : var _ CommonReader = (*Reader)(nil)
     137             : 
     138             : // Close the reader and the underlying objstorage.Readable.
     139           1 : func (r *Reader) Close() error {
     140           1 :         r.cacheOpts.Cache.Unref()
     141           1 : 
     142           1 :         if r.readable != nil {
     143           1 :                 r.err = firstError(r.err, r.readable.Close())
     144           1 :                 r.readable = nil
     145           1 :         }
     146             : 
     147           1 :         if r.err != nil {
     148           0 :                 return r.err
     149           0 :         }
     150             :         // Make any future calls to Get, NewIter or Close return an error.
     151           1 :         r.err = errReaderClosed
     152           1 :         return nil
     153             : }
     154             : 
     155             : // NewPointIter returns an iterator for the point keys in the table.
     156             : //
     157             : // If transform.HideObsoletePoints is set, the callee assumes that filterer
     158             : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
     159             : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
     160             : func (r *Reader) NewPointIter(
     161             :         ctx context.Context,
     162             :         transforms IterTransforms,
     163             :         lower, upper []byte,
     164             :         filterer *BlockPropertiesFilterer,
     165             :         filterBlockSizeLimit FilterBlockSizeLimit,
     166             :         stats *base.InternalIteratorStats,
     167             :         categoryAndQoS CategoryAndQoS,
     168             :         statsCollector *CategoryStatsCollector,
     169             :         rp ReaderProvider,
     170           1 : ) (Iterator, error) {
     171           1 :         return r.newPointIter(
     172           1 :                 ctx, transforms, lower, upper, filterer, filterBlockSizeLimit,
     173           1 :                 stats, categoryAndQoS, statsCollector, rp, nil)
     174           1 : }
     175             : 
     176             : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
     177             : // before the call to NewPointIter, to get the value of hideObsoletePoints and
     178             : // potentially add a block property filter.
     179             : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
     180             :         snapshotForHideObsoletePoints base.SeqNum,
     181             :         fileLargestSeqNum base.SeqNum,
     182             :         pointKeyFilters []BlockPropertyFilter,
     183           1 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
     184           1 :         hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
     185           1 :                 snapshotForHideObsoletePoints > fileLargestSeqNum
     186           1 :         if hideObsoletePoints {
     187           1 :                 pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
     188           1 :         }
     189           1 :         return hideObsoletePoints, pointKeyFilters
     190             : }
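
// A sketch of the calling pattern implied by the contract on NewPointIter
// above (how the *BlockPropertiesFilterer is built from the returned filters
// happens outside this file and is assumed here):
//
//	hide, filters := r.TryAddBlockPropertyFilterForHideObsoletePoints(
//		snapshot, fileLargestSeqNum, pointKeyFilters)
//	transforms.HideObsoletePoints = hide
//	// Build a *BlockPropertiesFilterer that includes filters, then:
//	iter, err := r.NewPointIter(ctx, transforms, lower, upper, filterer,
//		filterBlockSizeLimit, stats, categoryAndQoS, statsCollector, rp)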
     191             : 
     192             : func (r *Reader) newPointIter(
     193             :         ctx context.Context,
     194             :         transforms IterTransforms,
     195             :         lower, upper []byte,
     196             :         filterer *BlockPropertiesFilterer,
     197             :         filterBlockSizeLimit FilterBlockSizeLimit,
     198             :         stats *base.InternalIteratorStats,
     199             :         categoryAndQoS CategoryAndQoS,
     200             :         statsCollector *CategoryStatsCollector,
     201             :         rp ReaderProvider,
     202             :         vState *virtualState,
     203           1 : ) (Iterator, error) {
     204           1 :         // NB: pebble.tableCache wraps the returned iterator with one which performs
     205           1 :         // reference counting on the Reader, preventing the Reader from being closed
     206           1 :         // until the final iterator closes.
     207           1 :         var res Iterator
     208           1 :         var err error
     209           1 :         if r.Properties.IndexType == twoLevelIndex {
     210           1 :                 res, err = newRowBlockTwoLevelIterator(
     211           1 :                         ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     212           1 :                         stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
     213           1 :         } else {
     214           1 :                 res, err = newRowBlockSingleLevelIterator(
     215           1 :                         ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
     216           1 :                         stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
     217           1 :         }
     218           1 :         if err != nil {
     219           0 :                 // Note: we don't want to return res here - it will be a nil
     220           0 :                 // single/twoLevelIterator, not a nil Iterator.
     221           0 :                 return nil, err
     222           0 :         }
     223           1 :         return res, nil
     224             : }
     225             : 
     226             : // NewIter returns an iterator for the point keys in the table. It is a
     227             : // simplified version of NewPointIter and should only be used for tests and
     228             : // tooling.
     229             : //
     230             : // NewIter must only be used when the Reader is guaranteed to outlive any
     231             : // LazyValues returned from the iter.
     232           1 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
     233           1 :         // TODO(radu): we should probably not use bloom filters in this case, as there
     234           1 :         // likely isn't a cache set up.
     235           1 :         return r.NewPointIter(
     236           1 :                 context.TODO(), transforms, lower, upper, nil, AlwaysUseFilterBlock,
     237           1 :                 nil /* stats */, CategoryAndQoS{}, nil /* statsCollector */, MakeTrivialReaderProvider(r))
     238           1 : }
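
// A minimal usage sketch for tests and tooling, per the caveats above
// (assumes a zero IterTransforms value and unbounded iteration are acceptable
// for the caller):
//
//	iter, err := r.NewIter(IterTransforms{}, nil /* lower */, nil /* upper */)
//	if err != nil {
//		// handle error
//	}
//	defer iter.Close()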
     239             : 
     240             : // NewCompactionIter returns an iterator similar to NewIter but it also increments
     241             : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
     242             : // after itself and returns a nil iterator.
     243             : func (r *Reader) NewCompactionIter(
     244             :         transforms IterTransforms,
     245             :         categoryAndQoS CategoryAndQoS,
     246             :         statsCollector *CategoryStatsCollector,
     247             :         rp ReaderProvider,
     248             :         bufferPool *block.BufferPool,
     249           1 : ) (Iterator, error) {
     250           1 :         return r.newCompactionIter(transforms, categoryAndQoS, statsCollector, rp, nil, bufferPool)
     251           1 : }
     252             : 
     253             : func (r *Reader) newCompactionIter(
     254             :         transforms IterTransforms,
     255             :         categoryAndQoS CategoryAndQoS,
     256             :         statsCollector *CategoryStatsCollector,
     257             :         rp ReaderProvider,
     258             :         vState *virtualState,
     259             :         bufferPool *block.BufferPool,
     260           1 : ) (Iterator, error) {
     261           1 :         if vState != nil && vState.isSharedIngested {
     262           1 :                 transforms.HideObsoletePoints = true
     263           1 :         }
     264           1 :         if r.Properties.IndexType == twoLevelIndex {
     265           1 :                 i, err := newRowBlockTwoLevelIterator(
     266           1 :                         context.Background(),
     267           1 :                         r, vState, transforms, nil /* lower */, nil /* upper */, nil,
     268           1 :                         NeverUseFilterBlock, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
     269           1 :                 )
     270           1 :                 if err != nil {
     271           0 :                         return nil, err
     272           0 :                 }
     273           1 :                 i.SetupForCompaction()
     274           1 :                 return i, nil
     275             :         }
     276           1 :         i, err := newRowBlockSingleLevelIterator(
     277           1 :                 context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
     278           1 :                 nil, NeverUseFilterBlock, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
     279           1 :         )
     280           1 :         if err != nil {
     281           0 :                 return nil, err
     282           0 :         }
     283           1 :         i.SetupForCompaction()
     284           1 :         return i, nil
     285             : }
     286             : 
     287             : // NewRawRangeDelIter returns an internal iterator for the contents of the
     288             : // range-del block for the table. Returns nil if the table does not contain
     289             : // any range deletions.
     290             : func (r *Reader) NewRawRangeDelIter(
     291             :         ctx context.Context, transforms FragmentIterTransforms,
     292           1 : ) (keyspan.FragmentIterator, error) {
     293           1 :         if r.rangeDelBH.Length == 0 {
     294           1 :                 return nil, nil
     295           1 :         }
     296           1 :         h, err := r.readRangeDel(ctx, nil /* stats */, nil /* iterStats */)
     297           1 :         if err != nil {
     298           0 :                 return nil, err
     299           0 :         }
     300           1 :         transforms.ElideSameSeqNum = true
     301           1 :         i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareSuffixes, r.Split, h, transforms)
     302           1 :         if err != nil {
     303           0 :                 return nil, err
     304           0 :         }
     305           1 :         return keyspan.MaybeAssert(i, r.Compare), nil
     306             : }
     307             : 
     308             : // NewRawRangeKeyIter returns an internal iterator for the contents of the
     309             : // range-key block for the table. Returns nil if the table does not contain any
     310             : // range keys.
     311             : func (r *Reader) NewRawRangeKeyIter(
     312             :         ctx context.Context, transforms FragmentIterTransforms,
     313           1 : ) (keyspan.FragmentIterator, error) {
     314           1 :         if r.rangeKeyBH.Length == 0 {
     315           1 :                 return nil, nil
     316           1 :         }
     317           1 :         h, err := r.readRangeKey(ctx, nil /* stats */, nil /* iterStats */)
     318           1 :         if err != nil {
     319           0 :                 return nil, err
     320           0 :         }
     321           1 :         i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareSuffixes, r.Split, h, transforms)
     322           1 :         if err != nil {
     323           0 :                 return nil, err
     324           0 :         }
     325           1 :         return keyspan.MaybeAssert(i, r.Compare), nil
     326             : }
     327             : 
     328             : func (r *Reader) readIndex(
     329             :         ctx context.Context,
     330             :         readHandle objstorage.ReadHandle,
     331             :         stats *base.InternalIteratorStats,
     332             :         iterStats *iterStatsAccumulator,
     333           1 : ) (block.BufferHandle, error) {
     334           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     335           1 :         return r.readBlock(ctx, r.indexBH, nil, readHandle, stats, iterStats, nil /* buffer pool */)
     336           1 : }
     337             : 
     338             : func (r *Reader) readFilter(
     339             :         ctx context.Context,
     340             :         readHandle objstorage.ReadHandle,
     341             :         stats *base.InternalIteratorStats,
     342             :         iterStats *iterStatsAccumulator,
     343           1 : ) (block.BufferHandle, error) {
     344           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
     345           1 :         return r.readBlock(ctx, r.filterBH, nil /* transform */, readHandle, stats, iterStats, nil /* buffer pool */)
     346           1 : }
     347             : 
     348             : func (r *Reader) readRangeDel(
     349             :         ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
     350           1 : ) (block.BufferHandle, error) {
     351           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     352           1 :         return r.readBlock(ctx, r.rangeDelBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
     353           1 : }
     354             : 
     355             : func (r *Reader) readRangeKey(
     356             :         ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
     357           1 : ) (block.BufferHandle, error) {
     358           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     359           1 :         return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
     360           1 : }
     361             : 
     362             : func checkChecksum(
     363             :         checksumType block.ChecksumType, b []byte, bh block.Handle, fileNum base.DiskFileNum,
     364           1 : ) error {
     365           1 :         expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
     366           1 :         var computedChecksum uint32
     367           1 :         switch checksumType {
     368           1 :         case block.ChecksumTypeCRC32c:
     369           1 :                 computedChecksum = crc.New(b[:bh.Length+1]).Value()
     370           0 :         case block.ChecksumTypeXXHash64:
     371           0 :                 computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
     372           0 :         default:
     373           0 :                 return errors.Errorf("unsupported checksum type: %d", checksumType)
     374             :         }
     375             : 
     376           1 :         if expectedChecksum != computedChecksum {
     377           0 :                 return base.CorruptionErrorf(
     378           0 :                         "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
     379           0 :                         fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
     380           0 :         }
     381           1 :         return nil
     382             : }
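
// The physical layout assumed by checkChecksum (and by readBlock below): the
// block payload of bh.Length bytes is followed by a 1-byte compression
// indicator and a 4-byte little-endian checksum, and the checksum covers the
// payload plus the indicator byte. A minimal sketch of the CRC32c check,
// restating the code above:
//
//	stored := binary.LittleEndian.Uint32(b[bh.Length+1:])
//	computed := crc.New(b[:bh.Length+1]).Value()
//	ok := stored == computed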
     383             : 
     384             : // DeterministicReadBlockDurationForTesting is for tests that want a
     385             : // deterministic value of the time to read a block (that is not in the cache).
     386             : // The return value is a function that must be called before the test exits.
     387           0 : func DeterministicReadBlockDurationForTesting() func() {
     388           0 :         drbdForTesting := deterministicReadBlockDurationForTesting
     389           0 :         deterministicReadBlockDurationForTesting = true
     390           0 :         return func() {
     391           0 :                 deterministicReadBlockDurationForTesting = drbdForTesting
     392           0 :         }
     393             : }
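
// A sketch of the intended usage (the returned func restores the previous
// setting, so it is typically deferred; the test name here is hypothetical):
//
//	func TestReadBlockTiming(t *testing.T) {
//		defer DeterministicReadBlockDurationForTesting()()
//		// ...
//	}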
     394             : 
     395             : var deterministicReadBlockDurationForTesting = false
     396             : 
     397             : func (r *Reader) readBlock(
     398             :         ctx context.Context,
     399             :         bh block.Handle,
     400             :         transform blockTransform,
     401             :         readHandle objstorage.ReadHandle,
     402             :         stats *base.InternalIteratorStats,
     403             :         iterStats *iterStatsAccumulator,
     404             :         bufferPool *block.BufferPool,
     405           1 : ) (handle block.BufferHandle, _ error) {
     406           1 :         if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Get() != nil {
     407           1 :                 // Cache hit.
     408           1 :                 if readHandle != nil {
     409           1 :                         readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
     410           1 :                 }
     411           1 :                 if stats != nil {
     412           1 :                         stats.BlockBytes += bh.Length
     413           1 :                         stats.BlockBytesInCache += bh.Length
     414           1 :                 }
     415           1 :                 if iterStats != nil {
     416           1 :                         iterStats.reportStats(bh.Length, bh.Length, 0)
     417           1 :                 }
      418             :                 // This block is already in the cache; return a handle to the existing
      419             :                 // value in the cache.
     420           1 :                 return block.CacheBufferHandle(h), nil
     421             :         }
     422             : 
     423             :         // Cache miss.
     424             : 
     425           1 :         if sema := r.loadBlockSema; sema != nil {
     426           0 :                 if err := sema.Acquire(ctx, 1); err != nil {
     427           0 :                         // An error here can only come from the context.
     428           0 :                         return block.BufferHandle{}, err
     429           0 :                 }
     430           0 :                 defer sema.Release(1)
     431             :         }
     432             : 
     433           1 :         compressed := block.Alloc(int(bh.Length+block.TrailerLen), bufferPool)
     434           1 :         readStopwatch := makeStopwatch()
     435           1 :         var err error
     436           1 :         if readHandle != nil {
     437           1 :                 err = readHandle.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
     438           1 :         } else {
     439           1 :                 err = r.readable.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
     440           1 :         }
     441           1 :         readDuration := readStopwatch.stop()
      442           1 :         // Call IsTracingEnabled first to avoid the allocations of boxing integers
      443           1 :         // into an interface{} unless tracing is actually enabled.
     444           1 :         if readDuration >= slowReadTracingThreshold && r.logger.IsTracingEnabled(ctx) {
     445           0 :                 _, file1, line1, _ := runtime.Caller(1)
     446           0 :                 _, file2, line2, _ := runtime.Caller(2)
     447           0 :                 r.logger.Eventf(ctx, "reading block of %d bytes took %s (fileNum=%s; %s/%s:%d -> %s/%s:%d)",
     448           0 :                         int(bh.Length+block.TrailerLen), readDuration.String(),
     449           0 :                         r.cacheOpts.FileNum,
     450           0 :                         filepath.Base(filepath.Dir(file2)), filepath.Base(file2), line2,
     451           0 :                         filepath.Base(filepath.Dir(file1)), filepath.Base(file1), line1)
     452           0 :         }
     453           1 :         if stats != nil {
     454           1 :                 stats.BlockBytes += bh.Length
     455           1 :                 stats.BlockReadDuration += readDuration
     456           1 :         }
     457           1 :         if err != nil {
     458           0 :                 compressed.Release()
     459           0 :                 return block.BufferHandle{}, err
     460           0 :         }
     461           1 :         if err := checkChecksum(r.checksumType, compressed.Get(), bh, r.cacheOpts.FileNum); err != nil {
     462           0 :                 compressed.Release()
     463           0 :                 return block.BufferHandle{}, err
     464           0 :         }
     465             : 
     466           1 :         typ := block.CompressionIndicator(compressed.Get()[bh.Length])
     467           1 :         compressed.Truncate(int(bh.Length))
     468           1 : 
     469           1 :         var decompressed block.Value
     470           1 :         if typ == block.NoCompressionIndicator {
     471           1 :                 decompressed = compressed
     472           1 :         } else {
     473           1 :                 // Decode the length of the decompressed value.
     474           1 :                 decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.Get())
     475           1 :                 if err != nil {
     476           0 :                         compressed.Release()
     477           0 :                         return block.BufferHandle{}, err
     478           0 :                 }
     479             : 
     480           1 :                 decompressed = block.Alloc(decodedLen, bufferPool)
     481           1 :                 if err := block.DecompressInto(typ, compressed.Get()[prefixLen:], decompressed.Get()); err != nil {
     482           0 :                         compressed.Release()
     483           0 :                         return block.BufferHandle{}, err
     484           0 :                 }
     485           1 :                 compressed.Release()
     486             :         }
     487             : 
     488           1 :         if transform != nil {
     489           0 :                 // Transforming blocks is very rare, so the extra copy of the
     490           0 :                 // transformed data is not problematic.
     491           0 :                 tmpTransformed, err := transform(decompressed.Get())
     492           0 :                 if err != nil {
     493           0 :                         decompressed.Release()
     494           0 :                         return block.BufferHandle{}, err
     495           0 :                 }
     496             : 
     497           0 :                 transformed := block.Alloc(len(tmpTransformed), bufferPool)
     498           0 :                 copy(transformed.Get(), tmpTransformed)
     499           0 :                 decompressed.Release()
     500           0 :                 decompressed = transformed
     501             :         }
     502             : 
     503           1 :         if iterStats != nil {
     504           1 :                 iterStats.reportStats(bh.Length, 0, readDuration)
     505           1 :         }
     506           1 :         h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
     507           1 :         return h, nil
     508             : }
     509             : 
     510             : func (r *Reader) readMetaindex(
     511             :         ctx context.Context,
     512             :         metaindexBH block.Handle,
     513             :         readHandle objstorage.ReadHandle,
     514             :         filters map[string]FilterPolicy,
     515           1 : ) error {
     516           1 :         // We use a BufferPool when reading metaindex blocks in order to avoid
     517           1 :         // populating the block cache with these blocks. In heavy-write workloads,
     518           1 :         // especially with high compaction concurrency, new tables may be created
     519           1 :         // frequently. Populating the block cache with these metaindex blocks adds
     520           1 :         // additional contention on the block cache mutexes (see #1997).
     521           1 :         // Additionally, these blocks are exceedingly unlikely to be read again
     522           1 :         // while they're still in the block cache except in misconfigurations with
      523           1 :         // excessive sstable counts or a table cache that's far too small.
     524           1 :         r.metaBufferPool.InitPreallocated(r.metaBufferPoolAlloc[:0])
     525           1 :         // When we're finished, release the buffers we've allocated back to memory
     526           1 :         // allocator. We don't expect to use metaBufferPool again.
     527           1 :         defer r.metaBufferPool.Release()
     528           1 : 
     529           1 :         b, err := r.readBlock(
     530           1 :                 ctx, metaindexBH, nil /* transform */, readHandle, nil, /* stats */
     531           1 :                 nil /* iterStats */, &r.metaBufferPool)
     532           1 :         if err != nil {
     533           0 :                 return err
     534           0 :         }
     535           1 :         data := b.Get()
     536           1 :         defer b.Release()
     537           1 : 
     538           1 :         if uint64(len(data)) != metaindexBH.Length {
     539           0 :                 return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
     540           0 :                         errors.Safe(len(data)), errors.Safe(metaindexBH.Length))
     541           0 :         }
     542             : 
     543           1 :         i, err := rowblk.NewRawIter(bytes.Compare, data)
     544           1 :         if err != nil {
     545           0 :                 return err
     546           0 :         }
     547             : 
     548           1 :         meta := map[string]block.Handle{}
     549           1 :         for valid := i.First(); valid; valid = i.Next() {
     550           1 :                 value := i.Value()
     551           1 :                 if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
     552           1 :                         vbih, n, err := decodeValueBlocksIndexHandle(i.Value())
     553           1 :                         if err != nil {
     554           0 :                                 return err
     555           0 :                         }
     556           1 :                         if n == 0 || n != len(value) {
     557           0 :                                 return base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
     558           0 :                         }
     559           1 :                         r.valueBIH = vbih
     560           1 :                 } else {
     561           1 :                         bh, n := decodeBlockHandle(value)
     562           1 :                         if n == 0 || n != len(value) {
     563           0 :                                 return base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
     564           0 :                         }
     565           1 :                         meta[string(i.Key().UserKey)] = bh
     566             :                 }
     567             :         }
     568           1 :         if err := i.Close(); err != nil {
     569           0 :                 return err
     570           0 :         }
     571             : 
     572           1 :         if bh, ok := meta[metaPropertiesName]; ok {
     573           1 :                 b, err = r.readBlock(
     574           1 :                         ctx, bh, nil /* transform */, readHandle, nil, /* stats */
     575           1 :                         nil /* iterStats */, nil /* buffer pool */)
     576           1 :                 if err != nil {
     577           0 :                         return err
     578           0 :                 }
     579           1 :                 r.propertiesBH = bh
     580           1 :                 err := r.Properties.load(b.Get(), r.deniedUserProperties)
     581           1 :                 b.Release()
     582           1 :                 if err != nil {
     583           0 :                         return err
     584           0 :                 }
     585             :         }
     586             : 
     587           1 :         if bh, ok := meta[metaRangeDelV2Name]; ok {
     588           1 :                 r.rangeDelBH = bh
     589           1 :         } else if _, ok := meta[metaRangeDelV1Name]; ok {
     590           0 :                 // This version of Pebble requires a format major version at least as
     591           0 :                 // high as FormatFlushableIngest (see pebble.FormatMinSupported). In
      592           0 :                 // this format major version, we have a guarantee that we've compacted
     593           0 :                 // away all RocksDB sstables. It should not be possible to encounter an
     594           0 :                 // sstable with a v1 range deletion block but not a v2 range deletion
     595           0 :                 // block.
     596           0 :                 err := errors.Newf("pebble/table: unexpected range-del block type: %s", metaRangeDelV1Name)
     597           0 :                 return errors.Mark(err, base.ErrCorruption)
     598           0 :         }
     599             : 
     600           1 :         if bh, ok := meta[metaRangeKeyName]; ok {
     601           1 :                 r.rangeKeyBH = bh
     602           1 :         }
     603             : 
     604           1 :         for name, fp := range filters {
     605           1 :                 types := []struct {
     606           1 :                         ftype  FilterType
     607           1 :                         prefix string
     608           1 :                 }{
     609           1 :                         {TableFilter, "fullfilter."},
     610           1 :                 }
     611           1 :                 var done bool
     612           1 :                 for _, t := range types {
     613           1 :                         if bh, ok := meta[t.prefix+name]; ok {
     614           1 :                                 r.filterBH = bh
     615           1 : 
     616           1 :                                 switch t.ftype {
     617           1 :                                 case TableFilter:
     618           1 :                                         r.tableFilter = newTableFilterReader(fp, r.filterMetricsTracker)
     619           0 :                                 default:
     620           0 :                                         return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
     621             :                                 }
     622             : 
     623           1 :                                 done = true
     624           1 :                                 break
     625             :                         }
     626             :                 }
     627           1 :                 if done {
     628           1 :                         break
     629             :                 }
     630             :         }
     631           1 :         return nil
     632             : }
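
// To summarize the mapping handled above: the metaindex is a small block of
// (name, handle) pairs, and the names this reader acts on are exactly those
// referenced in readMetaindex:
//
//	metaPropertiesName    -> properties block
//	metaRangeDelV2Name    -> range deletion block (a v1 block is rejected as corruption)
//	metaRangeKeyName      -> range key block
//	metaValueIndexName    -> value blocks index handle
//	"fullfilter." + name  -> table filter block for a configured FilterPolicy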
     633             : 
     634             : // Layout returns the layout (block organization) for an sstable.
     635           1 : func (r *Reader) Layout() (*Layout, error) {
     636           1 :         if r.err != nil {
     637           0 :                 return nil, r.err
     638           0 :         }
     639             : 
     640           1 :         l := &Layout{
     641           1 :                 Data:       make([]BlockHandleWithProperties, 0, r.Properties.NumDataBlocks),
     642           1 :                 Filter:     r.filterBH,
     643           1 :                 RangeDel:   r.rangeDelBH,
     644           1 :                 RangeKey:   r.rangeKeyBH,
     645           1 :                 ValueIndex: r.valueBIH.h,
     646           1 :                 Properties: r.propertiesBH,
     647           1 :                 MetaIndex:  r.metaIndexBH,
     648           1 :                 Footer:     r.footerBH,
     649           1 :                 Format:     r.tableFormat,
     650           1 :         }
     651           1 : 
     652           1 :         indexH, err := r.readIndex(context.Background(), nil, nil, nil)
     653           1 :         if err != nil {
     654           0 :                 return nil, err
     655           0 :         }
     656           1 :         defer indexH.Release()
     657           1 : 
     658           1 :         var alloc bytealloc.A
     659           1 : 
     660           1 :         if r.Properties.IndexPartitions == 0 {
     661           1 :                 l.Index = append(l.Index, r.indexBH)
     662           1 :                 iter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     663           1 :                 for kv := iter.First(); kv != nil; kv = iter.Next() {
     664           1 :                         dataBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     665           1 :                         if err != nil {
     666           0 :                                 return nil, errCorruptIndexEntry(err)
     667           0 :                         }
     668           1 :                         if len(dataBH.Props) > 0 {
     669           1 :                                 alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     670           1 :                         }
     671           1 :                         l.Data = append(l.Data, dataBH)
     672             :                 }
     673           1 :         } else {
     674           1 :                 l.TopIndex = r.indexBH
     675           1 :                 topIter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     676           1 :                 iter := &rowblk.Iter{}
     677           1 :                 for kv := topIter.First(); kv != nil; kv = topIter.Next() {
     678           1 :                         indexBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     679           1 :                         if err != nil {
     680           0 :                                 return nil, errCorruptIndexEntry(err)
     681           0 :                         }
     682           1 :                         l.Index = append(l.Index, indexBH.Handle)
     683           1 : 
     684           1 :                         subIndex, err := r.readBlock(context.Background(), indexBH.Handle,
     685           1 :                                 nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
     686           1 :                         if err != nil {
     687           0 :                                 return nil, err
     688           0 :                         }
     689             :                         // TODO(msbutler): figure out how to pass virtualState to layout call.
     690           1 :                         if err := iter.Init(r.Compare, r.Split, subIndex.Get(), NoTransforms); err != nil {
     691           0 :                                 return nil, err
     692           0 :                         }
     693           1 :                         for kv := iter.First(); kv != nil; kv = iter.Next() {
     694           1 :                                 dataBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     695           1 :                                 if len(dataBH.Props) > 0 {
     696           1 :                                         alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     697           1 :                                 }
     698           1 :                                 if err != nil {
     699           0 :                                         return nil, errCorruptIndexEntry(err)
     700           0 :                                 }
     701           1 :                                 l.Data = append(l.Data, dataBH)
     702             :                         }
     703           1 :                         subIndex.Release()
     704           1 :                         *iter = iter.ResetForReuse()
     705             :                 }
     706             :         }
     707           1 :         if r.valueBIH.h.Length != 0 {
     708           1 :                 vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil, nil /* buffer pool */)
     709           1 :                 if err != nil {
     710           0 :                         return nil, err
     711           0 :                 }
     712           1 :                 defer vbiH.Release()
     713           1 :                 vbiBlock := vbiH.Get()
     714           1 :                 indexEntryLen := int(r.valueBIH.blockNumByteLength + r.valueBIH.blockOffsetByteLength +
     715           1 :                         r.valueBIH.blockLengthByteLength)
     716           1 :                 i := 0
     717           1 :                 for len(vbiBlock) != 0 {
     718           1 :                         if len(vbiBlock) < indexEntryLen {
     719           0 :                                 return nil, errors.Errorf(
     720           0 :                                         "remaining value index block %d does not contain a full entry of length %d",
     721           0 :                                         len(vbiBlock), indexEntryLen)
     722           0 :                         }
     723           1 :                         n := int(r.valueBIH.blockNumByteLength)
     724           1 :                         bn := int(littleEndianGet(vbiBlock, n))
     725           1 :                         if bn != i {
     726           0 :                                 return nil, errors.Errorf("unexpected block num %d, expected %d",
     727           0 :                                         bn, i)
     728           0 :                         }
     729           1 :                         i++
     730           1 :                         vbiBlock = vbiBlock[n:]
     731           1 :                         n = int(r.valueBIH.blockOffsetByteLength)
     732           1 :                         blockOffset := littleEndianGet(vbiBlock, n)
     733           1 :                         vbiBlock = vbiBlock[n:]
     734           1 :                         n = int(r.valueBIH.blockLengthByteLength)
     735           1 :                         blockLen := littleEndianGet(vbiBlock, n)
     736           1 :                         vbiBlock = vbiBlock[n:]
     737           1 :                         l.ValueBlock = append(l.ValueBlock, block.Handle{Offset: blockOffset, Length: blockLen})
     738             :                 }
     739             :         }
     740             : 
     741           1 :         return l, nil
     742             : }
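
// Each entry in the value blocks index decoded above is a fixed-width,
// little-endian triple; the field widths come from valueBIH (a sketch of the
// layout, not a new format):
//
//	[ block number | block offset | block length ]
//	  widths: blockNumByteLength, blockOffsetByteLength, blockLengthByteLength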
     743             : 
     744             : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
     745           1 : func (r *Reader) ValidateBlockChecksums() error {
     746           1 :         // Pre-compute the BlockHandles for the underlying file.
     747           1 :         l, err := r.Layout()
     748           1 :         if err != nil {
     749           0 :                 return err
     750           0 :         }
     751             : 
     752             :         // Construct the set of blocks to check. Note that the footer is not checked
     753             :         // as it is not a block with a checksum.
     754           1 :         blocks := make([]block.Handle, len(l.Data))
     755           1 :         for i := range l.Data {
     756           1 :                 blocks[i] = l.Data[i].Handle
     757           1 :         }
     758           1 :         blocks = append(blocks, l.Index...)
     759           1 :         blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.RangeKey, l.Properties, l.MetaIndex)
     760           1 : 
     761           1 :         // Sorting by offset ensures we are performing a sequential scan of the
     762           1 :         // file.
     763           1 :         slices.SortFunc(blocks, func(a, b block.Handle) int {
     764           1 :                 return cmp.Compare(a.Offset, b.Offset)
     765           1 :         })
     766             : 
     767             :         // Check all blocks sequentially. Make use of read-ahead, given we are
     768             :         // scanning the entire file from start to end.
     769           1 :         rh := r.readable.NewReadHandle(objstorage.NoReadBefore)
     770           1 :         defer rh.Close()
     771           1 : 
     772           1 :         for _, bh := range blocks {
     773           1 :                 // Certain blocks may not be present, in which case we skip them.
     774           1 :                 if bh.Length == 0 {
     775           1 :                         continue
     776             :                 }
     777             : 
     778             :                 // Read the block, which validates the checksum.
     779           1 :                 h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* iterStats */, nil /* buffer pool */)
     780           1 :                 if err != nil {
     781           0 :                         return err
     782           0 :                 }
     783           1 :                 h.Release()
     784             :         }
     785             : 
     786           1 :         return nil
     787             : }
     788             : 
      789             : // CommonProperties implements the CommonReader interface.
     790           1 : func (r *Reader) CommonProperties() *CommonProperties {
     791           1 :         return &r.Properties.CommonProperties
     792           1 : }
     793             : 
     794             : // EstimateDiskUsage returns the total size of data blocks overlapping the range
     795             : // `[start, end]`. Even if a data block partially overlaps, or we cannot
     796             : // determine overlap due to abbreviated index keys, the full data block size is
     797             : // included in the estimation.
     798             : //
     799             : // This function does not account for any metablock space usage. Assumes there
     800             : // is at least partial overlap, i.e., `[start, end]` falls neither completely
     801             : // before nor completely after the file's range.
     802             : //
     803             : // Only blocks containing point keys are considered. Range deletion and range
     804             : // key blocks are not considered.
     805             : //
     806             : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
     807             : // data blocks overlapped and add that same fraction of the metadata blocks to the
     808             : // estimate.
     809           1 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
     810           1 :         if r.err != nil {
     811           0 :                 return 0, r.err
     812           0 :         }
     813             : 
     814           1 :         indexH, err := r.readIndex(context.Background(), nil, nil, nil)
     815           1 :         if err != nil {
     816           0 :                 return 0, err
     817           0 :         }
     818           1 :         defer indexH.Release()
     819           1 : 
     820           1 :         // Iterators over the bottom-level index blocks containing start and end.
     821           1 :         // These may be different in case of partitioned index but will both point
     822           1 :         // to the same blockIter over the single index in the unpartitioned case.
     823           1 :         var startIdxIter, endIdxIter *rowblk.Iter
     824           1 :         if r.Properties.IndexPartitions == 0 {
     825           1 :                 iter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     826           1 :                 if err != nil {
     827           0 :                         return 0, err
     828           0 :                 }
     829           1 :                 startIdxIter = iter
     830           1 :                 endIdxIter = iter
     831           1 :         } else {
     832           1 :                 topIter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     833           1 :                 if err != nil {
     834           0 :                         return 0, err
     835           0 :                 }
     836             : 
     837           1 :                 kv := topIter.SeekGE(start, base.SeekGEFlagsNone)
     838           1 :                 if kv == nil {
     839           1 :                         // The range falls completely after this file, or an error occurred.
     840           1 :                         return 0, topIter.Error()
     841           1 :                 }
     842           1 :                 startIdxBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     843           1 :                 if err != nil {
     844           0 :                         return 0, errCorruptIndexEntry(err)
     845           0 :                 }
     846           1 :                 startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.Handle,
     847           1 :                         nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
     848           1 :                 if err != nil {
     849           0 :                         return 0, err
     850           0 :                 }
     851           1 :                 defer startIdxBlock.Release()
     852           1 :                 startIdxIter, err = rowblk.NewIter(r.Compare, r.Split, startIdxBlock.Get(), NoTransforms)
     853           1 :                 if err != nil {
     854           0 :                         return 0, err
     855           0 :                 }
     856             : 
     857           1 :                 kv = topIter.SeekGE(end, base.SeekGEFlagsNone)
     858           1 :                 if kv == nil {
     859           1 :                         if err := topIter.Error(); err != nil {
     860           0 :                                 return 0, err
     861           0 :                         }
     862           1 :                 } else {
     863           1 :                         endIdxBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     864           1 :                         if err != nil {
     865           0 :                                 return 0, errCorruptIndexEntry(err)
     866           0 :                         }
     867           1 :                         endIdxBlock, err := r.readBlock(context.Background(),
     868           1 :                                 endIdxBH.Handle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
     869           1 :                         if err != nil {
     870           0 :                                 return 0, err
     871           0 :                         }
     872           1 :                         defer endIdxBlock.Release()
     873           1 :                         endIdxIter, err = rowblk.NewIter(r.Compare, r.Split, endIdxBlock.Get(), NoTransforms)
     874           1 :                         if err != nil {
     875           0 :                                 return 0, err
     876           0 :                         }
     877             :                 }
     878             :         }
     879             :         // startIdxIter should not be nil at this point, while endIdxIter may be nil
     880             :         // if the range extends past the end of the file.
     881             : 
     882           1 :         kv := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
     883           1 :         if kv == nil {
     884           1 :                 // The range falls completely after this file, or an error occurred.
     885           1 :                 return 0, startIdxIter.Error()
     886           1 :         }
     887           1 :         startBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     888           1 :         if err != nil {
     889           0 :                 return 0, errCorruptIndexEntry(err)
     890           0 :         }
     891             : 
     892           1 :         includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
     893           1 :                 // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
     894           1 :                 // Linearly interpolate what is stored in value blocks.
     895           1 :                 //
     896           1 :                 // TODO(sumeer): if we need more accuracy, without loading any data blocks
     897           1 :                 // (which contain the value handles, and which may also be insufficient if
     898           1 :                 // the values are in separate files), we will need to accumulate the
     899           1 :                 // logical size of the key-value pairs and store the cumulative value for
     900           1 :                 // each data block in the index block entry. This increases the size of
     901           1 :                 // the BlockHandle, so wait until this becomes necessary.
     902           1 :                 return dataBlockSize +
     903           1 :                         uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
     904           1 :                                 float64(r.Properties.ValueBlocksSize))
     905           1 :         }
     906           1 :         if endIdxIter == nil {
     907           1 :                 // The range spans beyond this file. Include data blocks through the last.
     908           1 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
     909           1 :         }
     910           1 :         kv = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
     911           1 :         if kv == nil {
     912           1 :                 if err := endIdxIter.Error(); err != nil {
     913           0 :                         return 0, err
     914           0 :                 }
     915             :                 // The range spans beyond this file. Include data blocks through the last.
     916           1 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
     917             :         }
     918           1 :         endBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     919           1 :         if err != nil {
     920           0 :                 return 0, errCorruptIndexEntry(err)
     921           0 :         }
     922           1 :         return includeInterpolatedValueBlocksSize(
     923           1 :                 endBH.Offset + endBH.Length + block.TrailerLen - startBH.Offset), nil
     924             : }
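
The helper above charges the requested key range for value-block bytes in proportion to the data-block bytes it covers. A minimal, self-contained sketch of the same arithmetic, using purely hypothetical sizes (none of these numbers come from a real table):

    // Sketch only: the proportional attribution performed by
    // includeInterpolatedValueBlocksSize, with made-up sizes.
    package main

    import "fmt"

    func main() {
            const (
                    dataSize        uint64 = 4 << 20 // total bytes in all data blocks (hypothetical)
                    valueBlocksSize uint64 = 1 << 20 // total bytes in all value blocks (hypothetical)
                    rangeDataBytes  uint64 = 1 << 20 // data-block bytes overlapped by [start, end)
            )
            // The range covers 25% of the data blocks, so it is also charged 25%
            // of the value blocks: 1 MiB + 256 KiB.
            estimate := rangeDataBytes +
                    uint64(float64(rangeDataBytes)/float64(dataSize)*float64(valueBlocksSize))
            fmt.Println(estimate) // 1310720
    }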
     925             : 
     926             : // TableFormat returns the format version for the table.
     927           1 : func (r *Reader) TableFormat() (TableFormat, error) {
     928           1 :         if r.err != nil {
     929           0 :                 return TableFormatUnspecified, r.err
     930           0 :         }
     931           1 :         return r.tableFormat, nil
     932             : }
     933             : 
     934             : // NewReader returns a new table reader for the file. Closing the reader will
     935             : // close the file.
     936             : //
     937             : // The context is used for tracing any operations performed by NewReader; it is
     938             : // NOT stored for future use.
     939           1 : func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Reader, error) {
     940           1 :         if f == nil {
     941           0 :                 return nil, errors.New("pebble/table: nil file")
     942           0 :         }
     943           1 :         o = o.ensureDefaults()
     944           1 :         r := &Reader{
     945           1 :                 readable:             f,
     946           1 :                 cacheOpts:            o.internal.CacheOpts,
     947           1 :                 loadBlockSema:        o.LoadBlockSema,
     948           1 :                 deniedUserProperties: o.DeniedUserProperties,
     949           1 :                 filterMetricsTracker: o.FilterMetricsTracker,
     950           1 :                 logger:               o.LoggerAndTracer,
     951           1 :         }
     952           1 :         if r.cacheOpts.Cache == nil {
     953           1 :                 r.cacheOpts.Cache = cache.New(0)
     954           1 :         } else {
     955           1 :                 r.cacheOpts.Cache.Ref()
     956           1 :         }
     957           1 :         if r.cacheOpts.CacheID == 0 {
     958           1 :                 r.cacheOpts.CacheID = r.cacheOpts.Cache.NewID()
     959           1 :         }
     960             : 
     961           1 :         var preallocRH objstorageprovider.PreallocatedReadHandle
     962           1 :         rh := objstorageprovider.UsePreallocatedReadHandle(
     963           1 :                 r.readable, objstorage.ReadBeforeForNewReader, &preallocRH)
     964           1 :         defer rh.Close()
     965           1 : 
     966           1 :         footer, err := readFooter(ctx, f, rh, r.logger, r.cacheOpts.FileNum)
     967           1 :         if err != nil {
     968           0 :                 r.err = err
     969           0 :                 return nil, r.Close()
     970           0 :         }
     971           1 :         r.checksumType = footer.checksum
     972           1 :         r.tableFormat = footer.format
     973           1 :         // Read the metaindex and properties blocks.
     974           1 :         if err := r.readMetaindex(ctx, footer.metaindexBH, rh, o.Filters); err != nil {
     975           0 :                 r.err = err
     976           0 :                 return nil, r.Close()
     977           0 :         }
     978           1 :         r.indexBH = footer.indexBH
     979           1 :         r.metaIndexBH = footer.metaindexBH
     980           1 :         r.footerBH = footer.footerBH
     981           1 : 
     982           1 :         if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
     983           1 :                 r.Comparer = o.Comparer
     984           1 :                 r.Compare = o.Comparer.Compare
     985           1 :                 r.SuffixCmp = o.Comparer.CompareSuffixes
     986           1 :                 r.Equal = o.Comparer.Equal
     987           1 :                 r.Split = o.Comparer.Split
     988           1 :         } else if comparer, ok := o.Comparers[r.Properties.ComparerName]; ok {
     989           0 :                 r.Comparer = comparer
     990           0 :                 r.Compare = comparer.Compare
     991           0 :                 r.SuffixCmp = comparer.CompareSuffixes
     992           0 :                 r.Equal = comparer.Equal
     993           0 :                 r.Split = comparer.Split
     994           0 :         } else {
     995           0 :                 r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
     996           0 :                         errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.ComparerName))
     997           0 :         }
     998             : 
     999           1 :         if mergerName := r.Properties.MergerName; mergerName != "" && mergerName != "nullptr" {
    1000           1 :                 if o.Merger != nil && o.Merger.Name == mergerName {
    1001           1 :                         // opts.Merger matches.
    1002           1 :                 } else if _, ok := o.Mergers[mergerName]; ok {
    1003           0 :                         // Known merger.
    1004           0 :                 } else {
    1005           0 :                         r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
    1006           0 :                                 errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.MergerName))
    1007           0 :                 }
    1008             :         }
    1009             : 
    1010           1 :         if r.err != nil {
    1011           0 :                 return nil, r.Close()
    1012           0 :         }
    1013             : 
    1014           1 :         return r, nil
    1015             : }
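
As resolved above, NewReader uses o.Comparer when its name matches the comparer recorded in the table's properties and otherwise falls back to the o.Comparers map (and likewise o.Merger / o.Mergers). A minimal sketch of supplying that map; myComparer is a hypothetical *base.Comparer, ctx and readable are assumed to be in scope, and the concrete map type of the Comparers field is an assumption:

    // Sketch only: register a named comparer so a table written with a
    // non-default comparer can be opened. myComparer is hypothetical.
    opts := sstable.ReaderOptions{
            Comparers: map[string]*base.Comparer{
                    myComparer.Name: myComparer,
            },
    }
    r, err := sstable.NewReader(ctx, readable, opts)
    if err != nil {
            return err
    }
    defer r.Close()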
    1016             : 
    1017             : // ReadableFile describes the smallest subset of vfs.File that is required for
    1018             : // reading SSTs.
    1019             : type ReadableFile interface {
    1020             :         io.ReaderAt
    1021             :         io.Closer
    1022             :         Stat() (os.FileInfo, error)
    1023             : }
    1024             : 
    1025             : // NewSimpleReadable wraps a ReadableFile in an objstorage.Readable
    1026             : // implementation (which does not support read-ahead).
    1027           1 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
    1028           1 :         info, err := r.Stat()
    1029           1 :         if err != nil {
    1030           0 :                 return nil, err
    1031           0 :         }
    1032           1 :         res := &simpleReadable{
    1033           1 :                 f:    r,
    1034           1 :                 size: info.Size(),
    1035           1 :         }
    1036           1 :         res.rh = objstorage.MakeNoopReadHandle(res)
    1037           1 :         return res, nil
    1038             : }
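
Together, ReadableFile and NewSimpleReadable let a plain local file back a Reader: *os.File already provides ReadAt, Close, and Stat. A minimal sketch of opening a table this way, assuming the table was written with the default comparer so that default ReaderOptions suffice (the helper name and path handling are illustrative, not part of this package):

    import (
            "context"
            "os"

            "github.com/cockroachdb/pebble/sstable"
    )

    // openLocalTable is a hypothetical helper, not part of the pebble API.
    func openLocalTable(path string) (*sstable.Reader, error) {
            f, err := os.Open(path) // *os.File satisfies ReadableFile
            if err != nil {
                    return nil, err
            }
            readable, err := sstable.NewSimpleReadable(f)
            if err != nil {
                    f.Close()
                    return nil, err
            }
            // Closing the returned Reader also closes the underlying file.
            return sstable.NewReader(context.Background(), readable, sstable.ReaderOptions{})
    }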
    1039             : 
    1040             : // simpleReadable wraps a ReadableFile to implement objstorage.Readable.
    1041             : type simpleReadable struct {
    1042             :         f    ReadableFile
    1043             :         size int64
    1044             :         rh   objstorage.NoopReadHandle
    1045             : }
    1046             : 
    1047             : var _ objstorage.Readable = (*simpleReadable)(nil)
    1048             : 
    1049             : // ReadAt is part of the objstorage.Readable interface.
    1050           1 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
    1051           1 :         n, err := s.f.ReadAt(p, off)
    1052           1 :         if invariants.Enabled && err == nil && n != len(p) {
    1053           0 :                 panic("short read")
    1054             :         }
    1055           1 :         return err
    1056             : }
    1057             : 
    1058             : // Close is part of the objstorage.Readable interface.
    1059           1 : func (s *simpleReadable) Close() error {
    1060           1 :         return s.f.Close()
    1061           1 : }
    1062             : 
    1063             : // Size is part of the objstorage.Readable interface.
    1064           1 : func (s *simpleReadable) Size() int64 {
    1065           1 :         return s.size
    1066           1 : }
    1067             : 
    1068             : // NewReadHandle is part of the objstorage.Readable interface.
    1069             : func (s *simpleReadable) NewReadHandle(
    1070             :         readBeforeSize objstorage.ReadBeforeSize,
    1071           1 : ) objstorage.ReadHandle {
    1072           1 :         return &s.rh
    1073           1 : }
    1074             : 
    1075           0 : func errCorruptIndexEntry(err error) error {
    1076           0 :         err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
    1077           0 :         if invariants.Enabled {
    1078           0 :                 panic(err)
    1079             :         }
    1080           0 :         return err
    1081             : }
    1082             : 
    1083             : type deterministicStopwatchForTesting struct {
    1084             :         startTime time.Time
    1085             : }
    1086             : 
    1087           1 : func makeStopwatch() deterministicStopwatchForTesting {
    1088           1 :         return deterministicStopwatchForTesting{startTime: time.Now()}
    1089           1 : }
    1090             : 
    1091           1 : func (w deterministicStopwatchForTesting) stop() time.Duration {
    1092           1 :         dur := time.Since(w.startTime)
    1093           1 :         if deterministicReadBlockDurationForTesting {
    1094           0 :                 dur = slowReadTracingThreshold
    1095           0 :         }
    1096           1 :         return dur
    1097             : }
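
The stopwatch above exists to make block-read latencies deterministic under test: when deterministicReadBlockDurationForTesting is set, stop() reports the slow-read tracing threshold rather than the measured duration, presumably so that latency-based tracing fires predictably. A self-contained sketch of the same pattern, with hypothetical names and a made-up threshold:

    // Sketch only: a stopwatch whose elapsed time can be pinned in tests.
    // All identifiers here are hypothetical, not part of the pebble API.
    package main

    import (
            "fmt"
            "time"
    )

    var forceFixedDurationForTesting = false // a test would flip this to true

    const slowThreshold = 5 * time.Millisecond // made-up tracing threshold

    type stopwatch struct{ start time.Time }

    func startStopwatch() stopwatch { return stopwatch{start: time.Now()} }

    func (w stopwatch) stop() time.Duration {
            if forceFixedDurationForTesting {
                    // Report exactly the threshold so slow-read paths are exercised
                    // deterministically.
                    return slowThreshold
            }
            return time.Since(w.start)
    }

    func main() {
            w := startStopwatch()
            if d := w.stop(); d >= slowThreshold {
                    fmt.Println("would emit a slow-read trace:", d)
            }
    }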

Generated by: LCOV version 1.14