LCOV - code coverage report
Current view: top level - pebble - file_cache.go (source / functions) Hit Total Coverage
Test: 2024-12-24 08:17Z 78d53457 - tests + meta.lcov Lines: 726 802 90.5 %
Date: 2024-12-24 08:18:09 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "fmt"
      11             :         "io"
      12             :         "runtime/debug"
      13             :         "sync"
      14             :         "sync/atomic"
      15             :         "unsafe"
      16             : 
      17             :         "github.com/cockroachdb/errors"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/cache"
      20             :         "github.com/cockroachdb/pebble/internal/invariants"
      21             :         "github.com/cockroachdb/pebble/internal/keyspan"
      22             :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      23             :         "github.com/cockroachdb/pebble/internal/manifest"
      24             :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      25             :         "github.com/cockroachdb/pebble/objstorage"
      26             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      27             :         "github.com/cockroachdb/pebble/sstable"
      28             :         "github.com/cockroachdb/pebble/sstable/valblk"
      29             : )
      30             : 
      31             : var emptyIter = &errorIter{err: nil}
      32             : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
      33             : 
      34             : // tableNewIters creates new iterators (point, range deletion and/or range key)
      35             : // for the given file metadata. Which of the various iterator kinds the user is
      36             : // requesting is specified with the iterKinds bitmap.
      37             : //
      38             : // On success, the requested subset of iters.{point,rangeDel,rangeKey} are
      39             : // populated with iterators.
      40             : //
      41             : // If a point iterator is requested and the operation was successful,
      42             : // iters.point is guaranteed to be non-nil and must be closed when the caller is
      43             : // finished.
      44             : //
      45             : // If a range deletion or range key iterator is requested, the corresponding
      46             : // iterator may be nil if the table does not contain any keys of the
      47             : // corresponding kind. The returned iterSet type provides RangeDeletion() and
      48             : // RangeKey() convenience methods that return non-nil empty iterators that may
      49             : // be used if the caller requires a non-nil iterator.
      50             : //
      51             : // On error, all iterators are nil.
      52             : //
      53             : // The only (non-test) implementation of tableNewIters is
      54             : // fileCacheContainer.newIters().
      55             : type tableNewIters func(
      56             :         ctx context.Context,
      57             :         file *manifest.FileMetadata,
      58             :         opts *IterOptions,
      59             :         internalOpts internalIterOpts,
      60             :         kinds iterKinds,
      61             : ) (iterSet, error)
      62             : 
      63             : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
      64             : // for the rangedel iterator returned by tableNewIters.
      65           2 : func tableNewRangeDelIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
      66           2 :         return func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
      67           2 :                 iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
      68           2 :                 if err != nil {
      69           0 :                         return nil, err
      70           0 :                 }
      71           2 :                 return iters.RangeDeletion(), nil
      72             :         }
      73             : }
      74             : 
      75             : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
      76             : // for the range key iterator returned by tableNewIters.
      77           2 : func tableNewRangeKeyIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
      78           2 :         return func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
      79           2 :                 iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
      80           2 :                 if err != nil {
      81           1 :                         return nil, err
      82           1 :                 }
      83           2 :                 return iters.RangeKey(), nil
      84             :         }
      85             : }
      86             : 
      87             : // fileCacheOpts contains the db specific fields of a file cache. This is stored
      88             : // in the fileCacheContainer along with the file cache.
      89             : //
      90             : // NB: It is important to make sure that the fields in this struct are
      91             : // read-only. Since the fields here are shared by every single fileCacheShard,
      92             : // if non read-only fields are updated, we could have unnecessary evictions of
      93             : // those fields, and the surrounding fields from the CPU caches.
      94             : type fileCacheOpts struct {
      95             :         // iterCount keeps track of how many iterators are open. It is used to keep
      96             :         // track of leaked iterators on a per-db level.
      97             :         iterCount *atomic.Int32
      98             : 
      99             :         loggerAndTracer   LoggerAndTracer
     100             :         cache             *cache.Cache
     101             :         cacheID           cache.ID
     102             :         objProvider       objstorage.Provider
     103             :         readerOpts        sstable.ReaderOptions
     104             :         sstStatsCollector *sstable.CategoryStatsCollector
     105             : }
     106             : 
     107             : // fileCacheContainer contains the file cache and fields which are unique to the
     108             : // DB.
     109             : type fileCacheContainer struct {
     110             :         fileCache *FileCache
     111             : 
     112             :         // dbOpts contains fields relevant to the file cache which are unique to
     113             :         // each DB.
     114             :         dbOpts fileCacheOpts
     115             : }
     116             : 
     117             : // newFileCacheContainer will panic if the underlying block cache in the file
     118             : // cache doesn't match Options.Cache.
     119             : func newFileCacheContainer(
     120             :         fc *FileCache,
     121             :         cacheID cache.ID,
     122             :         objProvider objstorage.Provider,
     123             :         opts *Options,
     124             :         size int,
     125             :         sstStatsCollector *sstable.CategoryStatsCollector,
     126           2 : ) *fileCacheContainer {
     127           2 :         // We will release a ref to the file cache acquired here when
     128           2 :         // fileCacheContainer.close is called.
     129           2 :         if fc != nil {
     130           1 :                 if fc.cache != opts.Cache {
     131           0 :                         panic("pebble: underlying cache for the file cache and db are different")
     132             :                 }
     133           1 :                 fc.Ref()
     134           2 :         } else {
     135           2 :                 // NewFileCache should create a ref to fc which the container should
     136           2 :                 // drop whenever it is closed.
     137           2 :                 fc = NewFileCache(opts.Cache, opts.Experimental.FileCacheShards, size)
     138           2 :         }
     139             : 
     140           2 :         t := &fileCacheContainer{}
     141           2 :         t.fileCache = fc
     142           2 :         t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
     143           2 :         t.dbOpts.cache = opts.Cache
     144           2 :         t.dbOpts.cacheID = cacheID
     145           2 :         t.dbOpts.objProvider = objProvider
     146           2 :         t.dbOpts.readerOpts = opts.MakeReaderOptions()
     147           2 :         t.dbOpts.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
     148           2 :         t.dbOpts.iterCount = new(atomic.Int32)
     149           2 :         t.dbOpts.sstStatsCollector = sstStatsCollector
     150           2 :         return t
     151             : }
     152             : 
     153             : // Before calling close, make sure that there will be no further need
     154             : // to access any of the files associated with the store.
     155           2 : func (c *fileCacheContainer) close() error {
     156           2 :         // We want to do some cleanup work here. Check for leaked iterators
     157           2 :         // by the DB using this container. Note that we'll still perform cleanup
     158           2 :         // below in the case that there are leaked iterators.
     159           2 :         var err error
     160           2 :         if v := c.dbOpts.iterCount.Load(); v > 0 {
     161           1 :                 err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
     162           1 :         }
     163             : 
     164             :         // Release nodes here.
     165           2 :         for _, shard := range c.fileCache.shards {
     166           2 :                 if shard != nil {
     167           2 :                         shard.removeDB(&c.dbOpts)
     168           2 :                 }
     169             :         }
     170           2 :         return firstError(err, c.fileCache.Unref())
     171             : }
     172             : 
     173             : func (c *fileCacheContainer) newIters(
     174             :         ctx context.Context,
     175             :         file *manifest.FileMetadata,
     176             :         opts *IterOptions,
     177             :         internalOpts internalIterOpts,
     178             :         kinds iterKinds,
     179           2 : ) (iterSet, error) {
     180           2 :         return c.fileCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts, kinds)
     181           2 : }
     182             : 
     183             : // getTableProperties returns the properties associated with the backing physical
     184             : // table if the input metadata belongs to a virtual sstable.
     185           1 : func (c *fileCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
     186           1 :         return c.fileCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
     187           1 : }
     188             : 
     189           2 : func (c *fileCacheContainer) evict(fileNum base.DiskFileNum) {
     190           2 :         c.fileCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
     191           2 : }
     192             : 
     193           2 : func (c *fileCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
     194           2 :         var m CacheMetrics
     195           2 :         for i := range c.fileCache.shards {
     196           2 :                 s := c.fileCache.shards[i]
     197           2 :                 s.mu.RLock()
     198           2 :                 m.Count += int64(len(s.mu.nodes))
     199           2 :                 s.mu.RUnlock()
     200           2 :                 m.Hits += s.hits.Load()
     201           2 :                 m.Misses += s.misses.Load()
     202           2 :         }
     203           2 :         m.Size = m.Count * int64(unsafe.Sizeof(fileCacheNode{})+unsafe.Sizeof(fileCacheValue{})+unsafe.Sizeof(sstable.Reader{}))
     204           2 :         f := c.dbOpts.readerOpts.FilterMetricsTracker.Load()
     205           2 :         return m, f
     206             : }
     207             : 
     208             : func (c *fileCacheContainer) estimateSize(
     209             :         meta *fileMetadata, lower, upper []byte,
     210           2 : ) (size uint64, err error) {
     211           2 :         c.withCommonReader(meta, func(cr sstable.CommonReader) error {
     212           2 :                 size, err = cr.EstimateDiskUsage(lower, upper)
     213           2 :                 return err
     214           2 :         })
     215           2 :         return size, err
     216             : }
     217             : 
     218             : // createCommonReader creates a Reader for this file.
     219           2 : func createCommonReader(v *fileCacheValue, file *fileMetadata) sstable.CommonReader {
     220           2 :         // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
     221           2 :         var cr sstable.CommonReader = v.reader
     222           2 :         if file.Virtual {
     223           2 :                 virtualReader := sstable.MakeVirtualReader(
     224           2 :                         v.reader, file.VirtualMeta().VirtualReaderParams(v.isShared),
     225           2 :                 )
     226           2 :                 cr = &virtualReader
     227           2 :         }
     228           2 :         return cr
     229             : }
     230             : 
     231             : func (c *fileCacheContainer) withCommonReader(
     232             :         meta *fileMetadata, fn func(sstable.CommonReader) error,
     233           2 : ) error {
     234           2 :         s := c.fileCache.getShard(meta.FileBacking.DiskFileNum)
     235           2 :         v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
     236           2 :         defer s.unrefValue(v)
     237           2 :         if v.err != nil {
     238           0 :                 return v.err
     239           0 :         }
     240           2 :         return fn(createCommonReader(v, meta))
     241             : }
     242             : 
     243           2 : func (c *fileCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
     244           2 :         s := c.fileCache.getShard(meta.FileBacking.DiskFileNum)
     245           2 :         v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
     246           2 :         defer s.unrefValue(v)
     247           2 :         if v.err != nil {
     248           1 :                 return v.err
     249           1 :         }
     250           2 :         return fn(v.reader)
     251             : }
     252             : 
     253             : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
     254             : func (c *fileCacheContainer) withVirtualReader(
     255             :         meta virtualMeta, fn func(sstable.VirtualReader) error,
     256           2 : ) error {
     257           2 :         s := c.fileCache.getShard(meta.FileBacking.DiskFileNum)
     258           2 :         v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
     259           2 :         defer s.unrefValue(v)
     260           2 :         if v.err != nil {
     261           0 :                 return v.err
     262           0 :         }
     263           2 :         provider := c.dbOpts.objProvider
     264           2 :         objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
     265           2 :         if err != nil {
     266           0 :                 return err
     267           0 :         }
     268           2 :         return fn(sstable.MakeVirtualReader(v.reader, meta.VirtualReaderParams(objMeta.IsShared())))
     269             : }
     270             : 
     271           2 : func (c *fileCacheContainer) iterCount() int64 {
     272           2 :         return int64(c.dbOpts.iterCount.Load())
     273           2 : }
     274             : 
     275             : // FileCache is a shareable cache for open files. Open files are exclusively
     276             : // sstable files today.
     277             : type FileCache struct {
     278             :         refs atomic.Int64
     279             : 
     280             :         cache  *Cache
     281             :         shards []*fileCacheShard
     282             : }
     283             : 
     284             : // Ref adds a reference to the file cache. Once a file cache is constructed, the
     285             : // cache only remains valid if there is at least one reference to it.
     286           1 : func (c *FileCache) Ref() {
     287           1 :         v := c.refs.Add(1)
     288           1 :         // We don't want the reference count to ever go from 0 -> 1,
     289           1 :         // cause a reference count of 0 implies that we've closed the cache.
     290           1 :         if v <= 1 {
     291           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     292             :         }
     293             : }
     294             : 
     295             : // Unref removes a reference to the file cache.
     296           2 : func (c *FileCache) Unref() error {
     297           2 :         v := c.refs.Add(-1)
     298           2 :         switch {
     299           1 :         case v < 0:
     300           1 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     301           2 :         case v == 0:
     302           2 :                 var err error
     303           2 :                 for i := range c.shards {
     304           2 :                         // The cache shard is not allocated yet, nothing to close.
     305           2 :                         if c.shards[i] == nil {
     306           0 :                                 continue
     307             :                         }
     308           2 :                         err = firstError(err, c.shards[i].Close())
     309             :                 }
     310             : 
     311             :                 // Unref the cache which we create a reference to when the file cache is
     312             :                 // first instantiated.
     313           2 :                 c.cache.Unref()
     314           2 :                 return err
     315             :         }
     316           1 :         return nil
     317             : }
     318             : 
     319             : // NewFileCache will create a new file cache with one outstanding reference. It
     320             : // is the callers responsibility to call Unref if they will no longer hold a
     321             : // reference to the file cache.
     322           2 : func NewFileCache(cache *Cache, numShards int, size int) *FileCache {
     323           2 :         if size == 0 {
     324           0 :                 panic("pebble: cannot create a file cache of size 0")
     325           2 :         } else if numShards == 0 {
     326           0 :                 panic("pebble: cannot create a file cache with 0 shards")
     327             :         }
     328             : 
     329           2 :         c := &FileCache{}
     330           2 :         c.cache = cache
     331           2 :         c.cache.Ref()
     332           2 : 
     333           2 :         c.shards = make([]*fileCacheShard, numShards)
     334           2 :         for i := range c.shards {
     335           2 :                 c.shards[i] = &fileCacheShard{}
     336           2 :                 c.shards[i].init(size / len(c.shards))
     337           2 :         }
     338             : 
     339             :         // Hold a ref to the cache here.
     340           2 :         c.refs.Store(1)
     341           2 : 
     342           2 :         return c
     343             : }
     344             : 
     345           2 : func (c *FileCache) getShard(fileNum base.DiskFileNum) *fileCacheShard {
     346           2 :         return c.shards[uint64(fileNum)%uint64(len(c.shards))]
     347           2 : }
     348             : 
     349             : type fileCacheKey struct {
     350             :         cacheID cache.ID
     351             :         fileNum base.DiskFileNum
     352             : }
     353             : 
     354             : type fileCacheShard struct {
     355             :         hits      atomic.Int64
     356             :         misses    atomic.Int64
     357             :         iterCount atomic.Int32
     358             : 
     359             :         size int
     360             : 
     361             :         mu struct {
     362             :                 sync.RWMutex
     363             :                 nodes map[fileCacheKey]*fileCacheNode
     364             :                 // The iters map is only created and populated in race builds.
     365             :                 iters map[io.Closer][]byte
     366             : 
     367             :                 handHot  *fileCacheNode
     368             :                 handCold *fileCacheNode
     369             :                 handTest *fileCacheNode
     370             : 
     371             :                 coldTarget int
     372             :                 sizeHot    int
     373             :                 sizeCold   int
     374             :                 sizeTest   int
     375             :         }
     376             :         releasing       sync.WaitGroup
     377             :         releasingCh     chan *fileCacheValue
     378             :         releaseLoopExit sync.WaitGroup
     379             : }
     380             : 
     381           2 : func (c *fileCacheShard) init(size int) {
     382           2 :         c.size = size
     383           2 : 
     384           2 :         c.mu.nodes = make(map[fileCacheKey]*fileCacheNode)
     385           2 :         c.mu.coldTarget = size
     386           2 :         c.releasingCh = make(chan *fileCacheValue, 100)
     387           2 :         c.releaseLoopExit.Add(1)
     388           2 :         go c.releaseLoop()
     389           2 : 
     390           2 :         if invariants.RaceEnabled {
     391           0 :                 c.mu.iters = make(map[io.Closer][]byte)
     392           0 :         }
     393             : }
     394             : 
     395           2 : func (c *fileCacheShard) releaseLoop() {
     396           2 :         defer c.releaseLoopExit.Done()
     397           2 :         for v := range c.releasingCh {
     398           2 :                 v.release(c)
     399           2 :         }
     400             : }
     401             : 
     402             : // checkAndIntersectFilters checks the specific table and block property filters
     403             : // for intersection with any available table and block-level properties. Returns
     404             : // true for ok if this table should be read by this iterator.
     405             : func (c *fileCacheShard) checkAndIntersectFilters(
     406             :         v *fileCacheValue,
     407             :         blockPropertyFilters []BlockPropertyFilter,
     408             :         boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
     409             :         syntheticSuffix sstable.SyntheticSuffix,
     410           2 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
     411           2 :         if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
     412           2 :                 filterer, err = sstable.IntersectsTable(
     413           2 :                         blockPropertyFilters,
     414           2 :                         boundLimitedFilter,
     415           2 :                         v.reader.Properties.UserProperties,
     416           2 :                         syntheticSuffix,
     417           2 :                 )
     418           2 :                 // NB: IntersectsTable will return a nil filterer if the table-level
     419           2 :                 // properties indicate there's no intersection with the provided filters.
     420           2 :                 if filterer == nil || err != nil {
     421           2 :                         return false, nil, err
     422           2 :                 }
     423             :         }
     424           2 :         return true, filterer, nil
     425             : }
     426             : 
     427             : func (c *fileCacheShard) newIters(
     428             :         ctx context.Context,
     429             :         file *manifest.FileMetadata,
     430             :         opts *IterOptions,
     431             :         internalOpts internalIterOpts,
     432             :         dbOpts *fileCacheOpts,
     433             :         kinds iterKinds,
     434           2 : ) (iterSet, error) {
     435           2 :         // TODO(sumeer): constructing the Reader should also use a plumbed context,
     436           2 :         // since parts of the sstable are read during the construction. The Reader
     437           2 :         // should not remember that context since the Reader can be long-lived.
     438           2 : 
     439           2 :         // Calling findNode gives us the responsibility of decrementing v's
     440           2 :         // refCount. If opening the underlying table resulted in error, then we
     441           2 :         // decrement this straight away. Otherwise, we pass that responsibility to
     442           2 :         // the sstable iterator, which decrements when it is closed.
     443           2 :         v := c.findNode(ctx, file.FileBacking, dbOpts)
     444           2 :         if v.err != nil {
     445           1 :                 defer c.unrefValue(v)
     446           1 :                 return iterSet{}, v.err
     447           1 :         }
     448             : 
     449             :         // Note: This suffers an allocation for virtual sstables.
     450           2 :         cr := createCommonReader(v, file)
     451           2 :         var iters iterSet
     452           2 :         var err error
     453           2 :         if kinds.RangeKey() && file.HasRangeKeys {
     454           2 :                 iters.rangeKey, err = c.newRangeKeyIter(ctx, v, file, cr, opts.SpanIterOptions())
     455           2 :         }
     456           2 :         if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
     457           2 :                 iters.rangeDeletion, err = c.newRangeDelIter(ctx, file, cr, dbOpts)
     458           2 :         }
     459           2 :         if kinds.Point() && err == nil {
     460           2 :                 iters.point, err = c.newPointIter(ctx, v, file, cr, opts, internalOpts, dbOpts)
     461           2 :         }
     462           2 :         if err != nil {
     463           1 :                 // NB: There's a subtlety here: Because the point iterator is the last
     464           1 :                 // iterator we attempt to create, it's not possible for:
     465           1 :                 //   err != nil && iters.point != nil
     466           1 :                 // If it were possible, we'd need to account for it to avoid double
     467           1 :                 // unref-ing here, once during CloseAll and once during `unrefValue`.
     468           1 :                 iters.CloseAll()
     469           1 :                 c.unrefValue(v)
     470           1 :                 return iterSet{}, err
     471           1 :         }
     472             :         // Only point iterators ever require the reader stay pinned in the cache. If
     473             :         // we're not returning a point iterator to the caller, we need to unref v.
     474           2 :         if iters.point == nil {
     475           2 :                 c.unrefValue(v)
     476           2 :         }
     477           2 :         return iters, nil
     478             : }
     479             : 
     480             : // For flushable ingests, we decide whether to use the bloom filter base on
     481             : // size.
     482             : const filterBlockSizeLimitForFlushableIngests = 64 * 1024
     483             : 
     484             : // newPointIter is an internal helper that constructs a point iterator over a
     485             : // sstable. This function is for internal use only, and callers should use
     486             : // newIters instead.
     487             : func (c *fileCacheShard) newPointIter(
     488             :         ctx context.Context,
     489             :         v *fileCacheValue,
     490             :         file *manifest.FileMetadata,
     491             :         cr sstable.CommonReader,
     492             :         opts *IterOptions,
     493             :         internalOpts internalIterOpts,
     494             :         dbOpts *fileCacheOpts,
     495           2 : ) (internalIterator, error) {
     496           2 :         var (
     497           2 :                 hideObsoletePoints bool = false
     498           2 :                 pointKeyFilters    []BlockPropertyFilter
     499           2 :                 filterer           *sstable.BlockPropertiesFilterer
     500           2 :         )
     501           2 :         if opts != nil {
     502           2 :                 // This code is appending (at most one filter) in-place to
     503           2 :                 // opts.PointKeyFilters even though the slice is shared for iterators in
     504           2 :                 // the same iterator tree. This is acceptable since all the following
     505           2 :                 // properties are true:
     506           2 :                 // - The iterator tree is single threaded, so the shared backing for the
     507           2 :                 //   slice is being mutated in a single threaded manner.
     508           2 :                 // - Each shallow copy of the slice has its own notion of length.
     509           2 :                 // - The appended element is always the obsoleteKeyBlockPropertyFilter
     510           2 :                 //   struct, which is stateless, so overwriting that struct when creating
     511           2 :                 //   one sstable iterator is harmless to other sstable iterators that are
     512           2 :                 //   relying on that struct.
     513           2 :                 //
     514           2 :                 // An alternative would be to have different slices for different sstable
     515           2 :                 // iterators, but that requires more work to avoid allocations.
     516           2 :                 //
     517           2 :                 // TODO(bilal): for compaction reads of foreign sstables, we do hide
     518           2 :                 // obsolete points (see sstable.Reader.newCompactionIter) but we don't
     519           2 :                 // apply the obsolete block property filter. We could optimize this by
     520           2 :                 // applying the filter.
     521           2 :                 hideObsoletePoints, pointKeyFilters =
     522           2 :                         v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
     523           2 :                                 opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
     524           2 : 
     525           2 :                 var ok bool
     526           2 :                 var err error
     527           2 :                 ok, filterer, err = c.checkAndIntersectFilters(v, pointKeyFilters,
     528           2 :                         internalOpts.boundLimitedFilter, file.SyntheticPrefixAndSuffix.Suffix())
     529           2 :                 if err != nil {
     530           0 :                         return nil, err
     531           2 :                 } else if !ok {
     532           2 :                         // No point keys within the table match the filters.
     533           2 :                         return nil, nil
     534           2 :                 }
     535             :         }
     536             : 
     537           2 :         var iter sstable.Iterator
     538           2 :         filterBlockSizeLimit := sstable.AlwaysUseFilterBlock
     539           2 :         if opts != nil {
     540           2 :                 // By default, we don't use block filters for L6 and restrict the size for
     541           2 :                 // flushable ingests, as these blocks can be very big.
     542           2 :                 if !opts.UseL6Filters {
     543           2 :                         if opts.layer == manifest.Level(6) {
     544           2 :                                 filterBlockSizeLimit = sstable.NeverUseFilterBlock
     545           2 :                         } else if opts.layer.IsFlushableIngests() {
     546           2 :                                 filterBlockSizeLimit = filterBlockSizeLimitForFlushableIngests
     547           2 :                         }
     548             :                 }
     549           2 :                 if opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
     550           2 :                         ctx = objiotracing.WithLevel(ctx, opts.layer.Level())
     551           2 :                 }
     552             :         }
     553           2 :         tableFormat, err := v.reader.TableFormat()
     554           2 :         if err != nil {
     555           0 :                 return nil, err
     556           0 :         }
     557             : 
     558           2 :         if v.isShared && file.SyntheticSeqNum() != 0 {
     559           2 :                 if tableFormat < sstable.TableFormatPebblev4 {
     560           0 :                         return nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
     561           0 :                 }
     562             :                 // The table is shared and ingested.
     563           2 :                 hideObsoletePoints = true
     564             :         }
     565           2 :         transforms := file.IterTransforms()
     566           2 :         transforms.HideObsoletePoints = hideObsoletePoints
     567           2 :         iterStatsAccum := internalOpts.iterStatsAccumulator
     568           2 :         if iterStatsAccum == nil && opts != nil && dbOpts.sstStatsCollector != nil {
     569           2 :                 iterStatsAccum = dbOpts.sstStatsCollector.Accumulator(
     570           2 :                         uint64(uintptr(unsafe.Pointer(v.reader))), opts.Category)
     571           2 :         }
     572           2 :         if internalOpts.compaction {
     573           2 :                 iter, err = cr.NewCompactionIter(transforms, iterStatsAccum, &v.readerProvider, internalOpts.bufferPool)
     574           2 :         } else {
     575           2 :                 iter, err = cr.NewPointIter(
     576           2 :                         ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, filterBlockSizeLimit,
     577           2 :                         internalOpts.stats, iterStatsAccum, &v.readerProvider)
     578           2 :         }
     579           2 :         if err != nil {
     580           1 :                 return nil, err
     581           1 :         }
     582             :         // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
     583             :         // care to avoid introducing an allocation here by adding a closure.
     584           2 :         iter.SetCloseHook(v.closeHook)
     585           2 :         c.iterCount.Add(1)
     586           2 :         dbOpts.iterCount.Add(1)
     587           2 :         if invariants.RaceEnabled {
     588           0 :                 c.mu.Lock()
     589           0 :                 c.mu.iters[iter] = debug.Stack()
     590           0 :                 c.mu.Unlock()
     591           0 :         }
     592           2 :         return iter, nil
     593             : }
     594             : 
     595             : // newRangeDelIter is an internal helper that constructs an iterator over a
     596             : // sstable's range deletions. This function is for table-cache internal use
     597             : // only, and callers should use newIters instead.
     598             : func (c *fileCacheShard) newRangeDelIter(
     599             :         ctx context.Context, file *manifest.FileMetadata, cr sstable.CommonReader, dbOpts *fileCacheOpts,
     600           2 : ) (keyspan.FragmentIterator, error) {
     601           2 :         // NB: range-del iterator does not maintain a reference to the table, nor
     602           2 :         // does it need to read from it after creation.
     603           2 :         rangeDelIter, err := cr.NewRawRangeDelIter(ctx, file.FragmentIterTransforms())
     604           2 :         if err != nil {
     605           1 :                 return nil, err
     606           1 :         }
     607             :         // Assert expected bounds in tests.
     608           2 :         if invariants.Sometimes(50) && rangeDelIter != nil {
     609           2 :                 cmp := base.DefaultComparer.Compare
     610           2 :                 if dbOpts.readerOpts.Comparer != nil {
     611           2 :                         cmp = dbOpts.readerOpts.Comparer.Compare
     612           2 :                 }
     613           2 :                 rangeDelIter = keyspan.AssertBounds(
     614           2 :                         rangeDelIter, file.SmallestPointKey, file.LargestPointKey.UserKey, cmp,
     615           2 :                 )
     616             :         }
     617           2 :         return rangeDelIter, nil
     618             : }
     619             : 
     620             : // newRangeKeyIter is an internal helper that constructs an iterator over a
     621             : // sstable's range keys. This function is for table-cache internal use only, and
     622             : // callers should use newIters instead.
     623             : func (c *fileCacheShard) newRangeKeyIter(
     624             :         ctx context.Context,
     625             :         v *fileCacheValue,
     626             :         file *fileMetadata,
     627             :         cr sstable.CommonReader,
     628             :         opts keyspan.SpanIterOptions,
     629           2 : ) (keyspan.FragmentIterator, error) {
     630           2 :         transforms := file.FragmentIterTransforms()
     631           2 :         // Don't filter a table's range keys if the file contains RANGEKEYDELs.
     632           2 :         // The RANGEKEYDELs may delete range keys in other levels. Skipping the
     633           2 :         // file's range key blocks may surface deleted range keys below. This is
     634           2 :         // done here, rather than deferring to the block-property collector in order
     635           2 :         // to maintain parity with point keys and the treatment of RANGEDELs.
     636           2 :         if v.reader.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 {
     637           0 :                 ok, _, err := c.checkAndIntersectFilters(v, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix())
     638           0 :                 if err != nil {
     639           0 :                         return nil, err
     640           0 :                 } else if !ok {
     641           0 :                         return nil, nil
     642           0 :                 }
     643             :         }
     644             :         // TODO(radu): wrap in an AssertBounds.
     645           2 :         return cr.NewRawRangeKeyIter(ctx, transforms)
     646             : }
     647             : 
     648             : // tableCacheShardReaderProvider implements sstable.ReaderProvider for a
     649             : // specific table.
     650             : type tableCacheShardReaderProvider struct {
     651             :         c              *fileCacheShard
     652             :         dbOpts         *fileCacheOpts
     653             :         backingFileNum base.DiskFileNum
     654             : 
     655             :         mu struct {
     656             :                 sync.Mutex
     657             :                 // v is the result of findNode. Whenever it is not null, we hold a refcount
     658             :                 // on the fileCacheValue.
     659             :                 v *fileCacheValue
     660             :                 // refCount is the number of GetReader() calls that have not received a
     661             :                 // corresponding Close().
     662             :                 refCount int
     663             :         }
     664             : }
     665             : 
     666             : var _ valblk.ReaderProvider = &tableCacheShardReaderProvider{}
     667             : 
     668             : func (rp *tableCacheShardReaderProvider) init(
     669             :         c *fileCacheShard, dbOpts *fileCacheOpts, backingFileNum base.DiskFileNum,
     670           2 : ) {
     671           2 :         rp.c = c
     672           2 :         rp.dbOpts = dbOpts
     673           2 :         rp.backingFileNum = backingFileNum
     674           2 :         rp.mu.v = nil
     675           2 :         rp.mu.refCount = 0
     676           2 : }
     677             : 
     678             : // GetReader implements sstable.ReaderProvider. Note that it is not the
     679             : // responsibility of tableCacheShardReaderProvider to ensure that the file
     680             : // continues to exist. The ReaderProvider is used in iterators where the
     681             : // top-level iterator is pinning the read state and preventing the files from
     682             : // being deleted.
     683             : //
     684             : // The caller must call tableCacheShardReaderProvider.Close.
     685             : //
     686             : // Note that currently the Reader returned here is only used to read value
     687             : // blocks. This reader shouldn't be used for other purposes like reading keys
     688             : // outside of virtual sstable bounds.
     689             : //
     690             : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
     691             : // that the reader isn't used for other purposes.
     692             : func (rp *tableCacheShardReaderProvider) GetReader(
     693             :         ctx context.Context,
     694           2 : ) (valblk.ExternalBlockReader, error) {
     695           2 :         rp.mu.Lock()
     696           2 :         defer rp.mu.Unlock()
     697           2 : 
     698           2 :         if rp.mu.v != nil {
     699           0 :                 rp.mu.refCount++
     700           0 :                 return rp.mu.v.reader, nil
     701           0 :         }
     702             : 
     703             :         // Calling findNodeInternal gives us the responsibility of decrementing v's
     704             :         // refCount. Note that if the table is no longer in the cache,
     705             :         // findNodeInternal will need to do IO to initialize a new Reader. We hold
     706             :         // rp.mu during this time so that concurrent GetReader calls block until the
     707             :         // Reader is created.
     708           2 :         v := rp.c.findNodeInternal(ctx, rp.backingFileNum, rp.dbOpts)
     709           2 :         if v.err != nil {
     710           0 :                 defer rp.c.unrefValue(v)
     711           0 :                 return nil, v.err
     712           0 :         }
     713           2 :         rp.mu.v = v
     714           2 :         rp.mu.refCount = 1
     715           2 :         return v.reader, nil
     716             : }
     717             : 
     718             : // Close implements sstable.ReaderProvider.
     719           2 : func (rp *tableCacheShardReaderProvider) Close() {
     720           2 :         rp.mu.Lock()
     721           2 :         defer rp.mu.Unlock()
     722           2 :         rp.mu.refCount--
     723           2 :         if rp.mu.refCount <= 0 {
     724           2 :                 if rp.mu.refCount < 0 {
     725           0 :                         panic("pebble: sstable.ReaderProvider misuse")
     726             :                 }
     727           2 :                 rp.c.unrefValue(rp.mu.v)
     728           2 :                 rp.mu.v = nil
     729             :         }
     730             : }
     731             : 
     732             : // getTableProperties return sst table properties for target file
     733             : func (c *fileCacheShard) getTableProperties(
     734             :         file *fileMetadata, dbOpts *fileCacheOpts,
     735           1 : ) (*sstable.Properties, error) {
     736           1 :         // Calling findNode gives us the responsibility of decrementing v's refCount here
     737           1 :         v := c.findNode(context.TODO(), file.FileBacking, dbOpts)
     738           1 :         defer c.unrefValue(v)
     739           1 : 
     740           1 :         if v.err != nil {
     741           0 :                 return nil, v.err
     742           0 :         }
     743           1 :         return &v.reader.Properties, nil
     744             : }
     745             : 
     746             : // releaseNode releases a node from the fileCacheShard.
     747             : //
     748             : // c.mu must be held when calling this.
     749           1 : func (c *fileCacheShard) releaseNode(n *fileCacheNode) {
     750           1 :         c.unlinkNode(n)
     751           1 :         c.clearNode(n)
     752           1 : }
     753             : 
     754             : // unlinkNode removes a node from the fileCacheShard, leaving the shard
     755             : // reference in place.
     756             : //
     757             : // c.mu must be held when calling this.
     758           2 : func (c *fileCacheShard) unlinkNode(n *fileCacheNode) {
     759           2 :         key := fileCacheKey{n.cacheID, n.fileNum}
     760           2 :         delete(c.mu.nodes, key)
     761           2 : 
     762           2 :         switch n.ptype {
     763           2 :         case fileCacheNodeHot:
     764           2 :                 c.mu.sizeHot--
     765           2 :         case fileCacheNodeCold:
     766           2 :                 c.mu.sizeCold--
     767           2 :         case fileCacheNodeTest:
     768           2 :                 c.mu.sizeTest--
     769             :         }
     770             : 
     771           2 :         if n == c.mu.handHot {
     772           2 :                 c.mu.handHot = c.mu.handHot.prev()
     773           2 :         }
     774           2 :         if n == c.mu.handCold {
     775           2 :                 c.mu.handCold = c.mu.handCold.prev()
     776           2 :         }
     777           2 :         if n == c.mu.handTest {
     778           2 :                 c.mu.handTest = c.mu.handTest.prev()
     779           2 :         }
     780             : 
     781           2 :         if n.unlink() == n {
     782           2 :                 // This was the last entry in the cache.
     783           2 :                 c.mu.handHot = nil
     784           2 :                 c.mu.handCold = nil
     785           2 :                 c.mu.handTest = nil
     786           2 :         }
     787             : 
     788           2 :         n.links.prev = nil
     789           2 :         n.links.next = nil
     790             : }
     791             : 
     792           2 : func (c *fileCacheShard) clearNode(n *fileCacheNode) {
     793           2 :         if v := n.value; v != nil {
     794           2 :                 n.value = nil
     795           2 :                 c.unrefValue(v)
     796           2 :         }
     797             : }
     798             : 
     799             : // unrefValue decrements the reference count for the specified value, releasing
     800             : // it if the reference count fell to 0. Note that the value has a reference if
     801             : // it is present in fileCacheShard.mu.nodes, so a reference count of 0 means the
     802             : // node has already been removed from that map.
     803           2 : func (c *fileCacheShard) unrefValue(v *fileCacheValue) {
     804           2 :         if v.refCount.Add(-1) == 0 {
     805           2 :                 c.releasing.Add(1)
     806           2 :                 c.releasingCh <- v
     807           2 :         }
     808             : }
     809             : 
     810             : // findNode returns the node for the table with the given file number, creating
     811             : // that node if it didn't already exist. The caller is responsible for
     812             : // decrementing the returned node's refCount.
     813             : func (c *fileCacheShard) findNode(
     814             :         ctx context.Context, b *fileBacking, dbOpts *fileCacheOpts,
     815           2 : ) *fileCacheValue {
     816           2 :         // The backing must have a positive refcount (otherwise it could be deleted at any time).
     817           2 :         b.MustHaveRefs()
     818           2 :         // Caution! Here fileMetadata can be a physical or virtual table. File cache
     819           2 :         // sstable  readers are associated with the physical backings. All virtual
     820           2 :         // tables with the same backing will use the same reader from the cache; so
     821           2 :         // no information that can differ among these virtual tables can be passed
     822           2 :         // to findNodeInternal.
     823           2 :         backingFileNum := b.DiskFileNum
     824           2 : 
     825           2 :         return c.findNodeInternal(ctx, backingFileNum, dbOpts)
     826           2 : }
     827             : 
     828             : func (c *fileCacheShard) findNodeInternal(
     829             :         ctx context.Context, backingFileNum base.DiskFileNum, dbOpts *fileCacheOpts,
     830           2 : ) *fileCacheValue {
     831           2 :         // Fast-path for a hit in the cache.
     832           2 :         c.mu.RLock()
     833           2 :         key := fileCacheKey{dbOpts.cacheID, backingFileNum}
     834           2 :         if n := c.mu.nodes[key]; n != nil && n.value != nil {
     835           2 :                 // Fast-path hit.
     836           2 :                 //
     837           2 :                 // The caller is responsible for decrementing the refCount.
     838           2 :                 v := n.value
     839           2 :                 v.refCount.Add(1)
     840           2 :                 c.mu.RUnlock()
     841           2 :                 n.referenced.Store(true)
     842           2 :                 c.hits.Add(1)
     843           2 :                 <-v.loaded
     844           2 :                 return v
     845           2 :         }
     846           2 :         c.mu.RUnlock()
     847           2 : 
     848           2 :         c.mu.Lock()
     849           2 : 
     850           2 :         n := c.mu.nodes[key]
     851           2 :         switch {
     852           2 :         case n == nil:
     853           2 :                 // Slow-path miss of a non-existent node.
     854           2 :                 n = &fileCacheNode{
     855           2 :                         fileNum: backingFileNum,
     856           2 :                         ptype:   fileCacheNodeCold,
     857           2 :                 }
     858           2 :                 c.addNode(n, dbOpts)
     859           2 :                 c.mu.sizeCold++
     860             : 
     861           2 :         case n.value != nil:
     862           2 :                 // Slow-path hit of a hot or cold node.
     863           2 :                 //
     864           2 :                 // The caller is responsible for decrementing the refCount.
     865           2 :                 v := n.value
     866           2 :                 v.refCount.Add(1)
     867           2 :                 n.referenced.Store(true)
     868           2 :                 c.hits.Add(1)
     869           2 :                 c.mu.Unlock()
     870           2 :                 <-v.loaded
     871           2 :                 return v
     872             : 
     873           2 :         default:
     874           2 :                 // Slow-path miss of a test node.
     875           2 :                 c.unlinkNode(n)
     876           2 :                 c.mu.coldTarget++
     877           2 :                 if c.mu.coldTarget > c.size {
     878           2 :                         c.mu.coldTarget = c.size
     879           2 :                 }
     880             : 
     881           2 :                 n.referenced.Store(false)
     882           2 :                 n.ptype = fileCacheNodeHot
     883           2 :                 c.addNode(n, dbOpts)
     884           2 :                 c.mu.sizeHot++
     885             :         }
     886             : 
     887           2 :         c.misses.Add(1)
     888           2 : 
     889           2 :         v := &fileCacheValue{
     890           2 :                 loaded: make(chan struct{}),
     891           2 :         }
     892           2 :         v.readerProvider.init(c, dbOpts, backingFileNum)
     893           2 :         v.refCount.Store(2)
     894           2 :         // Cache the closure invoked when an iterator is closed. This avoids an
     895           2 :         // allocation on every call to newIters.
     896           2 :         v.closeHook = func(i sstable.Iterator) error {
     897           2 :                 if invariants.RaceEnabled {
     898           0 :                         c.mu.Lock()
     899           0 :                         delete(c.mu.iters, i)
     900           0 :                         c.mu.Unlock()
     901           0 :                 }
     902           2 :                 c.unrefValue(v)
     903           2 :                 c.iterCount.Add(-1)
     904           2 :                 dbOpts.iterCount.Add(-1)
     905           2 :                 return nil
     906             :         }
     907           2 :         n.value = v
     908           2 : 
     909           2 :         c.mu.Unlock()
     910           2 : 
     911           2 :         // Note adding to the cache lists must complete before we begin loading the
     912           2 :         // table as a failure during load will result in the node being unlinked.
     913           2 :         v.load(ctx, backingFileNum, c, dbOpts)
     914           2 :         return v
     915             : }
     916             : 
     917           2 : func (c *fileCacheShard) addNode(n *fileCacheNode, dbOpts *fileCacheOpts) {
     918           2 :         c.evictNodes()
     919           2 :         n.cacheID = dbOpts.cacheID
     920           2 :         key := fileCacheKey{n.cacheID, n.fileNum}
     921           2 :         c.mu.nodes[key] = n
     922           2 : 
     923           2 :         n.links.next = n
     924           2 :         n.links.prev = n
     925           2 :         if c.mu.handHot == nil {
     926           2 :                 // First element.
     927           2 :                 c.mu.handHot = n
     928           2 :                 c.mu.handCold = n
     929           2 :                 c.mu.handTest = n
     930           2 :         } else {
     931           2 :                 c.mu.handHot.link(n)
     932           2 :         }
     933             : 
     934           2 :         if c.mu.handCold == c.mu.handHot {
     935           2 :                 c.mu.handCold = c.mu.handCold.prev()
     936           2 :         }
     937             : }
     938             : 
     939           2 : func (c *fileCacheShard) evictNodes() {
     940           2 :         for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
     941           2 :                 c.runHandCold()
     942           2 :         }
     943             : }
     944             : 
     945           2 : func (c *fileCacheShard) runHandCold() {
     946           2 :         n := c.mu.handCold
     947           2 :         if n.ptype == fileCacheNodeCold {
     948           2 :                 if n.referenced.Load() {
     949           2 :                         n.referenced.Store(false)
     950           2 :                         n.ptype = fileCacheNodeHot
     951           2 :                         c.mu.sizeCold--
     952           2 :                         c.mu.sizeHot++
     953           2 :                 } else {
     954           2 :                         c.clearNode(n)
     955           2 :                         n.ptype = fileCacheNodeTest
     956           2 :                         c.mu.sizeCold--
     957           2 :                         c.mu.sizeTest++
     958           2 :                         for c.size < c.mu.sizeTest && c.mu.handTest != nil {
     959           2 :                                 c.runHandTest()
     960           2 :                         }
     961             :                 }
     962             :         }
     963             : 
     964           2 :         c.mu.handCold = c.mu.handCold.next()
     965           2 : 
     966           2 :         for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
     967           2 :                 c.runHandHot()
     968           2 :         }
     969             : }
     970             : 
     971           2 : func (c *fileCacheShard) runHandHot() {
     972           2 :         if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
     973           2 :                 c.runHandTest()
     974           2 :                 if c.mu.handHot == nil {
     975           0 :                         return
     976           0 :                 }
     977             :         }
     978             : 
     979           2 :         n := c.mu.handHot
     980           2 :         if n.ptype == fileCacheNodeHot {
     981           2 :                 if n.referenced.Load() {
     982           2 :                         n.referenced.Store(false)
     983           2 :                 } else {
     984           2 :                         n.ptype = fileCacheNodeCold
     985           2 :                         c.mu.sizeHot--
     986           2 :                         c.mu.sizeCold++
     987           2 :                 }
     988             :         }
     989             : 
     990           2 :         c.mu.handHot = c.mu.handHot.next()
     991             : }
     992             : 
     993           2 : func (c *fileCacheShard) runHandTest() {
     994           2 :         if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
     995           2 :                 c.runHandCold()
     996           2 :                 if c.mu.handTest == nil {
     997           0 :                         return
     998           0 :                 }
     999             :         }
    1000             : 
    1001           2 :         n := c.mu.handTest
    1002           2 :         if n.ptype == fileCacheNodeTest {
    1003           2 :                 c.mu.coldTarget--
    1004           2 :                 if c.mu.coldTarget < 0 {
    1005           2 :                         c.mu.coldTarget = 0
    1006           2 :                 }
    1007           2 :                 c.unlinkNode(n)
    1008           2 :                 c.clearNode(n)
    1009             :         }
    1010             : 
    1011           2 :         c.mu.handTest = c.mu.handTest.next()
    1012             : }
    1013             : 
    1014           2 : func (c *fileCacheShard) evict(fileNum base.DiskFileNum, dbOpts *fileCacheOpts, allowLeak bool) {
    1015           2 :         c.mu.Lock()
    1016           2 :         key := fileCacheKey{dbOpts.cacheID, fileNum}
    1017           2 :         n := c.mu.nodes[key]
    1018           2 :         var v *fileCacheValue
    1019           2 :         if n != nil {
    1020           2 :                 // NB: This is equivalent to fileCacheShard.releaseNode(), but we
    1021           2 :                 // perform the fileCacheShard.release() call synchronously below to
    1022           2 :                 // ensure the sstable file descriptor is closed before returning. Note
    1023           2 :                 // that fileCacheShard.releasing needs to be incremented while holding
    1024           2 :                 // fileCacheShard.mu in order to avoid a race with Close()
    1025           2 :                 c.unlinkNode(n)
    1026           2 :                 v = n.value
    1027           2 :                 if v != nil {
    1028           2 :                         if !allowLeak {
    1029           2 :                                 if t := v.refCount.Add(-1); t != 0 {
    1030           0 :                                         dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
    1031           0 :                                 }
    1032             :                         }
    1033           2 :                         c.releasing.Add(1)
    1034             :                 }
    1035             :         }
    1036             : 
    1037           2 :         c.mu.Unlock()
    1038           2 : 
    1039           2 :         if v != nil {
    1040           2 :                 v.release(c)
    1041           2 :         }
    1042             : 
    1043           2 :         dbOpts.cache.EvictFile(dbOpts.cacheID, fileNum)
    1044             : }
    1045             : 
    1046             : // removeDB evicts any nodes which have a reference to the DB
    1047             : // associated with dbOpts.cacheID. Make sure that there will
    1048             : // be no more accesses to the files associated with the DB.
    1049           2 : func (c *fileCacheShard) removeDB(dbOpts *fileCacheOpts) {
    1050           2 :         var fileNums []base.DiskFileNum
    1051           2 : 
    1052           2 :         c.mu.RLock()
    1053           2 :         // Collect the fileNums which need to be cleaned.
    1054           2 :         var firstNode *fileCacheNode
    1055           2 :         node := c.mu.handHot
    1056           2 :         for node != firstNode {
    1057           2 :                 if firstNode == nil {
    1058           2 :                         firstNode = node
    1059           2 :                 }
    1060             : 
    1061           2 :                 if node.cacheID == dbOpts.cacheID {
    1062           2 :                         fileNums = append(fileNums, node.fileNum)
    1063           2 :                 }
    1064           2 :                 node = node.next()
    1065             :         }
    1066           2 :         c.mu.RUnlock()
    1067           2 : 
    1068           2 :         // Evict all the nodes associated with the DB.
    1069           2 :         // This should synchronously close all the files
    1070           2 :         // associated with the DB.
    1071           2 :         for _, fileNum := range fileNums {
    1072           2 :                 c.evict(fileNum, dbOpts, true)
    1073           2 :         }
    1074             : }
    1075             : 
    1076           2 : func (c *fileCacheShard) Close() error {
    1077           2 :         c.mu.Lock()
    1078           2 :         defer c.mu.Unlock()
    1079           2 : 
    1080           2 :         // Check for leaked iterators. Note that we'll still perform cleanup below in
    1081           2 :         // the case that there are leaked iterators.
    1082           2 :         var err error
    1083           2 :         if v := c.iterCount.Load(); v > 0 {
    1084           1 :                 if !invariants.RaceEnabled {
    1085           1 :                         err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
    1086           1 :                 } else {
    1087           0 :                         var buf bytes.Buffer
    1088           0 :                         for _, stack := range c.mu.iters {
    1089           0 :                                 fmt.Fprintf(&buf, "%s\n", stack)
    1090           0 :                         }
    1091           0 :                         err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
    1092             :                 }
    1093             :         }
    1094             : 
    1095           2 :         for c.mu.handHot != nil {
    1096           0 :                 n := c.mu.handHot
    1097           0 :                 if n.value != nil {
    1098           0 :                         if n.value.refCount.Add(-1) == 0 {
    1099           0 :                                 c.releasing.Add(1)
    1100           0 :                                 c.releasingCh <- n.value
    1101           0 :                         }
    1102             :                 }
    1103           0 :                 c.unlinkNode(n)
    1104             :         }
    1105           2 :         c.mu.nodes = nil
    1106           2 :         c.mu.handHot = nil
    1107           2 :         c.mu.handCold = nil
    1108           2 :         c.mu.handTest = nil
    1109           2 : 
    1110           2 :         // Only shutdown the releasing goroutine if there were no leaked
    1111           2 :         // iterators. If there were leaked iterators, we leave the goroutine running
    1112           2 :         // and the releasingCh open so that a subsequent iterator close can
    1113           2 :         // complete. This behavior is used by iterator leak tests. Leaking the
    1114           2 :         // goroutine for these tests is less bad not closing the iterator which
    1115           2 :         // triggers other warnings about block cache handles not being released.
    1116           2 :         if err != nil {
    1117           1 :                 c.releasing.Wait()
    1118           1 :                 return err
    1119           1 :         }
    1120             : 
    1121           2 :         close(c.releasingCh)
    1122           2 :         c.releasing.Wait()
    1123           2 :         c.releaseLoopExit.Wait()
    1124           2 :         return err
    1125             : }
    1126             : 
    1127             : type fileCacheValue struct {
    1128             :         closeHook func(i sstable.Iterator) error
    1129             :         reader    *sstable.Reader
    1130             :         err       error
    1131             :         loaded    chan struct{}
    1132             :         // Reference count for the value. The reader is closed when the reference
    1133             :         // count drops to zero.
    1134             :         refCount atomic.Int32
    1135             :         isShared bool
    1136             : 
    1137             :         // readerProvider is embedded here so that we only allocate it once as long as
    1138             :         // the table stays in the cache. Its state is not always logically tied to
    1139             :         // this specific fileCacheShard - if a table goes out of the cache and then
    1140             :         // comes back in, the readerProvider in a now-defunct fileCacheValue can
    1141             :         // still be used and will internally refer to the new fileCacheValue.
    1142             :         readerProvider tableCacheShardReaderProvider
    1143             : }
    1144             : 
    1145             : func (v *fileCacheValue) load(
    1146             :         ctx context.Context, backingFileNum base.DiskFileNum, c *fileCacheShard, dbOpts *fileCacheOpts,
    1147           2 : ) {
    1148           2 :         // Try opening the file first.
    1149           2 :         var f objstorage.Readable
    1150           2 :         var err error
    1151           2 :         f, err = dbOpts.objProvider.OpenForReading(
    1152           2 :                 ctx, fileTypeTable, backingFileNum, objstorage.OpenOptions{MustExist: true},
    1153           2 :         )
    1154           2 :         if err == nil {
    1155           2 :                 o := dbOpts.readerOpts
    1156           2 :                 o.SetInternalCacheOpts(sstableinternal.CacheOptions{
    1157           2 :                         Cache:   dbOpts.cache,
    1158           2 :                         CacheID: dbOpts.cacheID,
    1159           2 :                         FileNum: backingFileNum,
    1160           2 :                 })
    1161           2 :                 v.reader, err = sstable.NewReader(ctx, f, o)
    1162           2 :         }
    1163           2 :         if err == nil {
    1164           2 :                 var objMeta objstorage.ObjectMetadata
    1165           2 :                 objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, backingFileNum)
    1166           2 :                 v.isShared = objMeta.IsShared()
    1167           2 :         }
    1168           2 :         if err != nil {
    1169           1 :                 v.err = errors.Wrapf(
    1170           1 :                         err, "pebble: backing file %s error", backingFileNum)
    1171           1 :         }
    1172           2 :         if v.err != nil {
    1173           1 :                 c.mu.Lock()
    1174           1 :                 defer c.mu.Unlock()
    1175           1 :                 // Lookup the node in the cache again as it might have already been
    1176           1 :                 // removed.
    1177           1 :                 key := fileCacheKey{dbOpts.cacheID, backingFileNum}
    1178           1 :                 n := c.mu.nodes[key]
    1179           1 :                 if n != nil && n.value == v {
    1180           1 :                         c.releaseNode(n)
    1181           1 :                 }
    1182             :         }
    1183           2 :         close(v.loaded)
    1184             : }
    1185             : 
    1186           2 : func (v *fileCacheValue) release(c *fileCacheShard) {
    1187           2 :         <-v.loaded
    1188           2 :         // Nothing to be done about an error at this point. Close the reader if it is
    1189           2 :         // open.
    1190           2 :         if v.reader != nil {
    1191           2 :                 _ = v.reader.Close()
    1192           2 :         }
    1193           2 :         c.releasing.Done()
    1194             : }
    1195             : 
    1196             : type fileCacheNodeType int8
    1197             : 
    1198             : const (
    1199             :         fileCacheNodeTest fileCacheNodeType = iota
    1200             :         fileCacheNodeCold
    1201             :         fileCacheNodeHot
    1202             : )
    1203             : 
    1204           0 : func (p fileCacheNodeType) String() string {
    1205           0 :         switch p {
    1206           0 :         case fileCacheNodeTest:
    1207           0 :                 return "test"
    1208           0 :         case fileCacheNodeCold:
    1209           0 :                 return "cold"
    1210           0 :         case fileCacheNodeHot:
    1211           0 :                 return "hot"
    1212             :         }
    1213           0 :         return "unknown"
    1214             : }
    1215             : 
    1216             : type fileCacheNode struct {
    1217             :         fileNum base.DiskFileNum
    1218             :         value   *fileCacheValue
    1219             : 
    1220             :         links struct {
    1221             :                 next *fileCacheNode
    1222             :                 prev *fileCacheNode
    1223             :         }
    1224             :         ptype fileCacheNodeType
    1225             :         // referenced is atomically set to indicate that this entry has been accessed
    1226             :         // since the last time one of the clock hands swept it.
    1227             :         referenced atomic.Bool
    1228             : 
    1229             :         // Storing the cache id associated with the DB instance here
    1230             :         // avoids the need to thread the dbOpts struct through many functions.
    1231             :         cacheID cache.ID
    1232             : }
    1233             : 
    1234           2 : func (n *fileCacheNode) next() *fileCacheNode {
    1235           2 :         if n == nil {
    1236           0 :                 return nil
    1237           0 :         }
    1238           2 :         return n.links.next
    1239             : }
    1240             : 
    1241           2 : func (n *fileCacheNode) prev() *fileCacheNode {
    1242           2 :         if n == nil {
    1243           0 :                 return nil
    1244           0 :         }
    1245           2 :         return n.links.prev
    1246             : }
    1247             : 
    1248           2 : func (n *fileCacheNode) link(s *fileCacheNode) {
    1249           2 :         s.links.prev = n.links.prev
    1250           2 :         s.links.prev.links.next = s
    1251           2 :         s.links.next = n
    1252           2 :         s.links.next.links.prev = s
    1253           2 : }
    1254             : 
    1255           2 : func (n *fileCacheNode) unlink() *fileCacheNode {
    1256           2 :         next := n.links.next
    1257           2 :         n.links.prev.links.next = n.links.next
    1258           2 :         n.links.next.links.prev = n.links.prev
    1259           2 :         n.links.prev = n
    1260           2 :         n.links.next = n
    1261           2 :         return next
    1262           2 : }
    1263             : 
    1264             : // iterSet holds a set of iterators of various key kinds, all constructed over
    1265             : // the same data structure (eg, an sstable). A subset of the fields may be
    1266             : // populated depending on the `iterKinds` passed to newIters.
    1267             : type iterSet struct {
    1268             :         point         internalIterator
    1269             :         rangeDeletion keyspan.FragmentIterator
    1270             :         rangeKey      keyspan.FragmentIterator
    1271             : }
    1272             : 
    1273             : // TODO(jackson): Consider adding methods for fast paths that check whether an
    1274             : // iterator of a particular kind is nil, so that these call sites don't need to
    1275             : // reach into the struct's fields directly.
    1276             : 
    1277             : // Point returns the contained point iterator. If there is no point iterator,
    1278             : // Point returns a non-nil empty point iterator.
    1279           2 : func (s *iterSet) Point() internalIterator {
    1280           2 :         if s.point == nil {
    1281           2 :                 return emptyIter
    1282           2 :         }
    1283           2 :         return s.point
    1284             : }
    1285             : 
    1286             : // RangeDeletion returns the contained range deletion iterator. If there is no
    1287             : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
    1288             : // iterator.
    1289           2 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
    1290           2 :         if s.rangeDeletion == nil {
    1291           2 :                 return emptyKeyspanIter
    1292           2 :         }
    1293           2 :         return s.rangeDeletion
    1294             : }
    1295             : 
    1296             : // RangeKey returns the contained range key iterator. If there is no range key
    1297             : // iterator, RangeKey returns a non-nil empty keyspan iterator.
    1298           2 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
    1299           2 :         if s.rangeKey == nil {
    1300           0 :                 return emptyKeyspanIter
    1301           0 :         }
    1302           2 :         return s.rangeKey
    1303             : }
    1304             : 
    1305             : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
    1306             : // must be not be called on the constituent iterators.
    1307           2 : func (s *iterSet) CloseAll() error {
    1308           2 :         var err error
    1309           2 :         if s.point != nil {
    1310           2 :                 err = s.point.Close()
    1311           2 :                 s.point = nil
    1312           2 :         }
    1313           2 :         if s.rangeDeletion != nil {
    1314           2 :                 s.rangeDeletion.Close()
    1315           2 :                 s.rangeDeletion = nil
    1316           2 :         }
    1317           2 :         if s.rangeKey != nil {
    1318           2 :                 s.rangeKey.Close()
    1319           2 :                 s.rangeKey = nil
    1320           2 :         }
    1321           2 :         return err
    1322             : }
    1323             : 
    1324             : // iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
    1325             : // bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
    1326             : // represent a set of desired iterator kinds.
    1327             : type iterKinds uint8
    1328             : 
    1329           2 : func (t iterKinds) Point() bool         { return (t & iterPointKeys) != 0 }
    1330           2 : func (t iterKinds) RangeDeletion() bool { return (t & iterRangeDeletions) != 0 }
    1331           2 : func (t iterKinds) RangeKey() bool      { return (t & iterRangeKeys) != 0 }
    1332             : 
    1333             : const (
    1334             :         iterPointKeys iterKinds = 1 << iota
    1335             :         iterRangeDeletions
    1336             :         iterRangeKeys
    1337             : )

Generated by: LCOV version 1.14