LCOV - code coverage report
Current view: top level - pebble - file_cache.go (source / functions) Coverage Total Hit
Test: 2025-03-10 08:17Z de0c5a11 - meta test only.lcov Lines: 79.3 % 473 375
Test Date: 2025-03-10 08:18:31 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "context"
      10              :         "fmt"
      11              :         "io"
      12              :         "runtime/debug"
      13              :         "sync"
      14              :         "sync/atomic"
      15              :         "unsafe"
      16              : 
      17              :         "github.com/cockroachdb/errors"
      18              :         "github.com/cockroachdb/pebble/internal/base"
      19              :         "github.com/cockroachdb/pebble/internal/cache"
      20              :         "github.com/cockroachdb/pebble/internal/genericcache"
      21              :         "github.com/cockroachdb/pebble/internal/invariants"
      22              :         "github.com/cockroachdb/pebble/internal/keyspan"
      23              :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      24              :         "github.com/cockroachdb/pebble/internal/manifest"
      25              :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      26              :         "github.com/cockroachdb/pebble/objstorage"
      27              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      28              :         "github.com/cockroachdb/pebble/sstable"
      29              :         "github.com/cockroachdb/pebble/sstable/block"
      30              :         "github.com/cockroachdb/pebble/sstable/valblk"
      31              :         "github.com/cockroachdb/redact"
      32              : )
      33              : 
      34              : var emptyIter = &errorIter{err: nil}
      35              : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
      36              : 
// tableNewIters creates new iterators (point, range deletion and/or range key)
// for the given table metadata. Which of the various iterator kinds the user is
// requesting is specified with the iterKinds bitmap.
//
// On success, the requested subset of iters.{point,rangeDel,rangeKey} are
// populated with iterators.
//
// If a point iterator is requested and the operation was successful,
// iters.point is guaranteed to be non-nil and must be closed when the caller is
// finished.
//
// If a range deletion or range key iterator is requested, the corresponding
// iterator may be nil if the table does not contain any keys of the
// corresponding kind. The returned iterSet type provides RangeDeletion() and
// RangeKey() convenience methods that return non-nil empty iterators that may
// be used if the caller requires a non-nil iterator.
//
// opts may be nil; tableNewRangeDelIter and tableNewRangeKeyIter pass nil.
//
// On error, all iterators are nil.
//
// The only (non-test) implementation of tableNewIters is
// fileCacheHandle.newIters().
type tableNewIters func(
	ctx context.Context,
	file *manifest.TableMetadata,
	opts *IterOptions,
	internalOpts internalIterOpts,
	kinds iterKinds,
) (iterSet, error)
      65              : 
      66              : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
      67              : // for the rangedel iterator returned by tableNewIters.
      68            1 : func tableNewRangeDelIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
      69            1 :         return func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
      70            1 :                 iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
      71            1 :                 if err != nil {
      72            0 :                         return nil, err
      73            0 :                 }
      74            1 :                 return iters.RangeDeletion(), nil
      75              :         }
      76              : }
      77              : 
      78              : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
      79              : // for the range key iterator returned by tableNewIters.
      80            1 : func tableNewRangeKeyIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
      81            1 :         return func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
      82            1 :                 iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
      83            1 :                 if err != nil {
      84            0 :                         return nil, err
      85            0 :                 }
      86            1 :                 return iters.RangeKey(), nil
      87              :         }
      88              : }
      89              : 
