LCOV - code coverage report
Current view: top level - pebble - file_cache.go (source / functions) Coverage Total Hit
Test: 2025-10-02 08:18Z d1cc1fa5 - meta test only.lcov Lines: 70.0 % 626 438
Test Date: 2025-10-02 08:20:34 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "context"
      10              :         "fmt"
      11              :         "io"
      12              :         "runtime/debug"
      13              :         "sync"
      14              :         "sync/atomic"
      15              :         "unsafe"
      16              : 
      17              :         "github.com/cockroachdb/errors"
      18              :         "github.com/cockroachdb/pebble/internal/base"
      19              :         "github.com/cockroachdb/pebble/internal/cache"
      20              :         "github.com/cockroachdb/pebble/internal/genericcache"
      21              :         "github.com/cockroachdb/pebble/internal/invariants"
      22              :         "github.com/cockroachdb/pebble/internal/keyspan"
      23              :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      24              :         "github.com/cockroachdb/pebble/internal/manifest"
      25              :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      26              :         "github.com/cockroachdb/pebble/objstorage"
      27              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      28              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      29              :         "github.com/cockroachdb/pebble/sstable"
      30              :         "github.com/cockroachdb/pebble/sstable/blob"
      31              :         "github.com/cockroachdb/pebble/sstable/block"
      32              :         "github.com/cockroachdb/pebble/sstable/valblk"
      33              :         "github.com/cockroachdb/pebble/vfs"
      34              :         "github.com/cockroachdb/redact"
      35              : )
      36              : 
      37              : // FileCacheMetrics contains metrics for the file cache. Note that the file
      38              : // cache is normally shared between all the stores on a node.
      39              : type FileCacheMetrics struct {
      40              :         // The number of bytes inuse by the cache.
      41              :         Size          int64
      42              :         TableCount    int64
      43              :         BlobFileCount int64
      44              :         Hits          int64
      45              :         Misses        int64
      46              : }
      47              : 
      48              : var emptyIter = &errorIter{err: nil}
      49              : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
      50              : 
      51              : // tableNewIters creates new iterators (point, range deletion and/or range key)
      52              : // for the given table metadata. Which of the various iterator kinds the user is
      53              : // requesting is specified with the iterKinds bitmap.
      54              : //
      55              : // On success, the requested subset of iters.{point,rangeDel,rangeKey} are
      56              : // populated with iterators.
      57              : //
      58              : // If a point iterator is requested and the operation was successful,
      59              : // iters.point is guaranteed to be non-nil and must be closed when the caller is
      60              : // finished.
      61              : //
      62              : // If a range deletion or range key iterator is requested, the corresponding
      63              : // iterator may be nil if the table does not contain any keys of the
      64              : // corresponding kind. The returned iterSet type provides RangeDeletion() and
      65              : // RangeKey() convenience methods that return non-nil empty iterators that may
      66              : // be used if the caller requires a non-nil iterator.
      67              : //
      68              : // On error, all iterators are nil.
      69              : //
      70              : // The only (non-test) implementation of tableNewIters is
      71              : // fileCacheHandle.newIters().
      72              : type tableNewIters func(
      73              :         ctx context.Context,
      74              :         file *manifest.TableMetadata,
      75              :         opts *IterOptions,
      76              :         internalOpts internalIterOpts,
      77              :         kinds iterKinds,
      78              : ) (iterSet, error)
      79              : 
      80              : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
      81              : // for the rangedel iterator returned by tableNewIters.
      82            1 : func tableNewRangeDelIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
      83            1 :         return func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
      84            1 :                 iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
      85            1 :                 if err != nil {
      86            0 :                         return nil, err
      87            0 :                 }
      88            1 :                 return iters.RangeDeletion(), nil
      89              :         }
      90              : }
      91              : 
      92              : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
      93              : // for the range key iterator returned by tableNewIters.
      94            1 : func tableNewRangeKeyIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
      95            1 :         return func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
      96            1 :                 iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
      97            1 :                 if err != nil {
      98            0 :                         return nil, err
      99            0 :                 }
     100            1 :                 return iters.RangeKey(), nil
     101              :         }
     102              : }
     103              : 
     104              : // fileCacheHandle is used to access the file cache. Each DB has its own handle.
     105              : type fileCacheHandle struct {
     106              :         fileCache *FileCache
     107              : 
     108              :         // The handle contains fields which are unique to each DB. Note that these get
     109              :         // accessed from all shards, so keep read-only fields separate for read-write
     110              :         // fields.
     111              :         loggerAndTracer  LoggerAndTracer
     112              :         blockCacheHandle *cache.Handle
     113              :         objProvider      objstorage.Provider
     114              :         readerOpts       sstable.ReaderOptions
     115              : 
     116              :         // iterCount keeps track of how many iterators are open. It is used to keep
     117              :         // track of leaked iterators on a per-db level.
     118              :         iterCount         atomic.Int32
     119              :         sstStatsCollector block.CategoryStatsCollector
     120              : 
     121              :         // reportCorruptionFn is used for block.ReadEnv.ReportCorruptionFn. The
     122              :         // first argument must implement the ObjectInfo interface. Typically callers
     123              :         // use *TableMetadata or *PhysicalBlobFile to satisfy this interface.
     124              :         // reportCorruptionFn returns an error that contains more details.
     125              :         reportCorruptionFn func(base.ObjectInfo, error) error
     126              : 
     127              :         // This struct is only populated in race builds.
     128              :         raceMu struct {
     129              :                 sync.Mutex
     130              :                 // nextRefID is the next ID to allocate for a new reference.
     131              :                 nextRefID uint64
     132              :                 // openRefs maps reference IDs to the stack trace recorded at creation
     133              :                 // time. It's used to track which call paths leaked open references to
     134              :                 // files.
     135              :                 openRefs map[uint64][]byte
     136              :         }
     137              : }
     138              : 
     139              : // Assert that *fileCacheHandle implements blob.ReaderProvider.
     140              : var _ blob.ReaderProvider = (*fileCacheHandle)(nil)
     141              : 
     142              : // newHandle creates a handle for the FileCache which has its own options. Each
     143              : // handle has its own set of files in the cache, separate from those of other
     144              : // handles.
     145              : func (c *FileCache) newHandle(
     146              :         cacheHandle *cache.Handle,
     147              :         objProvider objstorage.Provider,
     148              :         loggerAndTracer LoggerAndTracer,
     149              :         readerOpts sstable.ReaderOptions,
     150              :         reportCorruptionFn func(base.ObjectInfo, error) error,
     151            1 : ) *fileCacheHandle {
     152            1 :         c.Ref()
     153            1 : 
     154            1 :         t := &fileCacheHandle{
     155            1 :                 fileCache:        c,
     156            1 :                 loggerAndTracer:  loggerAndTracer,
     157            1 :                 blockCacheHandle: cacheHandle,
     158            1 :                 objProvider:      objProvider,
     159            1 :         }
     160            1 :         t.readerOpts = readerOpts
     161            1 :         t.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
     162            1 :         t.reportCorruptionFn = reportCorruptionFn
     163            1 :         if invariants.RaceEnabled {
     164            0 :                 t.raceMu.openRefs = make(map[uint64][]byte)
     165            0 :         }
     166            1 :         return t
     167              : }
     168              : 
     169              : // Close the handle, make sure that there will be no further need
     170              : // to access any of the files associated with the store.
     171            1 : func (h *fileCacheHandle) Close() error {
     172            1 :         // We want to do some cleanup work here. Check for leaked iterators
     173            1 :         // by the DB using this container. Note that we'll still perform cleanup
     174            1 :         // below in the case that there are leaked iterators.
     175            1 :         var err error
     176            1 :         if v := h.iterCount.Load(); v > 0 {
     177            0 :                 if !invariants.RaceEnabled {
     178            0 :                         err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
     179            0 :                 } else {
     180            0 :                         var buf bytes.Buffer
     181            0 :                         for _, stack := range h.raceMu.openRefs {
     182            0 :                                 fmt.Fprintf(&buf, "%s\n", stack)
     183            0 :                         }
     184            0 :                         err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
     185              :                 }
     186              :         }
     187              : 
     188              :         // EvictAll would panic if there are still outstanding references.
     189            1 :         if err == nil {
     190            1 :                 keys := h.fileCache.c.EvictAll(func(key fileCacheKey) bool {
     191            1 :                         return key.handle == h
     192            1 :                 })
     193              :                 // Evict any associated blocks in the cache.
     194            1 :                 for i := range keys {
     195            1 :                         h.blockCacheHandle.EvictFile(keys[i].fileNum)
     196            1 :                 }
     197              :         }
     198              : 
     199            1 :         h.fileCache.Unref()
     200            1 :         // TODO(radu): we have to tolerate metrics() calls after close (see
     201            1 :         // https://github.com/cockroachdb/cockroach/issues/140454).
     202            1 :         // *h = fileCacheHandle{}
     203            1 :         return err
     204              : }
     205              : 
     206              : // openFile is called when we insert a new entry in the file cache.
     207              : func (h *fileCacheHandle) openFile(
     208              :         ctx context.Context, fileNum base.DiskFileNum, fileType base.FileType, opts initFileOpts,
     209            1 : ) (io.Closer, objstorage.ObjectMetadata, error) {
     210            1 :         f, err := h.objProvider.OpenForReading(
     211            1 :                 ctx, fileType, fileNum, objstorage.OpenOptions{MustExist: true},
     212            1 :         )
     213            1 :         if err != nil {
     214            0 :                 return nil, objstorage.ObjectMetadata{}, err
     215            0 :         }
     216            1 :         objMeta, err := h.objProvider.Lookup(fileType, fileNum)
     217            1 :         if err != nil {
     218            0 :                 return nil, objstorage.ObjectMetadata{}, err
     219            0 :         }
     220              : 
     221            1 :         o := h.readerOpts
     222            1 :         o.CacheOpts = sstableinternal.CacheOptions{
     223            1 :                 CacheHandle: h.blockCacheHandle,
     224            1 :                 FileNum:     fileNum,
     225            1 :         }
     226            1 :         o.InitFileReadStats = opts.stats
     227            1 :         switch fileType {
     228            1 :         case base.FileTypeTable:
     229            1 :                 r, err := sstable.NewReader(ctx, f, o)
     230            1 :                 if err != nil {
     231            0 :                         // If opening the sstable reader fails, we're responsible for
     232            0 :                         // closing the objstorage.Readable.
     233            0 :                         return nil, objMeta, errors.CombineErrors(err, f.Close())
     234            0 :                 }
     235            1 :                 return r, objMeta, nil
     236            1 :         case base.FileTypeBlob:
     237            1 :                 // TODO(sumeer): we should pass o.InitFileReadStats, so that the latency
     238            1 :                 // of the footer read is accounted for.
     239            1 :                 r, err := blob.NewFileReader(ctx, f, blob.FileReaderOptions{
     240            1 :                         ReaderOptions: o.ReaderOptions,
     241            1 :                 })
     242            1 :                 if err != nil {
     243            0 :                         // If opening the blob file reader fails, we're responsible for
     244            0 :                         // closing the objstorage.Readable.
     245            0 :                         return nil, objMeta, errors.CombineErrors(err, f.Close())
     246            0 :                 }
     247            1 :                 return r, objMeta, nil
     248            0 :         default:
     249            0 :                 panic(errors.AssertionFailedf("pebble: unexpected file cache file type: %s", fileType))
     250              :         }
     251              : }
     252              : 
     253              : // findOrCreateTable retrieves an existing sstable reader or creates a new one
     254              : // for the backing file of the given table. If a corruption error is
     255              : // encountered, reportCorruptionFn() is called.
     256              : func (h *fileCacheHandle) findOrCreateTable(
     257              :         ctx context.Context, meta *manifest.TableMetadata, opts initFileOpts,
     258            1 : ) (genericcache.ValueRef[fileCacheKey, fileCacheValue, initFileOpts], error) {
     259            1 :         key := fileCacheKey{
     260            1 :                 handle:   h,
     261            1 :                 fileNum:  meta.TableBacking.DiskFileNum,
     262            1 :                 fileType: base.FileTypeTable,
     263            1 :         }
     264            1 :         valRef, err := h.fileCache.c.FindOrCreate(ctx, key, opts)
     265            1 :         if err != nil && IsCorruptionError(err) {
     266            0 :                 err = h.reportCorruptionFn(meta, err)
     267            0 :         }
     268            1 :         return valRef, err
     269              : }
     270              : 
     271              : // findOrCreateBlob retrieves an existing blob reader or creates a new one for
     272              : // the given blob file. If a corruption error is encountered,
     273              : // reportCorruptionFn() is called.
     274              : func (h *fileCacheHandle) findOrCreateBlob(
     275              :         ctx context.Context, info base.ObjectInfo, stats block.InitFileReadStats,
     276            1 : ) (genericcache.ValueRef[fileCacheKey, fileCacheValue, initFileOpts], error) {
     277            1 :         ftyp, fileNum := info.FileInfo()
     278            1 :         key := fileCacheKey{
     279            1 :                 handle:   h,
     280            1 :                 fileNum:  fileNum,
     281            1 :                 fileType: ftyp,
     282            1 :         }
     283            1 :         valRef, err := h.fileCache.c.FindOrCreate(ctx, key, initFileOpts{stats: stats})
     284            1 :         if err != nil && IsCorruptionError(err) {
     285            0 :                 err = h.reportCorruptionFn(info, err)
     286            0 :         }
     287            1 :         return valRef, err
     288              : }
     289              : 
     290              : // Evict the given file from the file cache and the block cache.
     291            1 : func (h *fileCacheHandle) Evict(fileNum base.DiskFileNum, fileType base.FileType) {
     292            1 :         defer func() {
     293            1 :                 if p := recover(); p != nil {
     294            0 :                         panic(fmt.Sprintf("pebble: evicting in-use file %s(%s): %v", fileNum, fileType, p))
     295              :                 }
     296              :         }()
     297            1 :         h.fileCache.c.Evict(fileCacheKey{handle: h, fileNum: fileNum, fileType: fileType})
     298            1 :         h.blockCacheHandle.EvictFile(fileNum)
     299              : }
     300              : 
     301            1 : func (h *fileCacheHandle) SSTStatsCollector() *block.CategoryStatsCollector {
     302            1 :         return &h.sstStatsCollector
     303            1 : }
     304              : 
     305              : // Metrics returns metrics for the file cache. Note that the CacheMetrics track
     306              : // the global cache which is shared between multiple handles (stores). The
     307              : // FilterMetrics are per-handle.
     308            0 : func (h *fileCacheHandle) Metrics() (FileCacheMetrics, FilterMetrics) {
     309            0 :         m := h.fileCache.c.Metrics()
     310            0 : 
     311            0 :         // The generic cache maintains a count of entries, but it doesn't know which
     312            0 :         // entries are sstables and which are blob files, which affects the memory
     313            0 :         // footprint of the table cache. So the FileCache maintains its own counts,
     314            0 :         // incremented when initializing a new value and decremented by the
     315            0 :         // releasing func.
     316            0 :         countSSTables := h.fileCache.counts.sstables.Load()
     317            0 :         countBlobFiles := h.fileCache.counts.blobFiles.Load()
     318            0 : 
     319            0 :         cm := FileCacheMetrics{
     320            0 :                 TableCount:    countSSTables,
     321            0 :                 BlobFileCount: countBlobFiles,
     322            0 :                 Hits:          m.Hits,
     323            0 :                 Misses:        m.Misses,
     324            0 :                 Size: m.Size + countSSTables*int64(unsafe.Sizeof(sstable.Reader{})) +
     325            0 :                         countBlobFiles*int64(unsafe.Sizeof(blob.FileReader{})),
     326            0 :         }
     327            0 :         fm := h.readerOpts.FilterMetricsTracker.Load()
     328            0 :         return cm, fm
     329            0 : }
     330              : 
     331              : func (h *fileCacheHandle) estimateSize(
     332              :         meta *manifest.TableMetadata, lower, upper []byte,
     333            1 : ) (size uint64, err error) {
     334            1 :         err = h.withReader(context.TODO(), block.NoReadEnv, meta, func(r *sstable.Reader, env sstable.ReadEnv) error {
     335            1 :                 size, err = r.EstimateDiskUsage(lower, upper, env, meta.IterTransforms())
     336            1 :                 return err
     337            1 :         })
     338            1 :         return size, err
     339              : }
     340              : 
     341              : func createReader(
     342              :         v *fileCacheValue, meta *manifest.TableMetadata,
     343            1 : ) (*sstable.Reader, sstable.ReadEnv) {
     344            1 :         r := v.mustSSTableReader()
     345            1 :         env := sstable.ReadEnv{}
     346            1 :         if meta.Virtual {
     347            1 :                 if invariants.Enabled {
     348            1 :                         if meta.VirtualParams.FileNum == 0 || meta.VirtualParams.Lower.UserKey == nil || meta.VirtualParams.Upper.UserKey == nil {
     349            0 :                                 panic("virtual params not initialized")
     350              :                         }
     351              :                 }
     352            1 :                 env.Virtual = meta.VirtualParams
     353            1 :                 env.IsSharedIngested = v.isShared && meta.SyntheticSeqNum() != 0
     354              :         }
     355            1 :         env.InternalBounds = &meta.PointKeyBounds
     356            1 :         return r, env
     357              : }
     358              : 
     359              : func (h *fileCacheHandle) withReader(
     360              :         ctx context.Context,
     361              :         blockEnv block.ReadEnv,
     362              :         meta *manifest.TableMetadata,
     363              :         fn func(*sstable.Reader, sstable.ReadEnv) error,
     364            1 : ) error {
     365            1 :         ref, err := h.findOrCreateTable(ctx, meta, optsFromBlockReadEnv(blockEnv))
     366            1 :         if err != nil {
     367            0 :                 return err
     368            0 :         }
     369            1 :         defer ref.Unref()
     370            1 :         v := ref.Value()
     371            1 :         blockEnv.ReportCorruptionFn = h.reportCorruptionFn
     372            1 :         blockEnv.ReportCorruptionArg = meta
     373            1 :         env := sstable.ReadEnv{Block: blockEnv}
     374            1 : 
     375            1 :         r := v.mustSSTableReader()
     376            1 :         if meta.Virtual {
     377            1 :                 if invariants.Enabled {
     378            1 :                         if meta.VirtualParams.FileNum == 0 || meta.VirtualParams.Lower.UserKey == nil || meta.VirtualParams.Upper.UserKey == nil {
     379            0 :                                 panic("virtual params not initialized")
     380              :                         }
     381              :                 }
     382            1 :                 env.Virtual = meta.VirtualParams
     383            1 :                 env.IsSharedIngested = v.isShared && meta.SyntheticSeqNum() != 0
     384              :         }
     385              : 
     386            1 :         return fn(r, env)
     387              : 
     388              : }
     389              : 
     390            0 : func (h *fileCacheHandle) IterCount() int64 {
     391            0 :         return int64(h.iterCount.Load())
     392            0 : }
     393              : 
     394              : // GetValueReader returns a blob.ValueReader for blob file identified by fileNum.
     395              : // Implements blob.ReaderProvider.
     396              : func (h *fileCacheHandle) GetValueReader(
     397              :         ctx context.Context, diskFile base.ObjectInfo, stats block.InitFileReadStats,
     398            1 : ) (r blob.ValueReader, closeFunc func(), err error) {
     399            1 :         ref, err := h.findOrCreateBlob(ctx, diskFile, stats)
     400            1 :         if err != nil {
     401            0 :                 return nil, nil, err
     402            0 :         }
     403            1 :         v := ref.Value()
     404            1 :         r = v.mustBlob()
     405            1 :         // NB: The call to findOrCreateBlob incremented the value's reference count.
     406            1 :         // The closeHook (v.closeHook) takes responsibility for unreferencing the
     407            1 :         // value. Take care to avoid introducing an allocation here by adding a
     408            1 :         // closure.
     409            1 :         closeHook := h.addReference(v)
     410            1 :         return r, closeHook, nil
     411              : }
     412              : 
     413              : // FileCache is a shareable cache for open files. Open files are exclusively
     414              : // sstable files today.
     415              : type FileCache struct {
     416              :         refs   atomic.Int64
     417              :         counts struct {
     418              :                 sstables  atomic.Int64
     419              :                 blobFiles atomic.Int64
     420              :         }
     421              : 
     422              :         c genericcache.Cache[fileCacheKey, fileCacheValue, initFileOpts]
     423              : }
     424              : 
     425              : // Ref adds a reference to the file cache. Once a file cache is constructed, the
     426              : // cache only remains valid if there is at least one reference to it.
     427            1 : func (c *FileCache) Ref() {
     428            1 :         v := c.refs.Add(1)
     429            1 :         // We don't want the reference count to ever go from 0 -> 1,
     430            1 :         // cause a reference count of 0 implies that we've closed the cache.
     431            1 :         if v <= 1 {
     432            0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     433              :         }
     434              : }
     435              : 
     436              : // Unref removes a reference to the file cache.
     437            1 : func (c *FileCache) Unref() {
     438            1 :         v := c.refs.Add(-1)
     439            1 :         switch {
     440            0 :         case v < 0:
     441            0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     442            1 :         case v == 0:
     443            1 :                 c.c.Close()
     444            1 :                 c.c = genericcache.Cache[fileCacheKey, fileCacheValue, initFileOpts]{}
     445              :         }
     446              : }
     447              : 
     448              : type initFileOpts struct {
     449              :         stats block.InitFileReadStats
     450              : }
     451              : 
     452            1 : func optsFromBlockReadEnv(blockEnv block.ReadEnv) initFileOpts {
     453            1 :         return initFileOpts{
     454            1 :                 stats: block.InitFileReadStats{
     455            1 :                         Stats:     blockEnv.Stats,
     456            1 :                         IterStats: blockEnv.IterStats,
     457            1 :                 },
     458            1 :         }
     459            1 : }
     460              : 
     461              : // NewFileCache will create a new file cache with one outstanding reference. It
     462              : // is the callers responsibility to call Unref if they will no longer hold a
     463              : // reference to the file cache.
     464            1 : func NewFileCache(numShards int, size int) *FileCache {
     465            1 :         if size == 0 {
     466            0 :                 panic("pebble: cannot create a file cache of size 0")
     467            1 :         } else if numShards == 0 {
     468            0 :                 panic("pebble: cannot create a file cache with 0 shards")
     469              :         }
     470              : 
     471            1 :         c := &FileCache{}
     472            1 : 
     473            1 :         // initFn is used whenever a new entry is added to the file cache.
     474            1 :         initFn := func(
     475            1 :                 ctx context.Context, key fileCacheKey, opts initFileOpts,
     476            1 :                 vRef genericcache.ValueRef[fileCacheKey, fileCacheValue, initFileOpts]) error {
     477            1 :                 v := vRef.Value()
     478            1 :                 handle := key.handle
     479            1 :                 v.readerProvider.init(c, key)
     480            1 :                 v.closeHook = func() {
     481            1 :                         // closeHook is called when an iterator is closed; the initialization of
     482            1 :                         // an iterator with this value will happen after a FindOrCreate() call
     483            1 :                         // with returns the same vRef.
     484            1 :                         vRef.Unref()
     485            1 :                         handle.iterCount.Add(-1)
     486            1 :                 }
     487            1 :                 reader, objMeta, err := handle.openFile(ctx, key.fileNum, key.fileType, opts)
     488            1 :                 if err != nil {
     489            0 :                         return errors.Wrapf(err, "pebble: backing file %s error", redact.Safe(key.fileNum))
     490            0 :                 }
     491            1 :                 v.reader = reader
     492            1 :                 v.isShared = objMeta.IsShared()
     493            1 :                 switch key.fileType {
     494            1 :                 case base.FileTypeTable:
     495            1 :                         c.counts.sstables.Add(1)
     496            1 :                 case base.FileTypeBlob:
     497            1 :                         c.counts.blobFiles.Add(1)
     498            0 :                 default:
     499            0 :                         panic("unexpected file type")
     500              :                 }
     501            1 :                 return nil
     502              :         }
     503              : 
     504            1 :         releaseFn := func(v *fileCacheValue) {
     505            1 :                 if v.reader != nil {
     506            1 :                         switch v.reader.(type) {
     507            1 :                         case *sstable.Reader:
     508            1 :                                 c.counts.sstables.Add(-1)
     509            1 :                         case *blob.FileReader:
     510            1 :                                 c.counts.blobFiles.Add(-1)
     511              :                         }
     512            1 :                         _ = v.reader.Close()
     513            1 :                         v.reader = nil
     514              :                 }
     515              :         }
     516              : 
     517            1 :         c.c.Init(size, numShards, initFn, releaseFn)
     518            1 : 
     519            1 :         // Hold a ref to the cache here.
     520            1 :         c.refs.Store(1)
     521            1 : 
     522            1 :         return c
     523              : }
     524              : 
