LCOV - code coverage report
Current view: top level - pebble/sstable - reader.go (source / functions)
Test:         2024-07-21 08:15Z 72c3f550 - meta test only.lcov
Date:         2024-07-21 08:16:40

                    Hit    Total    Coverage
Lines:              519    711      73.0 %
Functions:          0      0        -

          Line data    Source code
       1             : // Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "context"
      11             :         "encoding/binary"
      12             :         "io"
      13             :         "os"
      14             :         "path/filepath"
      15             :         "runtime"
      16             :         "slices"
      17             :         "time"
      18             : 
      19             :         "github.com/cespare/xxhash/v2"
      20             :         "github.com/cockroachdb/errors"
      21             :         "github.com/cockroachdb/fifo"
      22             :         "github.com/cockroachdb/pebble/internal/base"
      23             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      24             :         "github.com/cockroachdb/pebble/internal/cache"
      25             :         "github.com/cockroachdb/pebble/internal/crc"
      26             :         "github.com/cockroachdb/pebble/internal/invariants"
      27             :         "github.com/cockroachdb/pebble/internal/keyspan"
      28             :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      29             :         "github.com/cockroachdb/pebble/objstorage"
      30             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      31             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      32             :         "github.com/cockroachdb/pebble/sstable/block"
      33             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      34             : )
      35             : 
      36             : var errReaderClosed = errors.New("pebble/table: reader is closed")
      37             : 
      38             : // decodeBlockHandle returns the block handle encoded at the start of src, as
      39             : // well as the number of bytes it occupies. It returns zero if given invalid
      40             : // input. A block handle for a data block or a first/lower level index block
      41             : // should not be decoded using decodeBlockHandle since the caller may validate
       42             : // that the number of bytes decoded is equal to the length of src, a check
       43             : // that fails when property bytes are appended after the handle. In those
       44             : // cases the caller should use decodeBlockHandleWithProperties.
      45           1 : func decodeBlockHandle(src []byte) (block.Handle, int) {
      46           1 :         offset, n := binary.Uvarint(src)
      47           1 :         length, m := binary.Uvarint(src[n:])
      48           1 :         if n == 0 || m == 0 {
      49           0 :                 return block.Handle{}, 0
      50           0 :         }
      51           1 :         return block.Handle{Offset: offset, Length: length}, n + m
      52             : }
      53             : 
      54             : // decodeBlockHandleWithProperties returns the block handle and properties
      55             : // encoded in src. src needs to be exactly the length that was encoded. This
      56             : // method must be used for data block and first/lower level index blocks. The
      57             : // properties in the block handle point to the bytes in src.
      58           1 : func decodeBlockHandleWithProperties(src []byte) (BlockHandleWithProperties, error) {
      59           1 :         bh, n := decodeBlockHandle(src)
      60           1 :         if n == 0 {
      61           0 :                 return BlockHandleWithProperties{}, errors.Errorf("invalid BlockHandle")
      62           0 :         }
      63           1 :         return BlockHandleWithProperties{
      64           1 :                 Handle: bh,
      65           1 :                 Props:  src[n:],
      66           1 :         }, nil
      67             : }
      68             : 
      69           1 : func encodeBlockHandle(dst []byte, b block.Handle) int {
      70           1 :         n := binary.PutUvarint(dst, b.Offset)
      71           1 :         m := binary.PutUvarint(dst[n:], b.Length)
      72           1 :         return n + m
      73           1 : }
      74             : 
      75           1 : func encodeBlockHandleWithProperties(dst []byte, b BlockHandleWithProperties) []byte {
      76           1 :         n := encodeBlockHandle(dst, b.Handle)
      77           1 :         dst = append(dst[:n], b.Props...)
      78           1 :         return dst
      79           1 : }
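
The handle layout used by the functions above is simply two unsigned varints (offset, then length) with any property bytes appended verbatim. A minimal, standalone round trip of that layout, using only the standard library rather than this package's types:

package main

import (
        "bytes"
        "encoding/binary"
        "fmt"
)

func main() {
        offset, length := uint64(512), uint64(4096)
        props := []byte("example-props") // stand-in for block property bytes

        // Encode: uvarint(offset) ++ uvarint(length) ++ props, mirroring
        // encodeBlockHandleWithProperties above.
        buf := make([]byte, 2*binary.MaxVarintLen64)
        n := binary.PutUvarint(buf, offset)
        n += binary.PutUvarint(buf[n:], length)
        enc := append(buf[:n], props...)

        // Decode: read the two uvarints back; whatever remains is the property
        // payload, mirroring decodeBlockHandleWithProperties.
        gotOffset, i := binary.Uvarint(enc)
        gotLength, j := binary.Uvarint(enc[i:])
        gotProps := enc[i+j:]
        fmt.Println(gotOffset, gotLength, bytes.Equal(gotProps, props)) // 512 4096 true
}
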
      80             : 
      81             : type loadBlockResult int8
      82             : 
      83             : const (
      84             :         loadBlockOK loadBlockResult = iota
       85             :         // Could be due to an error or because there is no block left to load.
      86             :         loadBlockFailed
      87             :         loadBlockIrrelevant
      88             : )
      89             : 
      90             : type blockTransform func([]byte) ([]byte, error)
      91             : 
      92             : // Reader is a table reader.
      93             : type Reader struct {
      94             :         readable objstorage.Readable
      95             : 
      96             :         // The following fields are copied from the ReadOptions.
      97             :         cacheOpts            sstableinternal.CacheOptions
      98             :         loadBlockSema        *fifo.Semaphore
      99             :         deniedUserProperties map[string]struct{}
     100             :         filterMetricsTracker *FilterMetricsTracker
     101             :         logger               base.LoggerAndTracer
     102             : 
     103             :         Compare   Compare
     104             :         Equal     Equal
     105             :         FormatKey base.FormatKey
     106             :         Split     Split
     107             : 
     108             :         tableFilter *tableFilterReader
     109             : 
     110             :         err error
     111             : 
     112             :         indexBH      block.Handle
     113             :         filterBH     block.Handle
     114             :         rangeDelBH   block.Handle
     115             :         rangeKeyBH   block.Handle
     116             :         valueBIH     valueBlocksIndexHandle
     117             :         propertiesBH block.Handle
     118             :         metaIndexBH  block.Handle
     119             :         footerBH     block.Handle
     120             : 
     121             :         Properties   Properties
     122             :         tableFormat  TableFormat
     123             :         checksumType block.ChecksumType
     124             : 
     125             :         // metaBufferPool is a buffer pool used exclusively when opening a table and
     126             :         // loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
      127             :         // the BufferPool.pool slice as part of the Reader allocation. It has
     128             :         // capacity 3 to accommodate the meta block (1), and both the compressed
     129             :         // properties block (1) and decompressed properties block (1)
     130             :         // simultaneously.
     131             :         metaBufferPool      block.BufferPool
     132             :         metaBufferPoolAlloc [3]block.AllocedBuffer
     133             : }
     134             : 
     135             : var _ CommonReader = (*Reader)(nil)
     136             : 
     137             : // Close the reader and the underlying objstorage.Readable.
     138           1 : func (r *Reader) Close() error {
     139           1 :         r.cacheOpts.Cache.Unref()
     140           1 : 
     141           1 :         if r.readable != nil {
     142           1 :                 r.err = firstError(r.err, r.readable.Close())
     143           1 :                 r.readable = nil
     144           1 :         }
     145             : 
     146           1 :         if r.err != nil {
     147           0 :                 return r.err
     148           0 :         }
     149             :         // Make any future calls to Get, NewIter or Close return an error.
     150           1 :         r.err = errReaderClosed
     151           1 :         return nil
     152             : }
     153             : 
     154             : // NewIterWithBlockPropertyFilters returns an iterator for the contents of the
     155             : // table. If an error occurs, NewIterWithBlockPropertyFilters cleans up after
     156             : // itself and returns a nil iterator.
     157             : func (r *Reader) NewIterWithBlockPropertyFilters(
     158             :         transforms IterTransforms,
     159             :         lower, upper []byte,
     160             :         filterer *BlockPropertiesFilterer,
     161             :         useFilterBlock bool,
     162             :         stats *base.InternalIteratorStats,
     163             :         categoryAndQoS CategoryAndQoS,
     164             :         statsCollector *CategoryStatsCollector,
     165             :         rp ReaderProvider,
     166           1 : ) (Iterator, error) {
     167           1 :         return r.newIterWithBlockPropertyFiltersAndContext(
     168           1 :                 context.Background(), transforms, lower, upper, filterer, useFilterBlock,
     169           1 :                 stats, categoryAndQoS, statsCollector, rp, nil)
     170           1 : }
     171             : 
     172             : // NewIterWithBlockPropertyFiltersAndContextEtc is similar to
     173             : // NewIterWithBlockPropertyFilters and additionally accepts a context for
     174             : // tracing.
     175             : //
     176             : // If transform.HideObsoletePoints is set, the callee assumes that filterer
     177             : // already includes obsoleteKeyBlockPropertyFilter. The caller can satisfy this
     178             : // contract by first calling TryAddBlockPropertyFilterForHideObsoletePoints.
     179             : func (r *Reader) NewIterWithBlockPropertyFiltersAndContextEtc(
     180             :         ctx context.Context,
     181             :         transforms IterTransforms,
     182             :         lower, upper []byte,
     183             :         filterer *BlockPropertiesFilterer,
     184             :         useFilterBlock bool,
     185             :         stats *base.InternalIteratorStats,
     186             :         categoryAndQoS CategoryAndQoS,
     187             :         statsCollector *CategoryStatsCollector,
     188             :         rp ReaderProvider,
     189           1 : ) (Iterator, error) {
     190           1 :         return r.newIterWithBlockPropertyFiltersAndContext(
     191           1 :                 ctx, transforms, lower, upper, filterer, useFilterBlock,
     192           1 :                 stats, categoryAndQoS, statsCollector, rp, nil)
     193           1 : }
     194             : 
     195             : // TryAddBlockPropertyFilterForHideObsoletePoints is expected to be called
     196             : // before the call to NewIterWithBlockPropertyFiltersAndContextEtc, to get the
     197             : // value of hideObsoletePoints and potentially add a block property filter.
     198             : func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints(
     199             :         snapshotForHideObsoletePoints base.SeqNum,
     200             :         fileLargestSeqNum base.SeqNum,
     201             :         pointKeyFilters []BlockPropertyFilter,
     202           1 : ) (hideObsoletePoints bool, filters []BlockPropertyFilter) {
     203           1 :         hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 &&
     204           1 :                 snapshotForHideObsoletePoints > fileLargestSeqNum
     205           1 :         if hideObsoletePoints {
     206           1 :                 pointKeyFilters = append(pointKeyFilters, obsoleteKeyBlockPropertyFilter{})
     207           1 :         }
     208           1 :         return hideObsoletePoints, pointKeyFilters
     209             : }
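
A short in-package sketch of the call-order contract described above: derive hideObsoletePoints and the augmented filter list first, then thread both into the iterator constructor. The helper name is hypothetical and the filterer construction is elided; it assumes only the method signature above and the IterTransforms.HideObsoletePoints field used elsewhere in this file.

// prepareHideObsoletePoints is an illustrative helper showing the intended
// sequencing; it is not part of the package API.
func prepareHideObsoletePoints(
        r *Reader,
        snapshot, fileLargestSeqNum base.SeqNum,
        pointKeyFilters []BlockPropertyFilter,
        transforms IterTransforms,
) (IterTransforms, []BlockPropertyFilter) {
        hide, filters := r.TryAddBlockPropertyFilterForHideObsoletePoints(
                snapshot, fileLargestSeqNum, pointKeyFilters)
        transforms.HideObsoletePoints = hide
        // The returned filters would feed the BlockPropertiesFilterer passed to
        // NewIterWithBlockPropertyFiltersAndContextEtc.
        return transforms, filters
}
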
     210             : 
     211             : func (r *Reader) newIterWithBlockPropertyFiltersAndContext(
     212             :         ctx context.Context,
     213             :         transforms IterTransforms,
     214             :         lower, upper []byte,
     215             :         filterer *BlockPropertiesFilterer,
     216             :         useFilterBlock bool,
     217             :         stats *base.InternalIteratorStats,
     218             :         categoryAndQoS CategoryAndQoS,
     219             :         statsCollector *CategoryStatsCollector,
     220             :         rp ReaderProvider,
     221             :         vState *virtualState,
     222           1 : ) (Iterator, error) {
     223           1 :         // NB: pebble.tableCache wraps the returned iterator with one which performs
     224           1 :         // reference counting on the Reader, preventing the Reader from being closed
     225           1 :         // until the final iterator closes.
     226           1 :         var res Iterator
     227           1 :         var err error
     228           1 :         if r.Properties.IndexType == twoLevelIndex {
     229           1 :                 res, err = newTwoLevelIterator(ctx, r, vState, transforms, lower, upper, filterer, useFilterBlock,
     230           1 :                         stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
     231           1 :         } else {
     232           1 :                 res, err = newSingleLevelIterator(
     233           1 :                         ctx, r, vState, transforms, lower, upper, filterer, useFilterBlock,
     234           1 :                         stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
     235           1 :         }
     236           1 :         if err != nil {
     237           0 :                 // Note: we don't want to return res here - it will be a nil
     238           0 :                 // single/twoLevelIterator, not a nil Iterator.
     239           0 :                 return nil, err
     240           0 :         }
     241           1 :         return res, nil
     242             : }
     243             : 
     244             : // NewIter returns an iterator for the contents of the table. If an error
     245             : // occurs, NewIter cleans up after itself and returns a nil iterator. NewIter
     246             : // must only be used when the Reader is guaranteed to outlive any LazyValues
     247             : // returned from the iter.
     248           1 : func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterator, error) {
     249           1 :         return r.NewIterWithBlockPropertyFilters(
     250           1 :                 transforms, lower, upper, nil, true, /* useFilterBlock */
     251           1 :                 nil /* stats */, CategoryAndQoS{}, nil /* statsCollector */, TrivialReaderProvider{Reader: r})
     252           1 : }
     253             : 
     254             : // NewCompactionIter returns an iterator similar to NewIter but it also increments
     255             : // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up
     256             : // after itself and returns a nil iterator.
     257             : func (r *Reader) NewCompactionIter(
     258             :         transforms IterTransforms,
     259             :         categoryAndQoS CategoryAndQoS,
     260             :         statsCollector *CategoryStatsCollector,
     261             :         rp ReaderProvider,
     262             :         bufferPool *block.BufferPool,
     263           1 : ) (Iterator, error) {
     264           1 :         return r.newCompactionIter(transforms, categoryAndQoS, statsCollector, rp, nil, bufferPool)
     265           1 : }
     266             : 
     267             : func (r *Reader) newCompactionIter(
     268             :         transforms IterTransforms,
     269             :         categoryAndQoS CategoryAndQoS,
     270             :         statsCollector *CategoryStatsCollector,
     271             :         rp ReaderProvider,
     272             :         vState *virtualState,
     273             :         bufferPool *block.BufferPool,
     274           1 : ) (Iterator, error) {
     275           1 :         if vState != nil && vState.isSharedIngested {
     276           1 :                 transforms.HideObsoletePoints = true
     277           1 :         }
     278           1 :         if r.Properties.IndexType == twoLevelIndex {
     279           1 :                 i, err := newTwoLevelIterator(
     280           1 :                         context.Background(),
     281           1 :                         r, vState, transforms, nil /* lower */, nil /* upper */, nil,
     282           1 :                         false /* useFilter */, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
     283           1 :                 )
     284           1 :                 if err != nil {
     285           0 :                         return nil, err
     286           0 :                 }
     287           1 :                 i.setupForCompaction()
     288           1 :                 return &twoLevelCompactionIterator{twoLevelIterator: i}, nil
     289             :         }
     290           1 :         i, err := newSingleLevelIterator(
     291           1 :                 context.Background(), r, vState, transforms, nil /* lower */, nil, /* upper */
     292           1 :                 nil, false /* useFilter */, nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
     293           1 :         )
     294           1 :         if err != nil {
     295           0 :                 return nil, err
     296           0 :         }
     297           1 :         i.setupForCompaction()
     298           1 :         return &compactionIterator{singleLevelIterator: i}, nil
     299             : }
     300             : 
     301             : // NewRawRangeDelIter returns an internal iterator for the contents of the
     302             : // range-del block for the table. Returns nil if the table does not contain
     303             : // any range deletions.
     304             : func (r *Reader) NewRawRangeDelIter(
     305             :         ctx context.Context, transforms FragmentIterTransforms,
     306           1 : ) (keyspan.FragmentIterator, error) {
     307           1 :         if r.rangeDelBH.Length == 0 {
     308           1 :                 return nil, nil
     309           1 :         }
     310           1 :         h, err := r.readRangeDel(ctx, nil /* stats */, nil /* iterStats */)
     311           1 :         if err != nil {
     312           0 :                 return nil, err
     313           0 :         }
     314           1 :         transforms.ElideSameSeqNum = true
     315           1 :         i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Split, h, transforms)
     316           1 :         if err != nil {
     317           0 :                 return nil, err
     318           0 :         }
     319           1 :         return keyspan.MaybeAssert(i, r.Compare), nil
     320             : }
     321             : 
     322             : // NewRawRangeKeyIter returns an internal iterator for the contents of the
     323             : // range-key block for the table. Returns nil if the table does not contain any
     324             : // range keys.
     325             : func (r *Reader) NewRawRangeKeyIter(
     326             :         ctx context.Context, transforms FragmentIterTransforms,
     327           1 : ) (keyspan.FragmentIterator, error) {
     328           1 :         if r.rangeKeyBH.Length == 0 {
     329           1 :                 return nil, nil
     330           1 :         }
     331           1 :         h, err := r.readRangeKey(ctx, nil /* stats */, nil /* iterStats */)
     332           1 :         if err != nil {
     333           0 :                 return nil, err
     334           0 :         }
     335           1 :         i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Split, h, transforms)
     336           1 :         if err != nil {
     337           0 :                 return nil, err
     338           0 :         }
     339           1 :         return keyspan.MaybeAssert(i, r.Compare), nil
     340             : }
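
Both constructors above return nil, nil when the corresponding block is absent, so callers must check for a nil iterator before using or closing it. A hedged in-package sketch of that check (passing a zero FragmentIterTransforms value is an assumption, not a documented convention):

// hasRangeDels is an illustrative helper: it reports whether the table
// contains any range deletions by checking for the nil, nil return.
func hasRangeDels(ctx context.Context, r *Reader) (bool, error) {
        iter, err := r.NewRawRangeDelIter(ctx, FragmentIterTransforms{})
        if err != nil {
                return false, err
        }
        if iter == nil {
                return false, nil // no range-del block in this table
        }
        iter.Close()
        return true, nil
}
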
     341             : 
     342             : func (r *Reader) readIndex(
     343             :         ctx context.Context,
     344             :         readHandle objstorage.ReadHandle,
     345             :         stats *base.InternalIteratorStats,
     346             :         iterStats *iterStatsAccumulator,
     347           1 : ) (block.BufferHandle, error) {
     348           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     349           1 :         return r.readBlock(ctx, r.indexBH, nil, readHandle, stats, iterStats, nil /* buffer pool */)
     350           1 : }
     351             : 
     352             : func (r *Reader) readFilter(
     353             :         ctx context.Context,
     354             :         readHandle objstorage.ReadHandle,
     355             :         stats *base.InternalIteratorStats,
     356             :         iterStats *iterStatsAccumulator,
     357           1 : ) (block.BufferHandle, error) {
     358           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock)
     359           1 :         return r.readBlock(ctx, r.filterBH, nil /* transform */, readHandle, stats, iterStats, nil /* buffer pool */)
     360           1 : }
     361             : 
     362             : func (r *Reader) readRangeDel(
     363             :         ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
     364           1 : ) (block.BufferHandle, error) {
     365           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     366           1 :         return r.readBlock(ctx, r.rangeDelBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
     367           1 : }
     368             : 
     369             : func (r *Reader) readRangeKey(
     370             :         ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
     371           1 : ) (block.BufferHandle, error) {
     372           1 :         ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
     373           1 :         return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
     374           1 : }
     375             : 
     376             : func checkChecksum(
     377             :         checksumType block.ChecksumType, b []byte, bh block.Handle, fileNum base.DiskFileNum,
     378           1 : ) error {
     379           1 :         expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:])
     380           1 :         var computedChecksum uint32
     381           1 :         switch checksumType {
     382           1 :         case block.ChecksumTypeCRC32c:
     383           1 :                 computedChecksum = crc.New(b[:bh.Length+1]).Value()
     384           0 :         case block.ChecksumTypeXXHash64:
     385           0 :                 computedChecksum = uint32(xxhash.Sum64(b[:bh.Length+1]))
     386           0 :         default:
     387           0 :                 return errors.Errorf("unsupported checksum type: %d", checksumType)
     388             :         }
     389             : 
     390           1 :         if expectedChecksum != computedChecksum {
     391           0 :                 return base.CorruptionErrorf(
     392           0 :                         "pebble/table: invalid table %s (checksum mismatch at %d/%d)",
     393           0 :                         fileNum, errors.Safe(bh.Offset), errors.Safe(bh.Length))
     394           0 :         }
     395           1 :         return nil
     396             : }
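
As checkChecksum (and readBlock below) implies, an on-disk block with handle length N is stored as N bytes of possibly compressed contents, one block-type/compression byte, and a 4-byte little-endian checksum covering the first N+1 bytes; block.TrailerLen accounts for the last five bytes. A standalone sketch, not this package's API, that slices a raw buffer along that layout:

package main

import (
        "encoding/binary"
        "fmt"
)

// splitRawBlock slices a raw block read from disk (length+5 bytes) into its
// three parts: contents, the type/compression indicator, and the checksum.
func splitRawBlock(raw []byte, length uint64) (contents []byte, typ byte, checksum uint32) {
        contents = raw[:length]
        typ = raw[length]
        checksum = binary.LittleEndian.Uint32(raw[length+1:])
        return contents, typ, checksum
}

func main() {
        // A fabricated 3-byte block followed by a type byte and checksum bytes.
        raw := []byte{'a', 'b', 'c', 0x00, 0x78, 0x56, 0x34, 0x12}
        contents, typ, sum := splitRawBlock(raw, 3)
        fmt.Printf("%q type=%d checksum=%#x\n", contents, typ, sum) // "abc" type=0 checksum=0x12345678
}
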
     397             : 
     398             : // DeterministicReadBlockDurationForTesting is for tests that want a
     399             : // deterministic value of the time to read a block (that is not in the cache).
     400             : // The return value is a function that must be called before the test exits.
     401           0 : func DeterministicReadBlockDurationForTesting() func() {
     402           0 :         drbdForTesting := deterministicReadBlockDurationForTesting
     403           0 :         deterministicReadBlockDurationForTesting = true
     404           0 :         return func() {
     405           0 :                 deterministicReadBlockDurationForTesting = drbdForTesting
     406           0 :         }
     407             : }
     408             : 
     409             : var deterministicReadBlockDurationForTesting = false
     410             : 
     411             : func (r *Reader) readBlock(
     412             :         ctx context.Context,
     413             :         bh block.Handle,
     414             :         transform blockTransform,
     415             :         readHandle objstorage.ReadHandle,
     416             :         stats *base.InternalIteratorStats,
     417             :         iterStats *iterStatsAccumulator,
     418             :         bufferPool *block.BufferPool,
     419           1 : ) (handle block.BufferHandle, _ error) {
     420           1 :         if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Get() != nil {
     421           1 :                 // Cache hit.
     422           1 :                 if readHandle != nil {
     423           1 :                         readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
     424           1 :                 }
     425           1 :                 if stats != nil {
     426           1 :                         stats.BlockBytes += bh.Length
     427           1 :                         stats.BlockBytesInCache += bh.Length
     428           1 :                 }
     429           1 :                 if iterStats != nil {
     430           1 :                         iterStats.reportStats(bh.Length, bh.Length, 0)
     431           1 :                 }
      432             :                 // This block is already in the cache; return a handle to the
      433             :                 // existing value in the cache.
     434           1 :                 return block.CacheBufferHandle(h), nil
     435             :         }
     436             : 
     437             :         // Cache miss.
     438             : 
     439           1 :         if sema := r.loadBlockSema; sema != nil {
     440           0 :                 if err := sema.Acquire(ctx, 1); err != nil {
     441           0 :                         // An error here can only come from the context.
     442           0 :                         return block.BufferHandle{}, err
     443           0 :                 }
     444           0 :                 defer sema.Release(1)
     445             :         }
     446             : 
     447           1 :         compressed := block.Alloc(int(bh.Length+block.TrailerLen), bufferPool)
     448           1 :         readStopwatch := makeStopwatch()
     449           1 :         var err error
     450           1 :         if readHandle != nil {
     451           1 :                 err = readHandle.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
     452           1 :         } else {
     453           1 :                 err = r.readable.ReadAt(ctx, compressed.Get(), int64(bh.Offset))
     454           1 :         }
     455           1 :         readDuration := readStopwatch.stop()
     456           1 :         // Call IsTracingEnabled to avoid the allocations of boxing integers into an
     457           1 :         // interface{}, unless necessary.
     458           1 :         if readDuration >= slowReadTracingThreshold && r.logger.IsTracingEnabled(ctx) {
     459           0 :                 _, file1, line1, _ := runtime.Caller(1)
     460           0 :                 _, file2, line2, _ := runtime.Caller(2)
     461           0 :                 r.logger.Eventf(ctx, "reading block of %d bytes took %s (fileNum=%s; %s/%s:%d -> %s/%s:%d)",
     462           0 :                         int(bh.Length+block.TrailerLen), readDuration.String(),
     463           0 :                         r.cacheOpts.FileNum,
     464           0 :                         filepath.Base(filepath.Dir(file2)), filepath.Base(file2), line2,
     465           0 :                         filepath.Base(filepath.Dir(file1)), filepath.Base(file1), line1)
     466           0 :         }
     467           1 :         if stats != nil {
     468           1 :                 stats.BlockBytes += bh.Length
     469           1 :                 stats.BlockReadDuration += readDuration
     470           1 :         }
     471           1 :         if err != nil {
     472           0 :                 compressed.Release()
     473           0 :                 return block.BufferHandle{}, err
     474           0 :         }
     475           1 :         if err := checkChecksum(r.checksumType, compressed.Get(), bh, r.cacheOpts.FileNum); err != nil {
     476           0 :                 compressed.Release()
     477           0 :                 return block.BufferHandle{}, err
     478           0 :         }
     479             : 
     480           1 :         typ := blockType(compressed.Get()[bh.Length])
     481           1 :         compressed.Truncate(int(bh.Length))
     482           1 : 
     483           1 :         var decompressed block.Value
     484           1 :         if typ == noCompressionBlockType {
     485           1 :                 decompressed = compressed
     486           1 :         } else {
     487           1 :                 // Decode the length of the decompressed value.
     488           1 :                 decodedLen, prefixLen, err := decompressedLen(typ, compressed.Get())
     489           1 :                 if err != nil {
     490           0 :                         compressed.Release()
     491           0 :                         return block.BufferHandle{}, err
     492           0 :                 }
     493             : 
     494           1 :                 decompressed = block.Alloc(decodedLen, bufferPool)
     495           1 :                 if err := decompressInto(typ, compressed.Get()[prefixLen:], decompressed.Get()); err != nil {
     496           0 :                         compressed.Release()
     497           0 :                         return block.BufferHandle{}, err
     498           0 :                 }
     499           1 :                 compressed.Release()
     500             :         }
     501             : 
     502           1 :         if transform != nil {
     503           0 :                 // Transforming blocks is very rare, so the extra copy of the
     504           0 :                 // transformed data is not problematic.
     505           0 :                 tmpTransformed, err := transform(decompressed.Get())
     506           0 :                 if err != nil {
     507           0 :                         decompressed.Release()
     508           0 :                         return block.BufferHandle{}, err
     509           0 :                 }
     510             : 
     511           0 :                 transformed := block.Alloc(len(tmpTransformed), bufferPool)
     512           0 :                 copy(transformed.Get(), tmpTransformed)
     513           0 :                 decompressed.Release()
     514           0 :                 decompressed = transformed
     515             :         }
     516             : 
     517           1 :         if iterStats != nil {
     518           1 :                 iterStats.reportStats(bh.Length, 0, readDuration)
     519           1 :         }
     520           1 :         h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
     521           1 :         return h, nil
     522             : }
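
The control flow above is a read-through cache: consult the block cache keyed by (cache ID, file number, offset); on a miss, read bh.Length plus the trailer, verify the checksum, decompress if needed, and return a cache-backed handle. A language-level sketch of that shape with a plain map standing in for the block cache; this is not Pebble's cache API:

// readThrough mirrors the shape of readBlock with a map as the cache and a
// caller-supplied read function; the points where the real code verifies the
// checksum and decompresses are marked in comments.
func readThrough(
        cache map[uint64][]byte,
        offset uint64,
        read func(offset uint64) ([]byte, error),
) ([]byte, error) {
        if b, ok := cache[offset]; ok {
                return b, nil // cache hit: no I/O, no checksum or decompression work
        }
        b, err := read(offset) // cache miss: issue the read
        if err != nil {
                return nil, err
        }
        // readBlock verifies the checksum, strips the trailer, and decompresses
        // here before populating the cache with the decompressed value.
        cache[offset] = b
        return b, nil
}
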
     523             : 
     524             : func (r *Reader) readMetaindex(
     525             :         ctx context.Context,
     526             :         metaindexBH block.Handle,
     527             :         readHandle objstorage.ReadHandle,
     528             :         filters map[string]FilterPolicy,
     529           1 : ) error {
     530           1 :         // We use a BufferPool when reading metaindex blocks in order to avoid
     531           1 :         // populating the block cache with these blocks. In heavy-write workloads,
     532           1 :         // especially with high compaction concurrency, new tables may be created
     533           1 :         // frequently. Populating the block cache with these metaindex blocks adds
     534           1 :         // additional contention on the block cache mutexes (see #1997).
     535           1 :         // Additionally, these blocks are exceedingly unlikely to be read again
     536           1 :         // while they're still in the block cache except in misconfigurations with
      537           1 :         // excessive sstable counts or a table cache that's far too small.
     538           1 :         r.metaBufferPool.InitPreallocated(r.metaBufferPoolAlloc[:0])
     539           1 :         // When we're finished, release the buffers we've allocated back to memory
     540           1 :         // allocator. We don't expect to use metaBufferPool again.
     541           1 :         defer r.metaBufferPool.Release()
     542           1 : 
     543           1 :         b, err := r.readBlock(
     544           1 :                 ctx, metaindexBH, nil /* transform */, readHandle, nil, /* stats */
     545           1 :                 nil /* iterStats */, &r.metaBufferPool)
     546           1 :         if err != nil {
     547           0 :                 return err
     548           0 :         }
     549           1 :         data := b.Get()
     550           1 :         defer b.Release()
     551           1 : 
     552           1 :         if uint64(len(data)) != metaindexBH.Length {
     553           0 :                 return base.CorruptionErrorf("pebble/table: unexpected metaindex block size: %d vs %d",
     554           0 :                         errors.Safe(len(data)), errors.Safe(metaindexBH.Length))
     555           0 :         }
     556             : 
     557           1 :         i, err := rowblk.NewRawIter(bytes.Compare, data)
     558           1 :         if err != nil {
     559           0 :                 return err
     560           0 :         }
     561             : 
     562           1 :         meta := map[string]block.Handle{}
     563           1 :         for valid := i.First(); valid; valid = i.Next() {
     564           1 :                 value := i.Value()
     565           1 :                 if bytes.Equal(i.Key().UserKey, []byte(metaValueIndexName)) {
     566           1 :                         vbih, n, err := decodeValueBlocksIndexHandle(i.Value())
     567           1 :                         if err != nil {
     568           0 :                                 return err
     569           0 :                         }
     570           1 :                         if n == 0 || n != len(value) {
     571           0 :                                 return base.CorruptionErrorf("pebble/table: invalid table (bad value blocks index handle)")
     572           0 :                         }
     573           1 :                         r.valueBIH = vbih
     574           1 :                 } else {
     575           1 :                         bh, n := decodeBlockHandle(value)
     576           1 :                         if n == 0 || n != len(value) {
     577           0 :                                 return base.CorruptionErrorf("pebble/table: invalid table (bad block handle)")
     578           0 :                         }
     579           1 :                         meta[string(i.Key().UserKey)] = bh
     580             :                 }
     581             :         }
     582           1 :         if err := i.Close(); err != nil {
     583           0 :                 return err
     584           0 :         }
     585             : 
     586           1 :         if bh, ok := meta[metaPropertiesName]; ok {
     587           1 :                 b, err = r.readBlock(
     588           1 :                         ctx, bh, nil /* transform */, readHandle, nil, /* stats */
     589           1 :                         nil /* iterStats */, nil /* buffer pool */)
     590           1 :                 if err != nil {
     591           0 :                         return err
     592           0 :                 }
     593           1 :                 r.propertiesBH = bh
     594           1 :                 err := r.Properties.load(b.Get(), r.deniedUserProperties)
     595           1 :                 b.Release()
     596           1 :                 if err != nil {
     597           0 :                         return err
     598           0 :                 }
     599             :         }
     600             : 
     601           1 :         if bh, ok := meta[metaRangeDelV2Name]; ok {
     602           1 :                 r.rangeDelBH = bh
     603           1 :         } else if _, ok := meta[metaRangeDelV1Name]; ok {
     604           0 :                 // This version of Pebble requires a format major version at least as
     605           0 :                 // high as FormatFlushableIngest (see pebble.FormatMinSupported). In
      606           0 :                 // this format major version, we have a guarantee that we've compacted
     607           0 :                 // away all RocksDB sstables. It should not be possible to encounter an
     608           0 :                 // sstable with a v1 range deletion block but not a v2 range deletion
     609           0 :                 // block.
     610           0 :                 err := errors.Newf("pebble/table: unexpected range-del block type: %s", metaRangeDelV1Name)
     611           0 :                 return errors.Mark(err, base.ErrCorruption)
     612           0 :         }
     613             : 
     614           1 :         if bh, ok := meta[metaRangeKeyName]; ok {
     615           1 :                 r.rangeKeyBH = bh
     616           1 :         }
     617             : 
     618           1 :         for name, fp := range filters {
     619           1 :                 types := []struct {
     620           1 :                         ftype  FilterType
     621           1 :                         prefix string
     622           1 :                 }{
     623           1 :                         {TableFilter, "fullfilter."},
     624           1 :                 }
     625           1 :                 var done bool
     626           1 :                 for _, t := range types {
     627           1 :                         if bh, ok := meta[t.prefix+name]; ok {
     628           1 :                                 r.filterBH = bh
     629           1 : 
     630           1 :                                 switch t.ftype {
     631           1 :                                 case TableFilter:
     632           1 :                                         r.tableFilter = newTableFilterReader(fp, r.filterMetricsTracker)
     633           0 :                                 default:
     634           0 :                                         return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
     635             :                                 }
     636             : 
     637           1 :                                 done = true
     638           1 :                                 break
     639             :                         }
     640             :                 }
     641           1 :                 if done {
     642           1 :                         break
     643             :                 }
     644             :         }
     645           1 :         return nil
     646             : }
     647             : 
     648             : // Layout returns the layout (block organization) for an sstable.
     649           1 : func (r *Reader) Layout() (*Layout, error) {
     650           1 :         if r.err != nil {
     651           0 :                 return nil, r.err
     652           0 :         }
     653             : 
     654           1 :         l := &Layout{
     655           1 :                 Data:       make([]BlockHandleWithProperties, 0, r.Properties.NumDataBlocks),
     656           1 :                 Filter:     r.filterBH,
     657           1 :                 RangeDel:   r.rangeDelBH,
     658           1 :                 RangeKey:   r.rangeKeyBH,
     659           1 :                 ValueIndex: r.valueBIH.h,
     660           1 :                 Properties: r.propertiesBH,
     661           1 :                 MetaIndex:  r.metaIndexBH,
     662           1 :                 Footer:     r.footerBH,
     663           1 :                 Format:     r.tableFormat,
     664           1 :         }
     665           1 : 
     666           1 :         indexH, err := r.readIndex(context.Background(), nil, nil, nil)
     667           1 :         if err != nil {
     668           0 :                 return nil, err
     669           0 :         }
     670           1 :         defer indexH.Release()
     671           1 : 
     672           1 :         var alloc bytealloc.A
     673           1 : 
     674           1 :         if r.Properties.IndexPartitions == 0 {
     675           1 :                 l.Index = append(l.Index, r.indexBH)
     676           1 :                 iter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     677           1 :                 for kv := iter.First(); kv != nil; kv = iter.Next() {
     678           1 :                         dataBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     679           1 :                         if err != nil {
     680           0 :                                 return nil, errCorruptIndexEntry(err)
     681           0 :                         }
     682           1 :                         if len(dataBH.Props) > 0 {
     683           1 :                                 alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     684           1 :                         }
     685           1 :                         l.Data = append(l.Data, dataBH)
     686             :                 }
     687           1 :         } else {
     688           1 :                 l.TopIndex = r.indexBH
     689           1 :                 topIter, _ := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     690           1 :                 iter := &rowblk.Iter{}
     691           1 :                 for kv := topIter.First(); kv != nil; kv = topIter.Next() {
     692           1 :                         indexBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     693           1 :                         if err != nil {
     694           0 :                                 return nil, errCorruptIndexEntry(err)
     695           0 :                         }
     696           1 :                         l.Index = append(l.Index, indexBH.Handle)
     697           1 : 
     698           1 :                         subIndex, err := r.readBlock(context.Background(), indexBH.Handle,
     699           1 :                                 nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
     700           1 :                         if err != nil {
     701           0 :                                 return nil, err
     702           0 :                         }
     703             :                         // TODO(msbutler): figure out how to pass virtualState to layout call.
     704           1 :                         if err := iter.Init(r.Compare, r.Split, subIndex.Get(), NoTransforms); err != nil {
     705           0 :                                 return nil, err
     706           0 :                         }
     707           1 :                         for kv := iter.First(); kv != nil; kv = iter.Next() {
     708           1 :                                 dataBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     709           1 :                                 if len(dataBH.Props) > 0 {
     710           1 :                                         alloc, dataBH.Props = alloc.Copy(dataBH.Props)
     711           1 :                                 }
     712           1 :                                 if err != nil {
     713           0 :                                         return nil, errCorruptIndexEntry(err)
     714           0 :                                 }
     715           1 :                                 l.Data = append(l.Data, dataBH)
     716             :                         }
     717           1 :                         subIndex.Release()
     718           1 :                         *iter = iter.ResetForReuse()
     719             :                 }
     720             :         }
     721           1 :         if r.valueBIH.h.Length != 0 {
     722           1 :                 vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil, nil /* buffer pool */)
     723           1 :                 if err != nil {
     724           0 :                         return nil, err
     725           0 :                 }
     726           1 :                 defer vbiH.Release()
     727           1 :                 vbiBlock := vbiH.Get()
     728           1 :                 indexEntryLen := int(r.valueBIH.blockNumByteLength + r.valueBIH.blockOffsetByteLength +
     729           1 :                         r.valueBIH.blockLengthByteLength)
     730           1 :                 i := 0
     731           1 :                 for len(vbiBlock) != 0 {
     732           1 :                         if len(vbiBlock) < indexEntryLen {
     733           0 :                                 return nil, errors.Errorf(
     734           0 :                                         "remaining value index block %d does not contain a full entry of length %d",
     735           0 :                                         len(vbiBlock), indexEntryLen)
     736           0 :                         }
     737           1 :                         n := int(r.valueBIH.blockNumByteLength)
     738           1 :                         bn := int(littleEndianGet(vbiBlock, n))
     739           1 :                         if bn != i {
     740           0 :                                 return nil, errors.Errorf("unexpected block num %d, expected %d",
     741           0 :                                         bn, i)
     742           0 :                         }
     743           1 :                         i++
     744           1 :                         vbiBlock = vbiBlock[n:]
     745           1 :                         n = int(r.valueBIH.blockOffsetByteLength)
     746           1 :                         blockOffset := littleEndianGet(vbiBlock, n)
     747           1 :                         vbiBlock = vbiBlock[n:]
     748           1 :                         n = int(r.valueBIH.blockLengthByteLength)
     749           1 :                         blockLen := littleEndianGet(vbiBlock, n)
     750           1 :                         vbiBlock = vbiBlock[n:]
     751           1 :                         l.ValueBlock = append(l.ValueBlock, block.Handle{Offset: blockOffset, Length: blockLen})
     752             :                 }
     753             :         }
     754             : 
     755           1 :         return l, nil
     756             : }
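
One simple use of Layout is aggregating block sizes, for example summing the on-disk size of all data blocks (trailers excluded). An illustrative in-package sketch, using only the Layout fields populated above:

// totalDataBlockBytes sums the lengths of the table's data block handles as
// reported by Layout; the helper itself is not part of the package API.
func totalDataBlockBytes(r *Reader) (uint64, error) {
        l, err := r.Layout()
        if err != nil {
                return 0, err
        }
        var total uint64
        for _, bh := range l.Data {
                total += bh.Handle.Length
        }
        return total, nil
}
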
     757             : 
     758             : // ValidateBlockChecksums validates the checksums for each block in the SSTable.
     759           1 : func (r *Reader) ValidateBlockChecksums() error {
     760           1 :         // Pre-compute the BlockHandles for the underlying file.
     761           1 :         l, err := r.Layout()
     762           1 :         if err != nil {
     763           0 :                 return err
     764           0 :         }
     765             : 
     766             :         // Construct the set of blocks to check. Note that the footer is not checked
     767             :         // as it is not a block with a checksum.
     768           1 :         blocks := make([]block.Handle, len(l.Data))
     769           1 :         for i := range l.Data {
     770           1 :                 blocks[i] = l.Data[i].Handle
     771           1 :         }
     772           1 :         blocks = append(blocks, l.Index...)
     773           1 :         blocks = append(blocks, l.TopIndex, l.Filter, l.RangeDel, l.RangeKey, l.Properties, l.MetaIndex)
     774           1 : 
     775           1 :         // Sorting by offset ensures we are performing a sequential scan of the
     776           1 :         // file.
     777           1 :         slices.SortFunc(blocks, func(a, b block.Handle) int {
     778           1 :                 return cmp.Compare(a.Offset, b.Offset)
     779           1 :         })
     780             : 
     781             :         // Check all blocks sequentially. Make use of read-ahead, given we are
     782             :         // scanning the entire file from start to end.
     783           1 :         rh := r.readable.NewReadHandle(objstorage.NoReadBefore)
     784           1 :         defer rh.Close()
     785           1 : 
     786           1 :         for _, bh := range blocks {
     787           1 :                 // Certain blocks may not be present, in which case we skip them.
     788           1 :                 if bh.Length == 0 {
     789           1 :                         continue
     790             :                 }
     791             : 
     792             :                 // Read the block, which validates the checksum.
     793           1 :                 h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* iterStats */, nil /* buffer pool */)
     794           1 :                 if err != nil {
     795           0 :                         return err
     796           0 :                 }
     797           1 :                 h.Release()
     798             :         }
     799             : 
     800           1 :         return nil
     801             : }
     802             : 
      803             : // CommonProperties implements the CommonReader interface.
     804           1 : func (r *Reader) CommonProperties() *CommonProperties {
     805           1 :         return &r.Properties.CommonProperties
     806           1 : }
     807             : 
     808             : // EstimateDiskUsage returns the total size of data blocks overlapping the range
     809             : // `[start, end]`. Even if a data block partially overlaps, or we cannot
     810             : // determine overlap due to abbreviated index keys, the full data block size is
     811             : // included in the estimation.
     812             : //
     813             : // This function does not account for any metablock space usage. Assumes there
     814             : // is at least partial overlap, i.e., `[start, end]` falls neither completely
     815             : // before nor completely after the file's range.
     816             : //
     817             : // Only blocks containing point keys are considered. Range deletion and range
     818             : // key blocks are not considered.
     819             : //
     820             : // TODO(ajkr): account for metablock space usage. Perhaps look at the fraction of
     821             : // data blocks overlapped and add that same fraction of the metadata blocks to the
     822             : // estimate.
     823           1 : func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
     824           1 :         if r.err != nil {
     825           0 :                 return 0, r.err
     826           0 :         }
     827             : 
     828           1 :         indexH, err := r.readIndex(context.Background(), nil, nil, nil)
     829           1 :         if err != nil {
     830           0 :                 return 0, err
     831           0 :         }
     832           1 :         defer indexH.Release()
     833           1 : 
     834           1 :         // Iterators over the bottom-level index blocks containing start and end.
     835           1 :         // These may be different in case of partitioned index but will both point
     836           1 :         // to the same blockIter over the single index in the unpartitioned case.
     837           1 :         var startIdxIter, endIdxIter *rowblk.Iter
     838           1 :         if r.Properties.IndexPartitions == 0 {
     839           1 :                 iter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     840           1 :                 if err != nil {
     841           0 :                         return 0, err
     842           0 :                 }
     843           1 :                 startIdxIter = iter
     844           1 :                 endIdxIter = iter
     845           1 :         } else {
     846           1 :                 topIter, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     847           1 :                 if err != nil {
     848           0 :                         return 0, err
     849           0 :                 }
     850             : 
     851           1 :                 kv := topIter.SeekGE(start, base.SeekGEFlagsNone)
     852           1 :                 if kv == nil {
     853           1 :                         // The range falls completely after this file, or an error occurred.
     854           1 :                         return 0, topIter.Error()
     855           1 :                 }
     856           1 :                 startIdxBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     857           1 :                 if err != nil {
     858           0 :                         return 0, errCorruptIndexEntry(err)
     859           0 :                 }
     860           1 :                 startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.Handle,
     861           1 :                         nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
     862           1 :                 if err != nil {
     863           0 :                         return 0, err
     864           0 :                 }
     865           1 :                 defer startIdxBlock.Release()
     866           1 :                 startIdxIter, err = rowblk.NewIter(r.Compare, r.Split, startIdxBlock.Get(), NoTransforms)
     867           1 :                 if err != nil {
     868           0 :                         return 0, err
     869           0 :                 }
     870             : 
     871           1 :                 kv = topIter.SeekGE(end, base.SeekGEFlagsNone)
     872           1 :                 if kv == nil {
     873           1 :                         if err := topIter.Error(); err != nil {
     874           0 :                                 return 0, err
     875           0 :                         }
     876           1 :                 } else {
     877           1 :                         endIdxBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     878           1 :                         if err != nil {
     879           0 :                                 return 0, errCorruptIndexEntry(err)
     880           0 :                         }
     881           1 :                         endIdxBlock, err := r.readBlock(context.Background(),
     882           1 :                                 endIdxBH.Handle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* iterStats */, nil /* buffer pool */)
     883           1 :                         if err != nil {
     884           0 :                                 return 0, err
     885           0 :                         }
     886           1 :                         defer endIdxBlock.Release()
     887           1 :                         endIdxIter, err = rowblk.NewIter(r.Compare, r.Split, endIdxBlock.Get(), NoTransforms)
     888           1 :                         if err != nil {
     889           0 :                                 return 0, err
     890           0 :                         }
     891             :                 }
     892             :         }
      893             : // startIdxIter should not be nil at this point, while endIdxIter can be nil if
      894             : // the range spans past the end of the file.
     895             : 
     896           1 :         kv := startIdxIter.SeekGE(start, base.SeekGEFlagsNone)
     897           1 :         if kv == nil {
     898           1 :                 // The range falls completely after this file, or an error occurred.
     899           1 :                 return 0, startIdxIter.Error()
     900           1 :         }
     901           1 :         startBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     902           1 :         if err != nil {
     903           0 :                 return 0, errCorruptIndexEntry(err)
     904           0 :         }
     905             : 
     906           1 :         includeInterpolatedValueBlocksSize := func(dataBlockSize uint64) uint64 {
     907           1 :                 // INVARIANT: r.Properties.DataSize > 0 since startIdxIter is not nil.
     908           1 :                 // Linearly interpolate what is stored in value blocks.
     909           1 :                 //
     910           1 :                 // TODO(sumeer): if we need more accuracy, without loading any data blocks
     911           1 :                 // (which contain the value handles, and which may also be insufficient if
     912           1 :                 // the values are in separate files), we will need to accumulate the
     913           1 :                 // logical size of the key-value pairs and store the cumulative value for
     914           1 :                 // each data block in the index block entry. This increases the size of
     915           1 :                 // the BlockHandle, so wait until this becomes necessary.
     916           1 :                 return dataBlockSize +
     917           1 :                         uint64((float64(dataBlockSize)/float64(r.Properties.DataSize))*
     918           1 :                                 float64(r.Properties.ValueBlocksSize))
     919           1 :         }
     920           1 :         if endIdxIter == nil {
     921           1 :                 // The range spans beyond this file. Include data blocks through the last.
     922           1 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
     923           1 :         }
     924           1 :         kv = endIdxIter.SeekGE(end, base.SeekGEFlagsNone)
     925           1 :         if kv == nil {
     926           1 :                 if err := endIdxIter.Error(); err != nil {
     927           0 :                         return 0, err
     928           0 :                 }
     929             :                 // The range spans beyond this file. Include data blocks through the last.
     930           1 :                 return includeInterpolatedValueBlocksSize(r.Properties.DataSize - startBH.Offset), nil
     931             :         }
     932           1 :         endBH, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     933           1 :         if err != nil {
     934           0 :                 return 0, errCorruptIndexEntry(err)
     935           0 :         }
     936           1 :         return includeInterpolatedValueBlocksSize(
     937           1 :                 endBH.Offset + endBH.Length + block.TrailerLen - startBH.Offset), nil
     938             : }
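
// estimateRangeSize is a hypothetical helper (not part of this file) sketching
// a typical call to EstimateDiskUsage. Given the interpolation above, if the
// overlapped data blocks cover 10 MiB of a 100 MiB Properties.DataSize and
// Properties.ValueBlocksSize is 20 MiB, the estimate works out to roughly
// 10 MiB + (10/100)*20 MiB = 12 MiB.
func estimateRangeSize(r *Reader, start, end []byte) (uint64, error) {
        // Only point-key data blocks (plus the interpolated share of value
        // blocks) are counted; range deletion and range key blocks are not.
        return r.EstimateDiskUsage(start, end)
}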
     939             : 
     940             : // TableFormat returns the format version for the table.
     941           1 : func (r *Reader) TableFormat() (TableFormat, error) {
     942           1 :         if r.err != nil {
     943           0 :                 return TableFormatUnspecified, r.err
     944           0 :         }
     945           1 :         return r.tableFormat, nil
     946             : }
     947             : 
     948             : // NewReader returns a new table reader for the file. Closing the reader will
     949             : // close the file.
     950             : //
     951             : // The context is used for tracing any operations performed by NewReader; it is
     952             : // NOT stored for future use.
     953           1 : func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Reader, error) {
     954           1 :         if f == nil {
     955           0 :                 return nil, errors.New("pebble/table: nil file")
     956           0 :         }
     957           1 :         o = o.ensureDefaults()
     958           1 :         r := &Reader{
     959           1 :                 readable:             f,
     960           1 :                 cacheOpts:            o.internal.CacheOpts,
     961           1 :                 loadBlockSema:        o.LoadBlockSema,
     962           1 :                 deniedUserProperties: o.DeniedUserProperties,
     963           1 :                 filterMetricsTracker: o.FilterMetricsTracker,
     964           1 :                 logger:               o.LoggerAndTracer,
     965           1 :         }
     966           1 :         if r.cacheOpts.Cache == nil {
     967           1 :                 r.cacheOpts.Cache = cache.New(0)
     968           1 :         } else {
     969           1 :                 r.cacheOpts.Cache.Ref()
     970           1 :         }
     971           1 :         if r.cacheOpts.CacheID == 0 {
     972           1 :                 r.cacheOpts.CacheID = r.cacheOpts.Cache.NewID()
     973           1 :         }
     974             : 
     975           1 :         var preallocRH objstorageprovider.PreallocatedReadHandle
     976           1 :         rh := objstorageprovider.UsePreallocatedReadHandle(
     977           1 :                 r.readable, objstorage.ReadBeforeForNewReader, &preallocRH)
     978           1 :         defer rh.Close()
     979           1 : 
     980           1 :         footer, err := readFooter(ctx, f, rh, r.logger)
     981           1 :         if err != nil {
     982           0 :                 r.err = err
     983           0 :                 return nil, r.Close()
     984           0 :         }
     985           1 :         r.checksumType = footer.checksum
     986           1 :         r.tableFormat = footer.format
     987           1 :         // Read the metaindex and properties blocks.
     988           1 :         if err := r.readMetaindex(ctx, footer.metaindexBH, rh, o.Filters); err != nil {
     989           0 :                 r.err = err
     990           0 :                 return nil, r.Close()
     991           0 :         }
     992           1 :         r.indexBH = footer.indexBH
     993           1 :         r.metaIndexBH = footer.metaindexBH
     994           1 :         r.footerBH = footer.footerBH
     995           1 : 
     996           1 :         if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
     997           1 :                 r.Compare = o.Comparer.Compare
     998           1 :                 r.Equal = o.Comparer.Equal
     999           1 :                 r.FormatKey = o.Comparer.FormatKey
    1000           1 :                 r.Split = o.Comparer.Split
    1001           1 :         } else if comparer, ok := o.Comparers[r.Properties.ComparerName]; ok {
    1002           0 :                 r.Compare = comparer.Compare
    1003           0 :                 r.Equal = comparer.Equal
    1004           0 :                 r.FormatKey = comparer.FormatKey
    1005           0 :                 r.Split = comparer.Split
    1006           0 :         } else {
    1007           0 :                 r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
    1008           0 :                         errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.ComparerName))
    1009           0 :         }
    1010             : 
    1011           1 :         if mergerName := r.Properties.MergerName; mergerName != "" && mergerName != "nullptr" {
    1012           1 :                 if o.Merger != nil && o.Merger.Name == mergerName {
    1013           1 :                         // o.Merger matches.
    1014           1 :                 } else if _, ok := o.Mergers[mergerName]; ok {
    1015           0 :                         // Known merger.
    1016           0 :                 } else {
    1017           0 :                         r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
    1018           0 :                                 errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.MergerName))
    1019           0 :                 }
    1020             :         }
    1021             : 
    1022           1 :         if r.err != nil {
    1023           0 :                 return nil, r.Close()
    1024           0 :         }
    1025             : 
    1026           1 :         return r, nil
    1027             : }
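
// openTable is a hypothetical sketch (not part of this file) of the NewReader
// lifecycle. The zero-value ReaderOptions assumes ensureDefaults supplies
// usable defaults; real callers normally set at least the Comparer.
func openTable(ctx context.Context, readable objstorage.Readable) error {
        r, err := NewReader(ctx, readable, ReaderOptions{})
        if err != nil {
                // On failure NewReader closes itself, which also closes readable.
                return err
        }
        defer r.Close()

        format, err := r.TableFormat()
        if err != nil {
                return err
        }
        _ = format // e.g. gate newer features on the table format
        return nil
}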
    1028             : 
    1029             : // ReadableFile describes the smallest subset of vfs.File that is required for
    1030             : // reading SSTs.
    1031             : type ReadableFile interface {
    1032             :         io.ReaderAt
    1033             :         io.Closer
    1034             :         Stat() (os.FileInfo, error)
    1035             : }
    1036             : 
    1037             : // NewSimpleReadable wraps a ReadableFile in an objstorage.Readable
    1038             : // implementation (which does not support read-ahead).
    1039           1 : func NewSimpleReadable(r ReadableFile) (objstorage.Readable, error) {
    1040           1 :         info, err := r.Stat()
    1041           1 :         if err != nil {
    1042           0 :                 return nil, err
    1043           0 :         }
    1044           1 :         res := &simpleReadable{
    1045           1 :                 f:    r,
    1046           1 :                 size: info.Size(),
    1047           1 :         }
    1048           1 :         res.rh = objstorage.MakeNoopReadHandle(res)
    1049           1 :         return res, nil
    1050             : }
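
// openLocalFile is a hypothetical sketch: *os.File satisfies ReadableFile (it
// provides ReadAt, Close, and Stat), so a plain local file can be wrapped with
// NewSimpleReadable when read-ahead is not needed.
func openLocalFile(path string) (objstorage.Readable, error) {
        f, err := os.Open(path)
        if err != nil {
                return nil, err
        }
        readable, err := NewSimpleReadable(f)
        if err != nil {
                // NewSimpleReadable does not close the file on error, so the
                // caller remains responsible for it.
                f.Close()
                return nil, err
        }
        return readable, nil
}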
    1051             : 
    1052             : // simpleReadable wraps a ReadableFile to implement objstorage.Readable.
    1053             : type simpleReadable struct {
    1054             :         f    ReadableFile
    1055             :         size int64
    1056             :         rh   objstorage.NoopReadHandle
    1057             : }
    1058             : 
    1059             : var _ objstorage.Readable = (*simpleReadable)(nil)
    1060             : 
    1061             : // ReadAt is part of the objstorage.Readable interface.
    1062           1 : func (s *simpleReadable) ReadAt(_ context.Context, p []byte, off int64) error {
    1063           1 :         n, err := s.f.ReadAt(p, off)
    1064           1 :         if invariants.Enabled && err == nil && n != len(p) {
    1065           0 :                 panic("short read")
    1066             :         }
    1067           1 :         return err
    1068             : }
    1069             : 
    1070             : // Close is part of the objstorage.Readable interface.
    1071           1 : func (s *simpleReadable) Close() error {
    1072           1 :         return s.f.Close()
    1073           1 : }
    1074             : 
    1075             : // Size is part of the objstorage.Readable interface.
    1076           1 : func (s *simpleReadable) Size() int64 {
    1077           1 :         return s.size
    1078           1 : }
    1079             : 
    1080             : // NewReadHandle is part of the objstorage.Readable interface.
    1081             : func (s *simpleReadable) NewReadHandle(
    1082             :         readBeforeSize objstorage.ReadBeforeSize,
    1083           1 : ) objstorage.ReadHandle {
    1084           1 :         return &s.rh
    1085           1 : }
    1086             : 
    1087           0 : func errCorruptIndexEntry(err error) error {
    1088           0 :         err = base.CorruptionErrorf("pebble/table: corrupt index entry: %v", err)
    1089           0 :         if invariants.Enabled {
    1090           0 :                 panic(err)
    1091             :         }
    1092           0 :         return err
    1093             : }
    1094             : 
    1095             : type deterministicStopwatchForTesting struct {
    1096             :         startTime time.Time
    1097             : }
    1098             : 
    1099           1 : func makeStopwatch() deterministicStopwatchForTesting {
    1100           1 :         return deterministicStopwatchForTesting{startTime: time.Now()}
    1101           1 : }
    1102             : 
    1103           1 : func (w deterministicStopwatchForTesting) stop() time.Duration {
    1104           1 :         dur := time.Since(w.startTime)
    1105           1 :         if deterministicReadBlockDurationForTesting {
    1106           0 :                 dur = slowReadTracingThreshold
    1107           0 :         }
    1108           1 :         return dur
    1109             : }
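
// timedBlockOp is a hypothetical sketch of how this stopwatch is intended to be
// used: time an operation and compare the duration against
// slowReadTracingThreshold. When deterministicReadBlockDurationForTesting is
// set, stop reports exactly the threshold, which keeps slow-read tracing tests
// deterministic.
func timedBlockOp(op func() error) (time.Duration, error) {
        sw := makeStopwatch()
        err := op()
        dur := sw.stop()
        if dur >= slowReadTracingThreshold {
                // A real caller would emit a slow-read trace or log event here.
        }
        return dur, err
}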

Generated by: LCOV version 1.14