// fileCacheHandle is used to access the file cache. Each DB has its own handle,
// and entries in the shared FileCache are keyed by handle so that the files of
// different handles stay separate.
type fileCacheHandle struct {
	fileCache *FileCache

	// The handle contains fields which are unique to each DB. Note that these get
	// accessed from all shards, so keep read-only fields separate from read-write
	// fields.
	loggerAndTracer  LoggerAndTracer
	blockCacheHandle *cache.Handle
	objProvider      objstorage.Provider
	readerOpts       sstable.ReaderOptions

	// iterCount keeps track of how many iterators are open. It is used to keep
	// track of leaked iterators on a per-db level.
	iterCount         atomic.Int32
	sstStatsCollector block.CategoryStatsCollector

	// reportCorruptionFn is used for block.ReadEnv.ReportCorruptionFn. It expects
	// the first argument to be a `*TableMetadata`. It is nil when the handle was
	// created without a corruption callback.
	reportCorruptionFn func(any, error)

	// This struct is only populated in race builds.
	raceMu struct {
		sync.Mutex
		// nextRefID is the next ID to allocate for a new reference.
		nextRefID uint64
		// openRefs maps reference IDs to the stack trace recorded at creation
		// time. It's used to track which call paths leaked open references to
		// files.
		openRefs map[uint64][]byte
	}
}
     122              : 
     123              : // newHandle creates a handle for the FileCache which has its own options. Each
     124              : // handle has its own set of files in the cache, separate from those of other
     125              : // handles.
     126              : func (c *FileCache) newHandle(
     127              :         cacheHandle *cache.Handle,
     128              :         objProvider objstorage.Provider,
     129              :         loggerAndTracer LoggerAndTracer,
     130              :         readerOpts sstable.ReaderOptions,
     131              :         reportSSTableCorruptionFn func(*manifest.TableMetadata, error),
     132            1 : ) *fileCacheHandle {
     133            1 :         c.Ref()
     134            1 : 
     135            1 :         t := &fileCacheHandle{
     136            1 :                 fileCache:        c,
     137            1 :                 loggerAndTracer:  loggerAndTracer,
     138            1 :                 blockCacheHandle: cacheHandle,
     139            1 :                 objProvider:      objProvider,
     140            1 :         }
     141            1 :         t.readerOpts = readerOpts
     142            1 :         t.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
     143            1 :         if reportSSTableCorruptionFn != nil {
     144            1 :                 t.reportCorruptionFn = func(arg any, err error) {
     145            0 :                         reportSSTableCorruptionFn(arg.(*manifest.TableMetadata), err)
     146            0 :                 }
     147              :         }
     148            1 :         if invariants.RaceEnabled {
     149            0 :                 t.raceMu.openRefs = make(map[uint64][]byte)
     150            0 :         }
     151            1 :         return t
     152              : }
     153              : 
     154              : // Close the handle, make sure that there will be no further need
     155              : // to access any of the files associated with the store.
     156            1 : func (h *fileCacheHandle) Close() error {
     157            1 :         // We want to do some cleanup work here. Check for leaked iterators
     158            1 :         // by the DB using this container. Note that we'll still perform cleanup
     159            1 :         // below in the case that there are leaked iterators.
     160            1 :         var err error
     161            1 :         if v := h.iterCount.Load(); v > 0 {
     162            0 :                 if !invariants.RaceEnabled {
     163            0 :                         err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
     164            0 :                 } else {
     165            0 :                         var buf bytes.Buffer
     166            0 :                         for _, stack := range h.raceMu.openRefs {
     167            0 :                                 fmt.Fprintf(&buf, "%s\n", stack)
     168            0 :                         }
     169            0 :                         err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
     170              :                 }
     171              :         }
     172              : 
     173              :         // EvictAll would panic if there are still outstanding references.
     174            1 :         if err == nil {
     175            1 :                 keys := h.fileCache.c.EvictAll(func(key fileCacheKey) bool {
     176            1 :                         return key.handle == h
     177            1 :                 })
     178              :                 // Evict any associated blocks in the cache.
     179            1 :                 for i := range keys {
     180            1 :                         h.blockCacheHandle.EvictFile(keys[i].fileNum)
     181            1 :                 }
     182              :         }
     183              : 
     184            1 :         h.fileCache.Unref()
     185            1 :         // TODO(radu): we have to tolerate metrics() calls after close (see
     186            1 :         // https://github.com/cockroachdb/cockroach/issues/140454).
     187            1 :         // *h = fileCacheHandle{}
     188            1 :         return err
     189              : }
     190              : 
     191              : // openFile is called when we insert a new entry in the file cache.
     192              : func (h *fileCacheHandle) openFile(
     193              :         ctx context.Context, fileNum base.DiskFileNum,
     194            1 : ) (*sstable.Reader, objstorage.ObjectMetadata, error) {
     195            1 :         f, err := h.objProvider.OpenForReading(
     196            1 :                 ctx, base.FileTypeTable, fileNum, objstorage.OpenOptions{MustExist: true},
     197            1 :         )
     198            1 :         if err != nil {
     199            0 :                 return nil, objstorage.ObjectMetadata{}, err
     200            0 :         }
     201            1 :         o := h.readerOpts
     202            1 :         o.CacheOpts = sstableinternal.CacheOptions{
     203            1 :                 CacheHandle: h.blockCacheHandle,
     204            1 :                 FileNum:     fileNum,
     205            1 :         }
     206            1 :         r, err := sstable.NewReader(ctx, f, o)
     207            1 :         if err != nil {
     208            0 :                 return nil, objstorage.ObjectMetadata{}, err
     209            0 :         }
     210            1 :         objMeta, err := h.objProvider.Lookup(base.FileTypeTable, fileNum)
     211            1 :         if err != nil {
     212            0 :                 r.Close()
     213            0 :                 return nil, objstorage.ObjectMetadata{}, err
     214            0 :         }
     215            1 :         return r, objMeta, nil
     216              : }
     217              : 
     218              : // findOrCreate retrieves an existing reader or creates a new one for the
     219              : // backing file of the given table. If a corruption error is encountered,
     220              : // reportCorruptionFn() is called.
     221              : func (h *fileCacheHandle) findOrCreate(
     222              :         ctx context.Context, meta *manifest.TableMetadata,
     223            1 : ) (genericcache.ValueRef[fileCacheKey, fileCacheValue], error) {
     224            1 :         key := fileCacheKey{
     225            1 :                 handle:  h,
     226            1 :                 fileNum: meta.FileBacking.DiskFileNum,
     227            1 :         }
     228            1 :         valRef, err := h.fileCache.c.FindOrCreate(ctx, key)
     229            1 :         if err != nil && IsCorruptionError(err) {
     230            0 :                 h.reportCorruptionFn(meta, err)
     231            0 :         }
     232            1 :         return valRef, err
     233              : }
     234              : 
     235              : // Evict the given file from the file cache and the block cache.
     236            1 : func (h *fileCacheHandle) Evict(fileNum base.DiskFileNum) {
     237            1 :         h.fileCache.c.Evict(fileCacheKey{handle: h, fileNum: fileNum})
     238            1 :         h.blockCacheHandle.EvictFile(fileNum)
     239            1 : }
     240              : 
     241            1 : func (h *fileCacheHandle) SSTStatsCollector() *block.CategoryStatsCollector {
     242            1 :         return &h.sstStatsCollector
     243            1 : }
     244              : 
     245              : // Metrics returns metrics for the file cache. Note that the CacheMetrics track
     246              : // the global cache which is shared between multiple handles (stores). The
     247              : // FilterMetrics are per-handle.
     248            1 : func (h *fileCacheHandle) Metrics() (CacheMetrics, FilterMetrics) {
     249            1 :         m := h.fileCache.c.Metrics()
     250            1 :         cm := CacheMetrics{
     251            1 :                 Hits:   m.Hits,
     252            1 :                 Misses: m.Misses,
     253            1 :                 Count:  m.Count,
     254            1 :                 Size:   m.Size + m.Count*int64(unsafe.Sizeof(sstable.Reader{})),
     255            1 :         }
     256            1 :         fm := h.readerOpts.FilterMetricsTracker.Load()
     257            1 :         return cm, fm
     258            1 : }
     259              : 
     260              : func (h *fileCacheHandle) estimateSize(
     261              :         meta *tableMetadata, lower, upper []byte,
     262            1 : ) (size uint64, err error) {
     263            1 :         err = h.withCommonReader(context.TODO(), block.NoReadEnv, meta, func(cr sstable.CommonReader, env block.ReadEnv) error {
     264            1 :                 size, err = cr.EstimateDiskUsage(lower, upper)
     265            1 :                 return err
     266            1 :         })
     267            1 :         return size, err
     268              : }
     269              : 
     270              : // createCommonReader creates a Reader for this file.
     271            1 : func createCommonReader(v *fileCacheValue, file *tableMetadata) sstable.CommonReader {
     272            1 :         // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
     273            1 :         r := v.mustSSTableReader()
     274            1 :         var cr sstable.CommonReader = r
     275            1 :         if file.Virtual {
     276            1 :                 virtualReader := sstable.MakeVirtualReader(
     277            1 :                         r, file.VirtualMeta().VirtualReaderParams(v.isShared),
     278            1 :                 )
     279            1 :                 cr = &virtualReader
     280            1 :         }
     281            1 :         return cr
     282              : }
     283              : 
     284              : func (h *fileCacheHandle) withCommonReader(
     285              :         ctx context.Context,
     286              :         env block.ReadEnv,
     287              :         meta *tableMetadata,
     288              :         fn func(sstable.CommonReader, block.ReadEnv) error,
     289            1 : ) error {
     290            1 :         ref, err := h.findOrCreate(ctx, meta)
     291            1 :         if err != nil {
     292            0 :                 return err
     293            0 :         }
     294            1 :         defer ref.Unref()
     295            1 :         env.ReportCorruptionFn = h.reportCorruptionFn
     296            1 :         env.ReportCorruptionArg = meta
     297            1 :         return fn(createCommonReader(ref.Value(), meta), env)
     298              : }
     299              : 
     300              : func (h *fileCacheHandle) withReader(
     301              :         ctx context.Context,
     302              :         env block.ReadEnv,
     303              :         meta physicalMeta,
     304              :         fn func(*sstable.Reader, block.ReadEnv) error,
     305            1 : ) error {
     306            1 :         ref, err := h.findOrCreate(ctx, meta.TableMetadata)
     307            1 :         if err != nil {
     308            0 :                 return err
     309            0 :         }
     310            1 :         defer ref.Unref()
     311            1 :         env.ReportCorruptionFn = h.reportCorruptionFn
     312            1 :         env.ReportCorruptionArg = meta.TableMetadata
     313            1 :         return fn(ref.Value().reader.(*sstable.Reader), env)
     314              : }
     315              : 
     316              : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
     317              : func (h *fileCacheHandle) withVirtualReader(
     318              :         ctx context.Context,
     319              :         env block.ReadEnv,
     320              :         meta virtualMeta,
     321              :         fn func(sstable.VirtualReader, block.ReadEnv) error,
     322            1 : ) error {
     323            1 :         ref, err := h.findOrCreate(ctx, meta.TableMetadata)
     324            1 :         if err != nil {
     325            0 :                 return err
     326            0 :         }
     327            1 :         defer ref.Unref()
     328            1 :         v := ref.Value()
     329            1 :         env.ReportCorruptionFn = h.reportCorruptionFn
     330            1 :         env.ReportCorruptionArg = &meta.TableMetadata
     331            1 :         return fn(sstable.MakeVirtualReader(v.mustSSTableReader(), meta.VirtualReaderParams(v.isShared)), env)
     332              : }
     333              : 
     334            1 : func (h *fileCacheHandle) IterCount() int64 {
     335            1 :         return int64(h.iterCount.Load())
     336            1 : }
     337              : 