// fileCacheKey identifies an entry in the file cache: a backing file (by disk
// file number and file type) as accessed through a particular fileCacheHandle.
type fileCacheKey struct {
	// handle is the fileCacheHandle the entry belongs to; the cache's init
	// function opens the file through it, and the entry's close hook adjusts
	// its iterCount.
	handle  *fileCacheHandle
	fileNum base.DiskFileNum
	// fileType describes the type of file being cached (blob or sstable). A
	// file number alone uniquely identifies every file within a DB, but we need
	// to propagate the type so the file cache looks for the correct file in
	// object storage / the filesystem.
	fileType base.FileType
}
     534              : 
     535              : // Shard implements the genericcache.Key interface.
     536            1 : func (k fileCacheKey) Shard(numShards int) int {
     537            1 :         // TODO(radu): maybe incorporate a handle ID.
     538            1 :         return int(uint64(k.fileNum) % uint64(numShards))
     539            1 : }
     540              : 
     541              : // checkAndIntersectFilters checks the specific table and block property filters
     542              : // for intersection with any available table and block-level properties. Returns
     543              : // true for ok if this table should be read by this iterator.
     544              : func checkAndIntersectFilters(
     545              :         r *sstable.Reader,
     546              :         blockPropertyFilters []BlockPropertyFilter,
     547              :         boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
     548              :         syntheticSuffix sstable.SyntheticSuffix,
     549            1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
     550            1 :         if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
     551            1 :                 filterer, err = sstable.IntersectsTable(
     552            1 :                         blockPropertyFilters,
     553            1 :                         boundLimitedFilter,
     554            1 :                         r.UserProperties,
     555            1 :                         syntheticSuffix,
     556            1 :                 )
     557            1 :                 // NB: IntersectsTable will return a nil filterer if the table-level
     558            1 :                 // properties indicate there's no intersection with the provided filters.
     559            1 :                 if filterer == nil || err != nil {
     560            1 :                         return false, nil, err
     561            1 :                 }
     562              :         }
     563            1 :         return true, filterer, nil
     564              : }
     565              : 