// FileCache is a shareable cache for open files. Open files are exclusively
// sstable files today. A FileCache may be shared by several handles (one per
// DB), and its lifetime is governed by reference counting (Ref/Unref).
type FileCache struct {
	// refs is the reference count. When Unref drops it to zero, the underlying
	// cache is closed and reset.
	refs atomic.Int64

	// c is the underlying generic cache, keyed by (handle, backing file number).
	c genericcache.Cache[fileCacheKey, fileCacheValue]
}
     345              : 
     346              : // Ref adds a reference to the file cache. Once a file cache is constructed, the
     347              : // cache only remains valid if there is at least one reference to it.
     348            1 : func (c *FileCache) Ref() {
     349            1 :         v := c.refs.Add(1)
     350            1 :         // We don't want the reference count to ever go from 0 -> 1,
     351            1 :         // cause a reference count of 0 implies that we've closed the cache.
     352            1 :         if v <= 1 {
     353            0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     354              :         }
     355              : }
     356              : 
     357              : // Unref removes a reference to the file cache.
     358            1 : func (c *FileCache) Unref() {
     359            1 :         v := c.refs.Add(-1)
     360            1 :         switch {
     361            0 :         case v < 0:
     362            0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     363            1 :         case v == 0:
     364            1 :                 c.c.Close()
     365            1 :                 c.c = genericcache.Cache[fileCacheKey, fileCacheValue]{}
     366              :         }
     367              : }
     368              : 
     369              : // NewFileCache will create a new file cache with one outstanding reference. It
     370              : // is the callers responsibility to call Unref if they will no longer hold a
     371              : // reference to the file cache.
     372            1 : func NewFileCache(numShards int, size int) *FileCache {
     373            1 :         if size == 0 {
     374            0 :                 panic("pebble: cannot create a file cache of size 0")
     375            1 :         } else if numShards == 0 {
     376            0 :                 panic("pebble: cannot create a file cache with 0 shards")
     377              :         }
     378              : 
     379            1 :         c := &FileCache{}
     380            1 : 
     381            1 :         // initFn is used whenever a new entry is added to the file cache.
     382            1 :         initFn := func(ctx context.Context, key fileCacheKey, vRef genericcache.ValueRef[fileCacheKey, fileCacheValue]) error {
     383            1 :                 v := vRef.Value()
     384            1 :                 handle := key.handle
     385            1 :                 v.readerProvider.init(c, key)
     386            1 :                 v.closeHook = func() {
     387            1 :                         // closeHook is called when an iterator is closed; the initialization of
     388            1 :                         // an iterator with this value will happen after a FindOrCreate() call
     389            1 :                         // with returns the same vRef.
     390            1 :                         vRef.Unref()
     391            1 :                         handle.iterCount.Add(-1)
     392            1 :                 }
     393            1 :                 reader, objMeta, err := handle.openFile(ctx, key.fileNum)
     394            1 :                 if err != nil {
     395            0 :                         return errors.Wrapf(err, "pebble: backing file %s error", redact.Safe(key.fileNum))
     396            0 :                 }
     397            1 :                 v.reader = reader
     398            1 :                 v.isShared = objMeta.IsShared()
     399            1 :                 return nil
     400              :         }
     401              : 
     402            1 :         releaseFn := func(v *fileCacheValue) {
     403            1 :                 if v.reader != nil {
     404            1 :                         _ = v.reader.Close()
     405            1 :                         v.reader = nil
     406            1 :                 }
     407              :         }
     408              : 
     409            1 :         c.c.Init(size, numShards, initFn, releaseFn)
     410            1 : 
     411            1 :         // Hold a ref to the cache here.
     412            1 :         c.refs.Store(1)
     413            1 : 
     414            1 :         return c
     415              : }
     416              : 
// fileCacheKey identifies a file cache entry: a backing file as seen through a
// particular handle. Including the handle keeps the entries of different
// handles (DBs) separate within the shared cache.
type fileCacheKey struct {
	handle  *fileCacheHandle
	fileNum base.DiskFileNum
}
     421              : 
     422              : // Shard implements the genericcache.Key interface.
     423            1 : func (k fileCacheKey) Shard(numShards int) int {
     424            1 :         // TODO(radu): maybe incorporate a handle ID.
     425            1 :         return int(uint64(k.fileNum) % uint64(numShards))
     426            1 : }
     427              : 
     428              : // checkAndIntersectFilters checks the specific table and block property filters
     429              : // for intersection with any available table and block-level properties. Returns
     430              : // true for ok if this table should be read by this iterator.
     431              : func checkAndIntersectFilters(
     432              :         r *sstable.Reader,
     433              :         blockPropertyFilters []BlockPropertyFilter,
     434              :         boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
     435              :         syntheticSuffix sstable.SyntheticSuffix,
     436            1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
     437            1 :         if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
     438            1 :                 filterer, err = sstable.IntersectsTable(
     439            1 :                         blockPropertyFilters,
     440            1 :                         boundLimitedFilter,
     441            1 :                         r.Properties.UserProperties,
     442            1 :                         syntheticSuffix,
     443            1 :                 )
     444            1 :                 // NB: IntersectsTable will return a nil filterer if the table-level
     445            1 :                 // properties indicate there's no intersection with the provided filters.
     446            1 :                 if filterer == nil || err != nil {
     447            1 :                         return false, nil, err
     448            1 :                 }
     449              :         }
     450            1 :         return true, filterer, nil
     451              : }
     452              : 