// newIters creates the iterators over file requested by kinds: a range-key
// iterator, a range-deletion iterator and/or a point iterator. The table's
// reader is obtained through the file cache (findOrCreateTable).
//
// Reference handling: findOrCreateTable returns a reference to the cache value
// that this function is responsible for. If a point iterator is returned,
// that reference is handed to it and released by the point iterator's close
// hook; otherwise it is released before returning. On error, any iterators
// already created are closed and the reference is released.
func (h *fileCacheHandle) newIters(
	ctx context.Context,
	file *manifest.TableMetadata,
	opts *IterOptions,
	internalOpts internalIterOpts,
	kinds iterKinds,
) (iterSet, error) {
	// Calling findOrCreate gives us the responsibility of Unref()ing vRef.
	//
	// TODO(sumeer): it is possible that many goroutines concurrently miss on
	// the same file. Only one of them will create the sstable.Reader, and the
	// read latency incurred in that creation will only be accounted in that
	// iterator's stats. We should explicitly account for the latency of this in
	// the case of a miss, and include it somewhere in the iterator stats.
	vRef, err := h.findOrCreateTable(ctx, file, optsFromBlockReadEnv(internalOpts.readEnv.Block))
	if err != nil {
		return iterSet{}, err
	}

	// Route corruption reports from block reads to the handle's
	// reportCorruptionFn, with the table metadata as the argument.
	internalOpts.readEnv.Block.ReportCorruptionFn = h.reportCorruptionFn
	internalOpts.readEnv.Block.ReportCorruptionArg = file

	v := vRef.Value()
	r, env := createReader(v, file)
	// Propagate the per-table read environment computed by createReader.
	internalOpts.readEnv.Virtual = env.Virtual
	internalOpts.readEnv.IsSharedIngested = env.IsSharedIngested
	internalOpts.readEnv.InternalBounds = env.InternalBounds
	// Record the LSM level in the block read environment when the iterator's
	// layer is set and is not a flushable ingest.
	if opts != nil && opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
		internalOpts.readEnv.Block.Level = cache.MakeLevel(opts.layer.Level())
	}

	// Create only the requested iterators, stopping at the first error. Range
	// deletions are only looked for when the table has point keys. The point
	// iterator is created last (see the NB below).
	var iters iterSet
	if kinds.RangeKey() && file.HasRangeKeys {
		iters.rangeKey, err = newRangeKeyIter(ctx, file, r, opts.SpanIterOptions(), internalOpts)
	}
	if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
		iters.rangeDeletion, err = newRangeDelIter(ctx, file, r, h, internalOpts)
	}
	if kinds.Point() && err == nil {
		iters.point, err = h.newPointIter(ctx, v, file, r, opts, internalOpts, h)
	}
	if err != nil {
		// NB: There's a subtlety here: Because the point iterator is the last
		// iterator we attempt to create, it's not possible for:
		//   err != nil && iters.point != nil
		// If it were possible, we'd need to account for it to avoid double
		// unref-ing here, once during CloseAll and once during `unrefValue`.
		_ = iters.CloseAll()
		vRef.Unref()
		return iterSet{}, err
	}
	// Only point iterators ever require the reader stay pinned in the cache. If
	// we're not returning a point iterator to the caller, we need to unref v.
	//
	// For point iterators, v.closeHook will be called which will release the ref.
	if iters.point == nil {
		vRef.Unref()
	}
	return iters, nil
}
     626              : 