// newIters implements tableNewIters for this handle. It finds or opens the
// table's backing reader in the file cache and builds the requested subset of
// iterators, in this order: range key, range deletion, point.
//
// Reference accounting: findOrCreate hands back a ref on the cache entry. If a
// point iterator is returned, that ref stays held and is released by
// v.closeHook (installed in NewFileCache) when the iterator is closed.
// Otherwise the ref is released before returning.
func (h *fileCacheHandle) newIters(
	ctx context.Context,
	file *manifest.TableMetadata,
	opts *IterOptions,
	internalOpts internalIterOpts,
	kinds iterKinds,
) (iterSet, error) {
	// Calling findOrCreate gives us the responsibility of Unref()ing vRef.
	vRef, err := h.findOrCreate(ctx, file)
	if err != nil {
		return iterSet{}, err
	}

	internalOpts.readEnv.ReportCorruptionFn = h.reportCorruptionFn
	internalOpts.readEnv.ReportCorruptionArg = file

	v := vRef.Value()
	r := v.mustSSTableReader()
	// Note: This suffers an allocation for virtual sstables.
	cr := createCommonReader(v, file)
	var iters iterSet
	// Only tables that record range keys get a range key iterator.
	// NOTE(review): opts may be nil here (tableNewRangeKeyIter passes nil), so
	// this relies on SpanIterOptions tolerating a nil receiver — confirm.
	if kinds.RangeKey() && file.HasRangeKeys {
		iters.rangeKey, err = newRangeKeyIter(ctx, r, file, cr, opts.SpanIterOptions(), internalOpts)
	}
	// Range deletions are only looked for in tables that have point keys.
	if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
		iters.rangeDeletion, err = newRangeDelIter(ctx, file, cr, h, internalOpts)
	}
	if kinds.Point() && err == nil {
		iters.point, err = h.newPointIter(ctx, v, file, cr, opts, internalOpts, h)
	}
	if err != nil {
		// NB: There's a subtlety here: Because the point iterator is the last
		// iterator we attempt to create, it's not possible for:
		//   err != nil && iters.point != nil
		// If it were possible, we'd need to account for it to avoid double
		// unref-ing here, once during CloseAll (via closeHook) and once via the
		// vRef.Unref() below.
		iters.CloseAll()
		vRef.Unref()
		return iterSet{}, err
	}
	// Only point iterators ever require the reader stay pinned in the cache. If
	// we're not returning a point iterator to the caller, we need to unref v.
	//
	// For point iterators, v.closeHook will be called which will release the ref.
	if iters.point == nil {
		vRef.Unref()
	}
	return iters, nil
}
     502              : 
     503              : // For flushable ingests, we decide whether to use the bloom filter base on
     504              : // size.
     505              : const filterBlockSizeLimitForFlushableIngests = 64 * 1024
     506              : 
     507              : // newPointIter is an internal helper that constructs a point iterator over a
     508              : // sstable. This function is for internal use only, and callers should use
     509              : // newIters instead.
     510              : func (h *fileCacheHandle) newPointIter(
     511              :         ctx context.Context,
     512              :         v *fileCacheValue,
     513              :         file *manifest.TableMetadata,
     514              :         cr sstable.CommonReader,
     515              :         opts *IterOptions,
     516              :         internalOpts internalIterOpts,
     517              :         handle *fileCacheHandle,
     518            1 : ) (internalIterator, error) {
     519            1 :         var (
     520            1 :                 hideObsoletePoints bool = false
     521            1 :                 pointKeyFilters    []BlockPropertyFilter
     522            1 :                 filterer           *sstable.BlockPropertiesFilterer
     523            1 :         )
     524            1 :         r := v.mustSSTableReader()
     525            1 :         if opts != nil {
     526            1 :                 // This code is appending (at most one filter) in-place to
     527            1 :                 // opts.PointKeyFilters even though the slice is shared for iterators in
     528            1 :                 // the same iterator tree. This is acceptable since all the following
     529            1 :                 // properties are true:
     530            1 :                 // - The iterator tree is single threaded, so the shared backing for the
     531            1 :                 //   slice is being mutated in a single threaded manner.
     532            1 :                 // - Each shallow copy of the slice has its own notion of length.
     533            1 :                 // - The appended element is always the obsoleteKeyBlockPropertyFilter
     534            1 :                 //   struct, which is stateless, so overwriting that struct when creating
     535            1 :                 //   one sstable iterator is harmless to other sstable iterators that are
     536            1 :                 //   relying on that struct.
     537            1 :                 //
     538            1 :                 // An alternative would be to have different slices for different sstable
     539            1 :                 // iterators, but that requires more work to avoid allocations.
     540            1 :                 //
     541            1 :                 // TODO(bilal): for compaction reads of foreign sstables, we do hide
     542            1 :                 // obsolete points (see sstable.Reader.newCompactionIter) but we don't
     543            1 :                 // apply the obsolete block property filter. We could optimize this by
     544            1 :                 // applying the filter.
     545            1 :                 hideObsoletePoints, pointKeyFilters =
     546            1 :                         r.TryAddBlockPropertyFilterForHideObsoletePoints(
     547            1 :                                 opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
     548            1 : 
     549            1 :                 var ok bool
     550            1 :                 var err error
     551            1 :                 ok, filterer, err = checkAndIntersectFilters(r, pointKeyFilters,
     552            1 :                         internalOpts.boundLimitedFilter, file.SyntheticPrefixAndSuffix.Suffix())
     553            1 :                 if err != nil {
     554            0 :                         return nil, err
     555            1 :                 } else if !ok {
     556            1 :                         // No point keys within the table match the filters.
     557            1 :                         return nil, nil
     558            1 :                 }
     559              :         }
     560              : 
     561            1 :         var iter sstable.Iterator
     562            1 :         filterBlockSizeLimit := sstable.AlwaysUseFilterBlock
     563            1 :         if opts != nil {
     564            1 :                 // By default, we don't use block filters for L6 and restrict the size for
     565            1 :                 // flushable ingests, as these blocks can be very big.
     566            1 :                 if !opts.UseL6Filters {
     567            1 :                         if opts.layer == manifest.Level(6) {
     568            1 :                                 filterBlockSizeLimit = sstable.NeverUseFilterBlock
     569            1 :                         } else if opts.layer.IsFlushableIngests() {
     570            1 :                                 filterBlockSizeLimit = filterBlockSizeLimitForFlushableIngests
     571            1 :                         }
     572              :                 }
     573            1 :                 if opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
     574            1 :                         ctx = objiotracing.WithLevel(ctx, opts.layer.Level())
     575            1 :                 }
     576              :         }
     577            1 :         tableFormat, err := r.TableFormat()
     578            1 :         if err != nil {
     579            0 :                 return nil, err
     580            0 :         }
     581              : 
     582            1 :         if v.isShared && file.SyntheticSeqNum() != 0 {
     583            1 :                 if tableFormat < sstable.TableFormatPebblev4 {
     584            0 :                         return nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
     585            0 :                 }
     586              :                 // The table is shared and ingested.
     587            1 :                 hideObsoletePoints = true
     588              :         }
     589            1 :         transforms := file.IterTransforms()
     590            1 :         transforms.HideObsoletePoints = hideObsoletePoints
     591            1 :         if internalOpts.readEnv.IterStats == nil && opts != nil {
     592            1 :                 internalOpts.readEnv.IterStats = handle.SSTStatsCollector().Accumulator(uint64(uintptr(unsafe.Pointer(r))), opts.Category)
     593            1 :         }
     594            1 :         if internalOpts.compaction {
     595            1 :                 iter, err = cr.NewCompactionIter(transforms, internalOpts.readEnv, &v.readerProvider)
     596            1 :         } else {
     597            1 :                 iter, err = cr.NewPointIter(
     598            1 :                         ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer,
     599            1 :                         filterBlockSizeLimit, internalOpts.readEnv, &v.readerProvider)
     600            1 :         }
     601            1 :         if err != nil {
     602            0 :                 return nil, err
     603            0 :         }
     604              :         // NB: closeHook (v.closeHook) takes responsibility for calling
     605              :         // unrefValue(v) here. Take care to avoid introducing an allocation here by
     606              :         // adding a closure.
     607            1 :         closeHook := h.addReference(v)
     608            1 :         iter.SetCloseHook(closeHook)
     609            1 :         return iter, nil
     610              : }
     611              : 
     612            1 : func (h *fileCacheHandle) addReference(v *fileCacheValue) (closeHook func()) {
     613            1 :         h.iterCount.Add(1)
     614            1 :         closeHook = v.closeHook
     615            1 :         if invariants.RaceEnabled {
     616            0 :                 stack := debug.Stack()
     617            0 :                 h.raceMu.Lock()
     618            0 :                 refID := h.raceMu.nextRefID
     619            0 :                 h.raceMu.openRefs[refID] = stack
     620            0 :                 h.raceMu.nextRefID++
     621            0 :                 h.raceMu.Unlock()
     622            0 :                 // In race builds, this closeHook closure will force an allocation.
     623            0 :                 // Race builds are already unperformant (and allocate a stack trace), so
     624            0 :                 // we don't care.
     625            0 :                 closeHook = func() {
     626            0 :                         v.closeHook()
     627            0 :                         h.raceMu.Lock()
     628            0 :                         defer h.raceMu.Unlock()
     629            0 :                         delete(h.raceMu.openRefs, refID)
     630            0 :                 }
     631              :         }
     632            1 :         return closeHook
     633              : }
     634              : 
     635              : // newRangeDelIter is an internal helper that constructs an iterator over a
     636              : // sstable's range deletions. This function is for table-cache internal use
     637              : // only, and callers should use newIters instead.
     638              : func newRangeDelIter(
     639              :         ctx context.Context,
     640              :         file *manifest.TableMetadata,
     641              :         cr sstable.CommonReader,
     642              :         handle *fileCacheHandle,
     643              :         internalOpts internalIterOpts,
     644            1 : ) (keyspan.FragmentIterator, error) {
     645            1 :         // NB: range-del iterator does not maintain a reference to the table, nor
     646            1 :         // does it need to read from it after creation.
     647            1 :         rangeDelIter, err := cr.NewRawRangeDelIter(ctx, file.FragmentIterTransforms(), internalOpts.readEnv)
     648            1 :         if err != nil {
     649            0 :                 return nil, err
     650            0 :         }
     651              :         // Assert expected bounds in tests.
     652            1 :         if invariants.Sometimes(50) && rangeDelIter != nil {
     653            1 :                 cmp := base.DefaultComparer.Compare
     654            1 :                 if handle.readerOpts.Comparer != nil {
     655            1 :                         cmp = handle.readerOpts.Comparer.Compare
     656            1 :                 }
     657            1 :                 rangeDelIter = keyspan.AssertBounds(
     658            1 :                         rangeDelIter, file.SmallestPointKey, file.LargestPointKey.UserKey, cmp,
     659            1 :                 )
     660              :         }
     661            1 :         return rangeDelIter, nil
     662              : }
     663              : 
     664              : // newRangeKeyIter is an internal helper that constructs an iterator over a
     665              : // sstable's range keys. This function is for table-cache internal use only, and
     666              : // callers should use newIters instead.
     667              : func newRangeKeyIter(
     668              :         ctx context.Context,
     669              :         r *sstable.Reader,
     670              :         file *tableMetadata,
     671              :         cr sstable.CommonReader,
     672              :         opts keyspan.SpanIterOptions,
     673              :         internalOpts internalIterOpts,
     674            1 : ) (keyspan.FragmentIterator, error) {
     675            1 :         transforms := file.FragmentIterTransforms()
     676            1 :         // Don't filter a table's range keys if the file contains RANGEKEYDELs.
     677            1 :         // The RANGEKEYDELs may delete range keys in other levels. Skipping the
     678            1 :         // file's range key blocks may surface deleted range keys below. This is
     679            1 :         // done here, rather than deferring to the block-property collector in order
     680            1 :         // to maintain parity with point keys and the treatment of RANGEDELs.
     681            1 :         if r.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 {
     682            0 :                 ok, _, err := checkAndIntersectFilters(r, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix())
     683            0 :                 if err != nil {
     684            0 :                         return nil, err
     685            0 :                 } else if !ok {
     686            0 :                         return nil, nil
     687            0 :                 }
     688              :         }
     689              :         // TODO(radu): wrap in an AssertBounds.
     690            1 :         return cr.NewRawRangeKeyIter(ctx, transforms, internalOpts.readEnv)
     691              : }
     692              : 
// tableCacheShardReaderProvider implements valblk.ReaderProvider for a
// specific table, identified by key within the file cache c. On the first
// GetReader call with no outstanding references, it obtains a reference to the
// cached value via c.FindOrCreate. It releases that reference when Close
// brings the outstanding count back to zero.
type tableCacheShardReaderProvider struct {
	// c is the file cache in which the table is looked up.
	c   *genericcache.Cache[fileCacheKey, fileCacheValue]
	// key identifies the table within c.
	key fileCacheKey

	// mu guards the cached reference and its count; it is held across
	// FindOrCreate in GetReader.
	mu struct {
		sync.Mutex
		// r is the result of c.FindOrCreate(), only set iff refCount > 0.
		r genericcache.ValueRef[fileCacheKey, fileCacheValue]
		// refCount is the number of GetReader() calls that have not received a
		// corresponding Close().
		refCount int
	}
}
     708              : 
     709              : var _ valblk.ReaderProvider = &tableCacheShardReaderProvider{}
     710              : 
     711            1 : func (rp *tableCacheShardReaderProvider) init(fc *FileCache, key fileCacheKey) {
     712            1 :         rp.c = &fc.c
     713            1 :         rp.key = key
     714            1 :         rp.mu.r = genericcache.ValueRef[fileCacheKey, fileCacheValue]{}
     715            1 :         rp.mu.refCount = 0
     716            1 : }
     717              : 
     718              : // GetReader implements sstable.ReaderProvider. Note that it is not the
     719              : // responsibility of tableCacheShardReaderProvider to ensure that the file
     720              : // continues to exist. The ReaderProvider is used in iterators where the
     721              : // top-level iterator is pinning the read state and preventing the files from
     722              : // being deleted.
     723              : //
     724              : // The caller must call tableCacheShardReaderProvider.Close.
     725              : //
     726              : // Note that currently the Reader returned here is only used to read value
     727              : // blocks. This reader shouldn't be used for other purposes like reading keys
     728              : // outside of virtual sstable bounds.
     729              : //
     730              : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
     731              : // that the reader isn't used for other purposes.
     732              : func (rp *tableCacheShardReaderProvider) GetReader(
     733              :         ctx context.Context,
     734            1 : ) (valblk.ExternalBlockReader, error) {
     735            1 :         rp.mu.Lock()
     736            1 :         defer rp.mu.Unlock()
     737            1 : 
     738            1 :         if rp.mu.refCount > 0 {
     739            0 :                 // We already have a value.
     740            0 :                 rp.mu.refCount++
     741            0 :                 return rp.mu.r.Value().mustSSTableReader(), nil
     742            0 :         }
     743              : 
     744              :         // Calling FindOrCreate gives us the responsibility of Unref()ing r, which
     745              :         // will happen when rp.mu.refCount reaches 0. Note that if the table is no
     746              :         // longer in the cache, FindOrCreate will need to do IO (through initFn in
     747              :         // NewFileCache) to initialize a new Reader. We hold rp.mu during this time so
     748              :         // that concurrent GetReader calls block until the Reader is created.
     749            1 :         r, err := rp.c.FindOrCreate(ctx, rp.key)
     750            1 :         if err != nil {
     751            0 :                 return nil, err
     752            0 :         }
     753            1 :         rp.mu.r = r
     754            1 :         rp.mu.refCount = 1
     755            1 :         return r.Value().mustSSTableReader(), nil
     756              : }
     757              : 
     758              : // Close implements sstable.ReaderProvider.
     759            1 : func (rp *tableCacheShardReaderProvider) Close() {
     760            1 :         rp.mu.Lock()
     761            1 :         defer rp.mu.Unlock()
     762            1 :         rp.mu.refCount--
     763            1 :         if rp.mu.refCount <= 0 {
     764            1 :                 if rp.mu.refCount < 0 {
     765            0 :                         panic("pebble: sstable.ReaderProvider misuse")
     766              :                 }
     767            1 :                 rp.mu.r.Unref()
     768            1 :                 rp.mu.r = genericcache.ValueRef[fileCacheKey, fileCacheValue]{}
     769              :         }
     770              : }
     771              : 
     772              : // getTableProperties returns sst table properties for the backing file.
     773              : //
     774              : // WARNING! If file is a virtual table, we return the properties of the physical
     775              : // table.
     776            0 : func (h *fileCacheHandle) getTableProperties(file *tableMetadata) (*sstable.Properties, error) {
     777            0 :         // Calling findOrCreate gives us the responsibility of decrementing v's refCount here
     778            0 :         v, err := h.findOrCreate(context.TODO(), file)
     779            0 :         if err != nil {
     780            0 :                 return nil, err
     781            0 :         }
     782            0 :         defer v.Unref()
     783            0 : 
     784            0 :         r := v.Value().mustSSTableReader()
     785            0 :         return &r.Properties, nil
     786              : }
     787              : 