// filterBlockSizeLimitForFlushableIngests is the filter block size limit used
// for point iterators over flushable ingests (unless IterOptions.UseL6Filters
// is set): for those tables we decide whether to use the bloom filter based on
// its size.
const filterBlockSizeLimitForFlushableIngests = 64 * 1024
     630              : 
     631              : // newPointIter is an internal helper that constructs a point iterator over a
     632              : // sstable. This function is for internal use only, and callers should use
     633              : // newIters instead.
     634              : func (h *fileCacheHandle) newPointIter(
     635              :         ctx context.Context,
     636              :         v *fileCacheValue,
     637              :         file *manifest.TableMetadata,
     638              :         reader *sstable.Reader,
     639              :         opts *IterOptions,
     640              :         internalOpts internalIterOpts,
     641              :         handle *fileCacheHandle,
     642            1 : ) (internalIterator, error) {
     643            1 :         var (
     644            1 :                 hideObsoletePoints bool = false
     645            1 :                 pointKeyFilters    []BlockPropertyFilter
     646            1 :                 filterer           *sstable.BlockPropertiesFilterer
     647            1 :         )
     648            1 :         r := v.mustSSTableReader()
     649            1 :         if opts != nil {
     650            1 :                 // This code is appending (at most one filter) in-place to
     651            1 :                 // opts.PointKeyFilters even though the slice is shared for iterators in
     652            1 :                 // the same iterator tree. This is acceptable since all the following
     653            1 :                 // properties are true:
     654            1 :                 // - The iterator tree is single threaded, so the shared backing for the
     655            1 :                 //   slice is being mutated in a single threaded manner.
     656            1 :                 // - Each shallow copy of the slice has its own notion of length.
     657            1 :                 // - The appended element is always the obsoleteKeyBlockPropertyFilter
     658            1 :                 //   struct, which is stateless, so overwriting that struct when creating
     659            1 :                 //   one sstable iterator is harmless to other sstable iterators that are
     660            1 :                 //   relying on that struct.
     661            1 :                 //
     662            1 :                 // An alternative would be to have different slices for different sstable
     663            1 :                 // iterators, but that requires more work to avoid allocations.
     664            1 :                 //
     665            1 :                 // TODO(bilal): for compaction reads of foreign sstables, we do hide
     666            1 :                 // obsolete points (see sstable.Reader.newCompactionIter) but we don't
     667            1 :                 // apply the obsolete block property filter. We could optimize this by
     668            1 :                 // applying the filter.
     669            1 :                 hideObsoletePoints, pointKeyFilters =
     670            1 :                         r.TryAddBlockPropertyFilterForHideObsoletePoints(
     671            1 :                                 opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
     672            1 : 
     673            1 :                 var ok bool
     674            1 :                 var err error
     675            1 :                 ok, filterer, err = checkAndIntersectFilters(r, pointKeyFilters,
     676            1 :                         internalOpts.boundLimitedFilter, file.SyntheticPrefixAndSuffix.Suffix())
     677            1 :                 if err != nil {
     678            0 :                         return nil, err
     679            1 :                 } else if !ok {
     680            1 :                         // No point keys within the table match the filters.
     681            1 :                         return nil, nil
     682            1 :                 }
     683              :         }
     684              : 
     685            1 :         var iter sstable.Iterator
     686            1 :         filterBlockSizeLimit := sstable.AlwaysUseFilterBlock
     687            1 :         if opts != nil {
     688            1 :                 // By default, we don't use block filters for L6 and restrict the size for
     689            1 :                 // flushable ingests, as these blocks can be very big.
     690            1 :                 if !opts.UseL6Filters {
     691            1 :                         if opts.layer == manifest.Level(6) {
     692            1 :                                 filterBlockSizeLimit = sstable.NeverUseFilterBlock
     693            1 :                         } else if opts.layer.IsFlushableIngests() {
     694            1 :                                 filterBlockSizeLimit = filterBlockSizeLimitForFlushableIngests
     695            1 :                         }
     696              :                 }
     697            1 :                 if opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
     698            1 :                         ctx = objiotracing.WithLevel(ctx, opts.layer.Level())
     699            1 :                 }
     700              :         }
     701              : 
     702            1 :         if v.isShared && file.SyntheticSeqNum() != 0 {
     703            1 :                 // The table is shared and ingested.
     704            1 :                 hideObsoletePoints = true
     705            1 :         }
     706            1 :         transforms := file.IterTransforms()
     707            1 :         transforms.HideObsoletePoints = hideObsoletePoints
     708            1 :         if internalOpts.readEnv.Block.IterStats == nil && opts != nil {
     709            1 :                 internalOpts.readEnv.Block.IterStats = handle.SSTStatsCollector().Accumulator(uint64(uintptr(unsafe.Pointer(r))), opts.Category)
     710            1 :         }
     711            1 :         var blobReferences sstable.BlobReferences
     712            1 :         if r.Attributes.Has(sstable.AttributeBlobValues) {
     713            1 :                 if len(file.BlobReferences) == 0 {
     714            0 :                         return nil, errors.AssertionFailedf("pebble: sstable %s has blob values but no blob references", file.TableNum)
     715            0 :                 }
     716            1 :                 blobReferences = &file.BlobReferences
     717              :         }
     718            1 :         var err error
     719            1 :         if internalOpts.compaction {
     720            1 :                 iter, err = reader.NewCompactionIter(transforms, internalOpts.readEnv,
     721            1 :                         &v.readerProvider, sstable.TableBlobContext{
     722            1 :                                 ValueFetcher: internalOpts.blobValueFetcher,
     723            1 :                                 References:   blobReferences,
     724            1 :                         })
     725            1 :         } else {
     726            1 :                 iter, err = reader.NewPointIter(ctx, sstable.IterOptions{
     727            1 :                         Lower:                opts.GetLowerBound(),
     728            1 :                         Upper:                opts.GetUpperBound(),
     729            1 :                         Transforms:           transforms,
     730            1 :                         FilterBlockSizeLimit: filterBlockSizeLimit,
     731            1 :                         Filterer:             filterer,
     732            1 :                         Env:                  internalOpts.readEnv,
     733            1 :                         ReaderProvider:       &v.readerProvider,
     734            1 :                         BlobContext: sstable.TableBlobContext{
     735            1 :                                 ValueFetcher: internalOpts.blobValueFetcher,
     736            1 :                                 References:   blobReferences,
     737            1 :                         },
     738            1 :                         MaximumSuffixProperty: opts.GetMaximumSuffixProperty(),
     739            1 :                 })
     740            1 :         }
     741            1 :         if err != nil {
     742            0 :                 return nil, err
     743            0 :         }
     744              :         // NB: closeHook (v.closeHook) takes responsibility for calling
     745              :         // unrefValue(v) here. Take care to avoid introducing an allocation here by
     746              :         // adding a closure.
     747            1 :         closeHook := h.addReference(v)
     748            1 :         iter.SetCloseHook(closeHook)
     749            1 :         return iter, nil
     750              : }
     751              : 
     752            1 : func (h *fileCacheHandle) addReference(v *fileCacheValue) (closeHook func()) {
     753            1 :         h.iterCount.Add(1)
     754            1 :         closeHook = v.closeHook
     755            1 :         if invariants.RaceEnabled {
     756            0 :                 stack := debug.Stack()
     757            0 :                 h.raceMu.Lock()
     758            0 :                 refID := h.raceMu.nextRefID
     759            0 :                 h.raceMu.openRefs[refID] = stack
     760            0 :                 h.raceMu.nextRefID++
     761            0 :                 h.raceMu.Unlock()
     762            0 :                 // In race builds, this closeHook closure will force an allocation.
     763            0 :                 // Race builds are already unperformant (and allocate a stack trace), so
     764            0 :                 // we don't care.
     765            0 :                 closeHook = func() {
     766            0 :                         v.closeHook()
     767            0 :                         h.raceMu.Lock()
     768            0 :                         defer h.raceMu.Unlock()
     769            0 :                         delete(h.raceMu.openRefs, refID)
     770            0 :                 }
     771              :         }
     772            1 :         return closeHook
     773              : }
     774              : 
     775              : // SetupBlobReaderProvider creates a fileCachHandle blob.ReaderProvider for
     776              : // reading blob files. The caller is responsible for calling the returned cleanup
     777              : // function.
     778              : //
     779              : // NB: This function is intended for testing and tooling purposes only. It
     780              : // provides blob file access outside of normal database operations and is not
     781              : // used by databases opened through Open().
     782              : func SetupBlobReaderProvider(
     783              :         fs vfs.FS, path string, opts *Options, readOpts sstable.ReaderOptions,
     784            0 : ) (blob.ReaderProvider, func(), error) {
     785            0 :         var fc *FileCache
     786            0 :         var c *cache.Cache
     787            0 :         var ch *cache.Handle
     788            0 :         var objProvider objstorage.Provider
     789            0 :         var provider *fileCacheHandle
     790            0 : 
     791            0 :         // Helper to clean up resources in case of error.
     792            0 :         cleanup := func() {
     793            0 :                 if provider != nil {
     794            0 :                         _ = provider.Close()
     795            0 :                 }
     796            0 :                 if objProvider != nil {
     797            0 :                         _ = objProvider.Close()
     798            0 :                 }
     799            0 :                 if ch != nil {
     800            0 :                         ch.Close()
     801            0 :                 }
     802            0 :                 if c != nil {
     803            0 :                         c.Unref()
     804            0 :                 }
     805            0 :                 if fc != nil {
     806            0 :                         fc.Unref()
     807            0 :                 }
     808              :         }
     809              : 
     810            0 :         fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
     811            0 :         if opts.FileCache == nil {
     812            0 :                 fc = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
     813            0 :         } else {
     814            0 :                 fc = opts.FileCache
     815            0 :                 fc.Ref()
     816            0 :         }
     817              : 
     818            0 :         if opts.Cache == nil {
     819            0 :                 c = cache.New(opts.CacheSize)
     820            0 :         } else {
     821            0 :                 c = opts.Cache
     822            0 :                 c.Ref()
     823            0 :         }
     824            0 :         ch = c.NewHandle()
     825            0 : 
     826            0 :         var err error
     827            0 :         objProvider, err = objstorageprovider.Open(objstorageprovider.DefaultSettings(fs, path))
     828            0 :         if err != nil {
     829            0 :                 cleanup()
     830            0 :                 return nil, nil, err
     831            0 :         }
     832              : 
     833            0 :         provider = fc.newHandle(
     834            0 :                 ch,
     835            0 :                 objProvider,
     836            0 :                 opts.LoggerAndTracer,
     837            0 :                 readOpts,
     838            0 :                 func(base.ObjectInfo, error) error { return nil },
     839              :         )
     840              : 
     841            0 :         return provider, cleanup, nil
     842              : }
     843              : 
     844              : // newRangeDelIter is an internal helper that constructs an iterator over a
     845              : // sstable's range deletions. This function is for table-cache internal use
     846              : // only, and callers should use newIters instead.
     847              : func newRangeDelIter(
     848              :         ctx context.Context,
     849              :         file *manifest.TableMetadata,
     850              :         r *sstable.Reader,
     851              :         handle *fileCacheHandle,
     852              :         internalOpts internalIterOpts,
     853            1 : ) (keyspan.FragmentIterator, error) {
     854            1 :         // NB: range-del iterator does not maintain a reference to the table, nor
     855            1 :         // does it need to read from it after creation.
     856            1 :         rangeDelIter, err := r.NewRawRangeDelIter(ctx, file.FragmentIterTransforms(), internalOpts.readEnv)
     857            1 :         if err != nil {
     858            0 :                 return nil, err
     859            0 :         }
     860              :         // Assert expected bounds in tests.
     861            1 :         if invariants.Sometimes(50) && rangeDelIter != nil {
     862            1 :                 cmp := base.DefaultComparer.Compare
     863            1 :                 if handle.readerOpts.Comparer != nil {
     864            1 :                         cmp = handle.readerOpts.Comparer.Compare
     865            1 :                 }
     866            1 :                 rangeDelIter = keyspan.AssertBounds(
     867            1 :                         rangeDelIter, file.PointKeyBounds.Smallest(), file.PointKeyBounds.LargestUserKey(), cmp,
     868            1 :                 )
     869              :         }
     870            1 :         return rangeDelIter, nil
     871              : }
     872              : 
     873              : // newRangeKeyIter is an internal helper that constructs an iterator over a
     874              : // sstable's range keys. This function is for table-cache internal use only, and
     875              : // callers should use newIters instead.
     876              : func newRangeKeyIter(
     877              :         ctx context.Context,
     878              :         file *manifest.TableMetadata,
     879              :         r *sstable.Reader,
     880              :         opts keyspan.SpanIterOptions,
     881              :         internalOpts internalIterOpts,
     882            1 : ) (keyspan.FragmentIterator, error) {
     883            1 :         transforms := file.FragmentIterTransforms()
     884            1 :         // Don't filter a table's range keys if the file contains RANGEKEYDELs.
     885            1 :         // The RANGEKEYDELs may delete range keys in other levels. Skipping the
     886            1 :         // file's range key blocks may surface deleted range keys below. This is
     887            1 :         // done here, rather than deferring to the block-property collector in order
     888            1 :         // to maintain parity with point keys and the treatment of RANGEDELs.
     889            1 :         if !r.Attributes.Has(sstable.AttributeRangeKeyDels) && len(opts.RangeKeyFilters) > 0 {
     890            0 :                 ok, _, err := checkAndIntersectFilters(r, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix())
     891            0 :                 if err != nil {
     892            0 :                         return nil, err
     893            0 :                 } else if !ok {
     894            0 :                         return nil, nil
     895            0 :                 }
     896              :         }
     897              :         // TODO(radu): wrap in an AssertBounds.
     898            1 :         return r.NewRawRangeKeyIter(ctx, transforms, internalOpts.readEnv)
     899              : }
     900              : 