// fileCacheValue is the value the file cache stores for a single file.
type fileCacheValue struct {
	// closeHook is installed as the close hook of point iterators built over
	// this value (see addReference); newPointIter relies on it to release the
	// reference such an iterator holds on this value.
	closeHook func()
	// reader is the open reader for the file. For sstables it holds a
	// *sstable.Reader (see mustSSTableReader); for other files, such as blob
	// files, it holds a different reader.
	reader    io.Closer // *sstable.Reader
	// isShared marks the table as shared. newPointIter hides obsolete points
	// for shared tables that carry a synthetic sequence number (shared
	// ingested tables). NOTE(review): where this is set is outside this view.
	isShared  bool

	// readerProvider is embedded here so that we only allocate it once as long as
	// the table stays in the cache. Its state is not always logically tied to
	// this specific fileCacheValue - if a table goes out of the cache and then
	// comes back in, the readerProvider in a now-defunct fileCacheValue can
	// still be used and will internally refer to the new fileCacheValue.
	readerProvider tableCacheShardReaderProvider
}
     800              : 
     801              : // mustSSTable retrieves the value's *sstable.Reader. It panics if the cached
     802              : // file is not a sstable (i.e., it is a blob file).
     803            1 : func (v *fileCacheValue) mustSSTableReader() *sstable.Reader {
     804            1 :         return v.reader.(*sstable.Reader)
     805            1 : }
     806              : 