// tableCacheShardReaderProvider implements valblk.ReaderProvider for a
// specific table. GetReader lazily acquires a file-cache reference for key and
// counts outstanding GetReader calls; the reference is released when the
// matching Close calls bring the count back to zero.
type tableCacheShardReaderProvider struct {
	// c is the file cache's underlying generic cache (set by init).
	c   *genericcache.Cache[fileCacheKey, fileCacheValue, initFileOpts]
	// key identifies the table whose reader is provided.
	key fileCacheKey

	// mu guards the reference state below.
	mu struct {
		sync.Mutex
		// r is the result of c.FindOrCreate(), only set iff refCount > 0.
		r genericcache.ValueRef[fileCacheKey, fileCacheValue, initFileOpts]
		// refCount is the number of GetReader() calls that have not received a
		// corresponding Close().
		refCount int
	}
}
     916              : 
     917              : var _ valblk.ReaderProvider = &tableCacheShardReaderProvider{}
     918              : 
     919            1 : func (rp *tableCacheShardReaderProvider) init(fc *FileCache, key fileCacheKey) {
     920            1 :         rp.c = &fc.c
     921            1 :         rp.key = key
     922            1 :         rp.mu.r = genericcache.ValueRef[fileCacheKey, fileCacheValue, initFileOpts]{}
     923            1 :         rp.mu.refCount = 0
     924            1 : }
     925              : 
     926              : // GetReader implements valblk.ReaderProvider. Note that it is not the
     927              : // responsibility of tableCacheShardReaderProvider to ensure that the file
     928              : // continues to exist. The ReaderProvider is used in iterators where the
     929              : // top-level iterator is pinning the read state and preventing the files from
     930              : // being deleted.
     931              : //
     932              : // The caller must call tableCacheShardReaderProvider.Close.
     933              : //
     934              : // Note that currently the Reader returned here is only used to read value
     935              : // blocks. This reader shouldn't be used for other purposes like reading keys
     936              : // outside of virtual sstable bounds.
     937              : //
     938              : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
     939              : // that the reader isn't used for other purposes.
     940              : func (rp *tableCacheShardReaderProvider) GetReader(
     941              :         ctx context.Context, stats block.InitFileReadStats,
     942            1 : ) (valblk.ExternalBlockReader, error) {
     943            1 :         rp.mu.Lock()
     944            1 :         defer rp.mu.Unlock()
     945            1 : 
     946            1 :         if rp.mu.refCount > 0 {
     947            0 :                 // We already have a value.
     948            0 :                 rp.mu.refCount++
     949            0 :                 return rp.mu.r.Value().mustSSTableReader(), nil
     950            0 :         }
     951              : 
     952              :         // Calling FindOrCreate gives us the responsibility of Unref()ing r, which
     953              :         // will happen when rp.mu.refCount reaches 0. Note that if the table is no
     954              :         // longer in the cache, FindOrCreate will need to do IO (through initFn in
     955              :         // NewFileCache) to initialize a new Reader. We hold rp.mu during this time so
     956              :         // that concurrent GetReader calls block until the Reader is created.
     957            1 :         r, err := rp.c.FindOrCreate(ctx, rp.key, initFileOpts{stats: stats})
     958            1 :         if err != nil {
     959            0 :                 return nil, err
     960            0 :         }
     961            1 :         rp.mu.r = r
     962            1 :         rp.mu.refCount = 1
     963            1 :         return r.Value().mustSSTableReader(), nil
     964              : }
     965              : 
     966              : // Close implements valblk.ReaderProvider.
     967            1 : func (rp *tableCacheShardReaderProvider) Close() {
     968            1 :         rp.mu.Lock()
     969            1 :         defer rp.mu.Unlock()
     970            1 :         rp.mu.refCount--
     971            1 :         if rp.mu.refCount <= 0 {
     972            1 :                 if rp.mu.refCount < 0 {
     973            0 :                         panic("pebble: sstable.ReaderProvider misuse")
     974              :                 }
     975            1 :                 rp.mu.r.Unref()
     976            1 :                 rp.mu.r = genericcache.ValueRef[fileCacheKey, fileCacheValue, initFileOpts]{}
     977              :         }
     978              : }
     979              : 
     980              : // getTableProperties returns sst table properties for the backing file.
     981              : //
     982              : // WARNING! If file is a virtual table, we return the properties of the physical
     983              : // table.
     984              : func (h *fileCacheHandle) getTableProperties(
     985              :         file *manifest.TableMetadata,
     986            0 : ) (*sstable.Properties, error) {
     987            0 :         // Calling findOrCreateTable gives us the responsibility of decrementing v's
     988            0 :         // refCount here
     989            0 :         v, err := h.findOrCreateTable(context.TODO(), file, initFileOpts{})
     990            0 :         if err != nil {
     991            0 :                 return nil, err
     992            0 :         }
     993            0 :         defer v.Unref()
     994            0 : 
     995            0 :         r := v.Value().mustSSTableReader()
     996            0 :         props, err := r.ReadPropertiesBlock(context.TODO(), nil /* buffer pool */)
     997            0 :         if err != nil {
     998            0 :                 return nil, err
     999            0 :         }
    1000            0 :         return &props, nil
    1001              : }
    1002              : 
    1003              : type fileCacheValue struct {
    1004              :         closeHook func()
    1005              :         reader    io.Closer // *sstable.Reader or *blob.FileReader
    1006              :         isShared  bool
    1007              : 
    1008              :         // readerProvider is embedded here so that we only allocate it once as long as
    1009              :         // the table stays in the cache. Its state is not always logically tied to
    1010              :         // this specific fileCacheShard - if a table goes out of the cache and then
    1011              :         // comes back in, the readerProvider in a now-defunct fileCacheValue can
    1012              :         // still be used and will internally refer to the new fileCacheValue.
    1013              :         readerProvider tableCacheShardReaderProvider
    1014              : }
    1015              : 
    1016              : // mustSSTable retrieves the value's *sstable.Reader. It panics if the cached
    1017              : // file is not a sstable (i.e., it is a blob file).
    1018            1 : func (v *fileCacheValue) mustSSTableReader() *sstable.Reader {
    1019            1 :         return v.reader.(*sstable.Reader)
    1020            1 : }
    1021              : 
    1022              : // mustBlob retrieves the value's *blob.FileReader. It panics if the cached file
    1023              : // is not a blob file.
    1024            1 : func (v *fileCacheValue) mustBlob() *blob.FileReader {
    1025            1 :         return v.reader.(*blob.FileReader)
    1026            1 : }
    1027              : 
    1028              : // iterSet holds a set of iterators of various key kinds, all constructed over
    1029              : // the same data structure (eg, an sstable). A subset of the fields may be
    1030              : // populated depending on the `iterKinds` passed to newIters.
    1031              : type iterSet struct {
    1032              :         point         internalIterator
    1033              :         rangeDeletion keyspan.FragmentIterator
    1034              :         rangeKey      keyspan.FragmentIterator
    1035              : }
    1036              : 
    1037              : // TODO(jackson): Consider adding methods for fast paths that check whether an
    1038              : // iterator of a particular kind is nil, so that these call sites don't need to
    1039              : // reach into the struct's fields directly.
    1040              : 
    1041              : // Point returns the contained point iterator. If there is no point iterator,
    1042              : // Point returns a non-nil empty point iterator.
    1043            1 : func (s *iterSet) Point() internalIterator {
    1044            1 :         if s.point == nil {
    1045            1 :                 return emptyIter
    1046            1 :         }
    1047            1 :         return s.point
    1048              : }
    1049              : 
    1050              : // RangeDeletion returns the contained range deletion iterator. If there is no
    1051              : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
    1052              : // iterator.
    1053            1 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
    1054            1 :         if s.rangeDeletion == nil {
    1055            1 :                 return emptyKeyspanIter
    1056            1 :         }
    1057            1 :         return s.rangeDeletion
    1058              : }
    1059              : 
    1060              : // RangeKey returns the contained range key iterator. If there is no range key
    1061              : // iterator, RangeKey returns a non-nil empty keyspan iterator.
    1062            1 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
    1063            1 :         if s.rangeKey == nil {
    1064            0 :                 return emptyKeyspanIter
    1065            0 :         }
    1066            1 :         return s.rangeKey
    1067              : }
    1068              : 
    1069              : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
    1070              : // must be not be called on the constituent iterators.
    1071            1 : func (s *iterSet) CloseAll() error {
    1072            1 :         var err error
    1073            1 :         if s.point != nil {
    1074            1 :                 err = s.point.Close()
    1075            1 :                 s.point = nil
    1076            1 :         }
    1077            1 :         if s.rangeDeletion != nil {
    1078            1 :                 s.rangeDeletion.Close()
    1079            1 :                 s.rangeDeletion = nil
    1080            1 :         }
    1081            1 :         if s.rangeKey != nil {
    1082            1 :                 s.rangeKey.Close()
    1083            1 :                 s.rangeKey = nil
    1084            1 :         }
    1085            1 :         return err
    1086              : }
    1087              : 
    1088              : // iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
    1089              : // bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
    1090              : // represent a set of desired iterator kinds.
    1091              : type iterKinds uint8
    1092              : 
    1093            1 : func (t iterKinds) Point() bool         { return (t & iterPointKeys) != 0 }
    1094            1 : func (t iterKinds) RangeDeletion() bool { return (t & iterRangeDeletions) != 0 }
    1095            1 : func (t iterKinds) RangeKey() bool      { return (t & iterRangeKeys) != 0 }
    1096              : 
    1097              : const (
    1098              :         iterPointKeys iterKinds = 1 << iota
    1099              :         iterRangeDeletions
    1100              :         iterRangeKeys
    1101              : )
        

Generated by: LCOV version 2.0-1