// iterSet holds a set of iterators of various key kinds, all constructed over
// the same data structure (eg, an sstable). A subset of the fields may be
// populated depending on the `iterKinds` passed to newIters.
type iterSet struct {
	// point iterates point keys; nil if not requested or not created (Point()
	// substitutes an empty iterator).
	point         internalIterator
	// rangeDeletion iterates range deletions; nil if absent (RangeDeletion()
	// substitutes an empty keyspan iterator).
	rangeDeletion keyspan.FragmentIterator
	// rangeKey iterates range keys; nil if absent (RangeKey() substitutes an
	// empty keyspan iterator).
	rangeKey      keyspan.FragmentIterator
}
     815              : 
     816              : // TODO(jackson): Consider adding methods for fast paths that check whether an
     817              : // iterator of a particular kind is nil, so that these call sites don't need to
     818              : // reach into the struct's fields directly.
     819              : 
     820              : // Point returns the contained point iterator. If there is no point iterator,
     821              : // Point returns a non-nil empty point iterator.
     822            1 : func (s *iterSet) Point() internalIterator {
     823            1 :         if s.point == nil {
     824            1 :                 return emptyIter
     825            1 :         }
     826            1 :         return s.point
     827              : }
     828              : 
     829              : // RangeDeletion returns the contained range deletion iterator. If there is no
     830              : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
     831              : // iterator.
     832            1 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
     833            1 :         if s.rangeDeletion == nil {
     834            1 :                 return emptyKeyspanIter
     835            1 :         }
     836            1 :         return s.rangeDeletion
     837              : }
     838              : 
     839              : // RangeKey returns the contained range key iterator. If there is no range key
     840              : // iterator, RangeKey returns a non-nil empty keyspan iterator.
     841            1 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
     842            1 :         if s.rangeKey == nil {
     843            0 :                 return emptyKeyspanIter
     844            0 :         }
     845            1 :         return s.rangeKey
     846              : }
     847              : 
     848              : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
     849              : // must be not be called on the constituent iterators.
     850            1 : func (s *iterSet) CloseAll() error {
     851            1 :         var err error
     852            1 :         if s.point != nil {
     853            1 :                 err = s.point.Close()
     854            1 :                 s.point = nil
     855            1 :         }
     856            1 :         if s.rangeDeletion != nil {
     857            1 :                 s.rangeDeletion.Close()
     858            1 :                 s.rangeDeletion = nil
     859            1 :         }
     860            1 :         if s.rangeKey != nil {
     861            1 :                 s.rangeKey.Close()
     862            1 :                 s.rangeKey = nil
     863            1 :         }
     864            1 :         return err
     865              : }
     866              : 
     867              : // iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
     868              : // bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
     869              : // represent a set of desired iterator kinds.
     870              : type iterKinds uint8
     871              : 
     872            1 : func (t iterKinds) Point() bool         { return (t & iterPointKeys) != 0 }
     873            1 : func (t iterKinds) RangeDeletion() bool { return (t & iterRangeDeletions) != 0 }
     874            1 : func (t iterKinds) RangeKey() bool      { return (t & iterRangeKeys) != 0 }
     875              : 
     876              : const (
     877              :         iterPointKeys iterKinds = 1 << iota
     878              :         iterRangeDeletions
     879              :         iterRangeKeys
     880              : )
        

Generated by: LCOV version 2.0-1