LCOV - code coverage report
Current view: top level - pebble - table_cache.go (source / functions) Hit Total Coverage
Test: 2024-01-03 08:16Z 1cce3d01 - tests + meta.lcov Lines: 712 797 89.3 %
Date: 2024-01-03 08:17:11 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "fmt"
      11             :         "io"
      12             :         "runtime/debug"
      13             :         "runtime/pprof"
      14             :         "sync"
      15             :         "sync/atomic"
      16             :         "unsafe"
      17             : 
      18             :         "github.com/cockroachdb/errors"
      19             :         "github.com/cockroachdb/pebble/internal/base"
      20             :         "github.com/cockroachdb/pebble/internal/invariants"
      21             :         "github.com/cockroachdb/pebble/internal/keyspan"
      22             :         "github.com/cockroachdb/pebble/internal/manifest"
      23             :         "github.com/cockroachdb/pebble/internal/private"
      24             :         "github.com/cockroachdb/pebble/objstorage"
      25             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      26             :         "github.com/cockroachdb/pebble/sstable"
      27             : )
      28             : 
      29             : var emptyIter = &errorIter{err: nil}
      30             : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
      31             : 
      32             : // filteredAll is a singleton internalIterator implementation used when an
      33             : // sstable does contain point keys, but all the keys are filtered by the active
      34             : // PointKeyFilters set in the iterator's IterOptions.
      35             : //
      36             : // filteredAll implements filteredIter, ensuring the level iterator recognizes
      37             : // when it may need to return file boundaries to keep the rangeDelIter open
      38             : // during mergingIter operation.
      39             : var filteredAll = &filteredAllKeysIter{errorIter: errorIter{err: nil}}
      40             : 
      41             : var _ filteredIter = filteredAll
      42             : 
      43             : type filteredAllKeysIter struct {
      44             :         errorIter
      45             : }
      46             : 
      47           1 : func (s *filteredAllKeysIter) MaybeFilteredKeys() bool {
      48           1 :         return true
      49           1 : }
      50             : 
      51             : var tableCacheLabels = pprof.Labels("pebble", "table-cache")
      52             : 
      53             : // tableCacheOpts contains the db specific fields
      54             : // of a table cache. This is stored in the tableCacheContainer
      55             : // along with the table cache.
      56             : // NB: It is important to make sure that the fields in this
      57             : // struct are read-only. Since the fields here are shared
      58             : // by every single tableCacheShard, if non read-only fields
      59             : // are updated, we could have unnecessary evictions of those
      60             : // fields, and the surrounding fields from the CPU caches.
      61             : type tableCacheOpts struct {
      62             :         // iterCount keeps track of how many iterators are open. It is used to keep
      63             :         // track of leaked iterators on a per-db level.
      64             :         iterCount *atomic.Int32
      65             : 
      66             :         loggerAndTracer   LoggerAndTracer
      67             :         cacheID           uint64
      68             :         objProvider       objstorage.Provider
      69             :         opts              sstable.ReaderOptions
      70             :         filterMetrics     *sstable.FilterMetricsTracker
      71             :         sstStatsCollector *sstable.CategoryStatsCollector
      72             : }
      73             : 
      74             : // tableCacheContainer contains the table cache and
      75             : // fields which are unique to the DB.
      76             : type tableCacheContainer struct {
      77             :         tableCache *TableCache
      78             : 
      79             :         // dbOpts contains fields relevant to the table cache
      80             :         // which are unique to each DB.
      81             :         dbOpts tableCacheOpts
      82             : }
      83             : 
      84             : // newTableCacheContainer will panic if the underlying cache in the table cache
      85             : // doesn't match Options.Cache.
      86             : func newTableCacheContainer(
      87             :         tc *TableCache,
      88             :         cacheID uint64,
      89             :         objProvider objstorage.Provider,
      90             :         opts *Options,
      91             :         size int,
      92             :         sstStatsCollector *sstable.CategoryStatsCollector,
      93           2 : ) *tableCacheContainer {
      94           2 :         // We will release a ref to table cache acquired here when tableCacheContainer.close is called.
      95           2 :         if tc != nil {
      96           1 :                 if tc.cache != opts.Cache {
      97           0 :                         panic("pebble: underlying cache for the table cache and db are different")
      98             :                 }
      99           1 :                 tc.Ref()
     100           2 :         } else {
     101           2 :                 // NewTableCache should create a ref to tc which the container should
     102           2 :                 // drop whenever it is closed.
     103           2 :                 tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
     104           2 :         }
     105             : 
     106           2 :         t := &tableCacheContainer{}
     107           2 :         t.tableCache = tc
     108           2 :         t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
     109           2 :         t.dbOpts.cacheID = cacheID
     110           2 :         t.dbOpts.objProvider = objProvider
     111           2 :         t.dbOpts.opts = opts.MakeReaderOptions()
     112           2 :         t.dbOpts.filterMetrics = &sstable.FilterMetricsTracker{}
     113           2 :         t.dbOpts.iterCount = new(atomic.Int32)
     114           2 :         t.dbOpts.sstStatsCollector = sstStatsCollector
     115           2 :         return t
     116             : }
     117             : 
     118             : // Before calling close, make sure that there will be no further need
     119             : // to access any of the files associated with the store.
     120           2 : func (c *tableCacheContainer) close() error {
     121           2 :         // We want to do some cleanup work here. Check for leaked iterators
     122           2 :         // by the DB using this container. Note that we'll still perform cleanup
     123           2 :         // below in the case that there are leaked iterators.
     124           2 :         var err error
     125           2 :         if v := c.dbOpts.iterCount.Load(); v > 0 {
     126           1 :                 err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
     127           1 :         }
     128             : 
     129             :         // Release nodes here.
     130           2 :         for _, shard := range c.tableCache.shards {
     131           2 :                 if shard != nil {
     132           2 :                         shard.removeDB(&c.dbOpts)
     133           2 :                 }
     134             :         }
     135           2 :         return firstError(err, c.tableCache.Unref())
     136             : }
     137             : 
     138             : func (c *tableCacheContainer) newIters(
     139             :         ctx context.Context,
     140             :         file *manifest.FileMetadata,
     141             :         opts *IterOptions,
     142             :         internalOpts internalIterOpts,
     143           2 : ) (internalIterator, keyspan.FragmentIterator, error) {
     144           2 :         return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts)
     145           2 : }
     146             : 
     147             : func (c *tableCacheContainer) newRangeKeyIter(
     148             :         file *manifest.FileMetadata, opts keyspan.SpanIterOptions,
     149           2 : ) (keyspan.FragmentIterator, error) {
     150           2 :         return c.tableCache.getShard(file.FileBacking.DiskFileNum).newRangeKeyIter(file, opts, &c.dbOpts)
     151           2 : }
     152             : 
     153             : // getTableProperties returns the properties associated with the backing physical
     154             : // table if the input metadata belongs to a virtual sstable.
     155           1 : func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
     156           1 :         return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
     157           1 : }
     158             : 
     159           2 : func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
     160           2 :         c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
     161           2 : }
     162             : 
     163           2 : func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
     164           2 :         var m CacheMetrics
     165           2 :         for i := range c.tableCache.shards {
     166           2 :                 s := c.tableCache.shards[i]
     167           2 :                 s.mu.RLock()
     168           2 :                 m.Count += int64(len(s.mu.nodes))
     169           2 :                 s.mu.RUnlock()
     170           2 :                 m.Hits += s.hits.Load()
     171           2 :                 m.Misses += s.misses.Load()
     172           2 :         }
     173           2 :         m.Size = m.Count * int64(unsafe.Sizeof(sstable.Reader{}))
     174           2 :         f := c.dbOpts.filterMetrics.Load()
     175           2 :         return m, f
     176             : }
     177             : 
     178             : func (c *tableCacheContainer) estimateSize(
     179             :         meta *fileMetadata, lower, upper []byte,
     180           2 : ) (size uint64, err error) {
     181           2 :         if meta.Virtual {
     182           2 :                 err = c.withVirtualReader(
     183           2 :                         meta.VirtualMeta(),
     184           2 :                         func(r sstable.VirtualReader) (err error) {
     185           2 :                                 size, err = r.EstimateDiskUsage(lower, upper)
     186           2 :                                 return err
     187           2 :                         },
     188             :                 )
     189           2 :         } else {
     190           2 :                 err = c.withReader(
     191           2 :                         meta.PhysicalMeta(),
     192           2 :                         func(r *sstable.Reader) (err error) {
     193           2 :                                 size, err = r.EstimateDiskUsage(lower, upper)
     194           2 :                                 return err
     195           2 :                         },
     196             :                 )
     197             :         }
     198           2 :         if err != nil {
     199           0 :                 return 0, err
     200           0 :         }
     201           2 :         return size, nil
     202             : }
     203             : 
     204             : // createCommonReader creates a Reader for this file. isForeign, if true for
     205             : // virtual sstables, is passed into the vSSTable reader so its iterators can
     206             : // collapse obsolete points accordingly.
     207             : func createCommonReader(
     208             :         v *tableCacheValue, file *fileMetadata, isForeign bool,
     209           2 : ) sstable.CommonReader {
     210           2 :         // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
     211           2 :         var cr sstable.CommonReader = v.reader
     212           2 :         if file.Virtual {
     213           2 :                 virtualReader := sstable.MakeVirtualReader(
     214           2 :                         v.reader, file.VirtualMeta(), isForeign,
     215           2 :                 )
     216           2 :                 cr = &virtualReader
     217           2 :         }
     218           2 :         return cr
     219             : }
     220             : 
     221             : func (c *tableCacheContainer) withCommonReader(
     222             :         meta *fileMetadata, fn func(sstable.CommonReader) error,
     223           2 : ) error {
     224           2 :         s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
     225           2 :         v := s.findNode(meta, &c.dbOpts)
     226           2 :         defer s.unrefValue(v)
     227           2 :         if v.err != nil {
     228           0 :                 return v.err
     229           0 :         }
     230           2 :         provider := c.dbOpts.objProvider
     231           2 :         objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
     232           2 :         if err != nil {
     233           0 :                 return err
     234           0 :         }
     235           2 :         return fn(createCommonReader(v, meta, provider.IsSharedForeign(objMeta)))
     236             : }
     237             : 
     238           2 : func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
     239           2 :         s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
     240           2 :         v := s.findNode(meta.FileMetadata, &c.dbOpts)
     241           2 :         defer s.unrefValue(v)
     242           2 :         if v.err != nil {
     243           1 :                 return v.err
     244           1 :         }
     245           2 :         return fn(v.reader)
     246             : }
     247             : 
     248             : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
     249             : func (c *tableCacheContainer) withVirtualReader(
     250             :         meta virtualMeta, fn func(sstable.VirtualReader) error,
     251           2 : ) error {
     252           2 :         s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
     253           2 :         v := s.findNode(meta.FileMetadata, &c.dbOpts)
     254           2 :         defer s.unrefValue(v)
     255           2 :         if v.err != nil {
     256           0 :                 return v.err
     257           0 :         }
     258           2 :         provider := c.dbOpts.objProvider
     259           2 :         objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
     260           2 :         if err != nil {
     261           0 :                 return err
     262           0 :         }
     263           2 :         return fn(sstable.MakeVirtualReader(v.reader, meta, provider.IsSharedForeign(objMeta)))
     264             : }
     265             : 
     266           2 : func (c *tableCacheContainer) iterCount() int64 {
     267           2 :         return int64(c.dbOpts.iterCount.Load())
     268           2 : }
     269             : 
     270             : // TableCache is a shareable cache for open sstables.
     271             : type TableCache struct {
     272             :         refs atomic.Int64
     273             : 
     274             :         cache  *Cache
     275             :         shards []*tableCacheShard
     276             : }
     277             : 
     278             : // Ref adds a reference to the table cache. Once tableCache.init returns,
     279             : // the table cache only remains valid if there is at least one reference
     280             : // to it.
     281           1 : func (c *TableCache) Ref() {
     282           1 :         v := c.refs.Add(1)
     283           1 :         // We don't want the reference count to ever go from 0 -> 1,
     284           1 :         // cause a reference count of 0 implies that we've closed the cache.
     285           1 :         if v <= 1 {
     286           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     287             :         }
     288             : }
     289             : 
     290             : // Unref removes a reference to the table cache.
     291           2 : func (c *TableCache) Unref() error {
     292           2 :         v := c.refs.Add(-1)
     293           2 :         switch {
     294           1 :         case v < 0:
     295           1 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     296           2 :         case v == 0:
     297           2 :                 var err error
     298           2 :                 for i := range c.shards {
     299           2 :                         // The cache shard is not allocated yet, nothing to close
     300           2 :                         if c.shards[i] == nil {
     301           0 :                                 continue
     302             :                         }
     303           2 :                         err = firstError(err, c.shards[i].Close())
     304             :                 }
     305             : 
     306             :                 // Unref the cache which we create a reference to when the tableCache
     307             :                 // is first instantiated.
     308           2 :                 c.cache.Unref()
     309           2 :                 return err
     310             :         }
     311           1 :         return nil
     312             : }
     313             : 
     314             : // NewTableCache will create a reference to the table cache. It is the callers responsibility
     315             : // to call tableCache.Unref if they will no longer hold a reference to the table cache.
     316           2 : func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
     317           2 :         if size == 0 {
     318           0 :                 panic("pebble: cannot create a table cache of size 0")
     319           2 :         } else if numShards == 0 {
     320           0 :                 panic("pebble: cannot create a table cache with 0 shards")
     321             :         }
     322             : 
     323           2 :         c := &TableCache{}
     324           2 :         c.cache = cache
     325           2 :         c.cache.Ref()
     326           2 : 
     327           2 :         c.shards = make([]*tableCacheShard, numShards)
     328           2 :         for i := range c.shards {
     329           2 :                 c.shards[i] = &tableCacheShard{}
     330           2 :                 c.shards[i].init(size / len(c.shards))
     331           2 :         }
     332             : 
     333             :         // Hold a ref to the cache here.
     334           2 :         c.refs.Store(1)
     335           2 : 
     336           2 :         return c
     337             : }
     338             : 
     339           2 : func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
     340           2 :         return c.shards[uint64(fileNum.FileNum())%uint64(len(c.shards))]
     341           2 : }
     342             : 
     343             : type tableCacheKey struct {
     344             :         cacheID uint64
     345             :         fileNum base.DiskFileNum
     346             : }
     347             : 
     348             : type tableCacheShard struct {
     349             :         hits      atomic.Int64
     350             :         misses    atomic.Int64
     351             :         iterCount atomic.Int32
     352             : 
     353             :         size int
     354             : 
     355             :         mu struct {
     356             :                 sync.RWMutex
     357             :                 nodes map[tableCacheKey]*tableCacheNode
     358             :                 // The iters map is only created and populated in race builds.
     359             :                 iters map[io.Closer][]byte
     360             : 
     361             :                 handHot  *tableCacheNode
     362             :                 handCold *tableCacheNode
     363             :                 handTest *tableCacheNode
     364             : 
     365             :                 coldTarget int
     366             :                 sizeHot    int
     367             :                 sizeCold   int
     368             :                 sizeTest   int
     369             :         }
     370             :         releasing       sync.WaitGroup
     371             :         releasingCh     chan *tableCacheValue
     372             :         releaseLoopExit sync.WaitGroup
     373             : }
     374             : 
     375           2 : func (c *tableCacheShard) init(size int) {
     376           2 :         c.size = size
     377           2 : 
     378           2 :         c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
     379           2 :         c.mu.coldTarget = size
     380           2 :         c.releasingCh = make(chan *tableCacheValue, 100)
     381           2 :         c.releaseLoopExit.Add(1)
     382           2 :         go c.releaseLoop()
     383           2 : 
     384           2 :         if invariants.RaceEnabled {
     385           0 :                 c.mu.iters = make(map[io.Closer][]byte)
     386           0 :         }
     387             : }
     388             : 
     389           2 : func (c *tableCacheShard) releaseLoop() {
     390           2 :         pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
     391           2 :                 defer c.releaseLoopExit.Done()
     392           2 :                 for v := range c.releasingCh {
     393           2 :                         v.release(c)
     394           2 :                 }
     395             :         })
     396             : }
     397             : 
     398             : // checkAndIntersectFilters checks the specific table and block property filters
     399             : // for intersection with any available table and block-level properties. Returns
     400             : // true for ok if this table should be read by this iterator.
     401             : func (c *tableCacheShard) checkAndIntersectFilters(
     402             :         v *tableCacheValue,
     403             :         tableFilter func(userProps map[string]string) bool,
     404             :         blockPropertyFilters []BlockPropertyFilter,
     405             :         boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
     406           2 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
     407           2 :         if tableFilter != nil &&
     408           2 :                 !tableFilter(v.reader.Properties.UserProperties) {
     409           1 :                 return false, nil, nil
     410           1 :         }
     411             : 
     412           2 :         if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
     413           2 :                 filterer, err = sstable.IntersectsTable(
     414           2 :                         blockPropertyFilters,
     415           2 :                         boundLimitedFilter,
     416           2 :                         v.reader.Properties.UserProperties,
     417           2 :                 )
     418           2 :                 // NB: IntersectsTable will return a nil filterer if the table-level
     419           2 :                 // properties indicate there's no intersection with the provided filters.
     420           2 :                 if filterer == nil || err != nil {
     421           2 :                         return false, nil, err
     422           2 :                 }
     423             :         }
     424           2 :         return true, filterer, nil
     425             : }
     426             : 
     427             : func (c *tableCacheShard) newIters(
     428             :         ctx context.Context,
     429             :         file *manifest.FileMetadata,
     430             :         opts *IterOptions,
     431             :         internalOpts internalIterOpts,
     432             :         dbOpts *tableCacheOpts,
     433           2 : ) (internalIterator, keyspan.FragmentIterator, error) {
     434           2 :         // TODO(sumeer): constructing the Reader should also use a plumbed context,
     435           2 :         // since parts of the sstable are read during the construction. The Reader
     436           2 :         // should not remember that context since the Reader can be long-lived.
     437           2 : 
     438           2 :         // Calling findNode gives us the responsibility of decrementing v's
     439           2 :         // refCount. If opening the underlying table resulted in error, then we
     440           2 :         // decrement this straight away. Otherwise, we pass that responsibility to
     441           2 :         // the sstable iterator, which decrements when it is closed.
     442           2 :         v := c.findNode(file, dbOpts)
     443           2 :         if v.err != nil {
     444           1 :                 defer c.unrefValue(v)
     445           1 :                 return nil, nil, v.err
     446           1 :         }
     447             : 
     448           2 :         hideObsoletePoints := false
     449           2 :         var pointKeyFilters []BlockPropertyFilter
     450           2 :         if opts != nil {
     451           2 :                 // This code is appending (at most one filter) in-place to
     452           2 :                 // opts.PointKeyFilters even though the slice is shared for iterators in
     453           2 :                 // the same iterator tree. This is acceptable since all the following
     454           2 :                 // properties are true:
     455           2 :                 // - The iterator tree is single threaded, so the shared backing for the
     456           2 :                 //   slice is being mutated in a single threaded manner.
     457           2 :                 // - Each shallow copy of the slice has its own notion of length.
     458           2 :                 // - The appended element is always the obsoleteKeyBlockPropertyFilter
     459           2 :                 //   struct, which is stateless, so overwriting that struct when creating
     460           2 :                 //   one sstable iterator is harmless to other sstable iterators that are
     461           2 :                 //   relying on that struct.
     462           2 :                 //
     463           2 :                 // An alternative would be to have different slices for different sstable
     464           2 :                 // iterators, but that requires more work to avoid allocations.
     465           2 :                 //
     466           2 :                 // TODO(bilal): for compaction reads of foreign sstables, we do hide
     467           2 :                 // obsolete points (see sstable.Reader.newCompactionIter) but we don't
     468           2 :                 // apply the obsolete block property filter. We could optimize this by
     469           2 :                 // applying the filter.
     470           2 :                 hideObsoletePoints, pointKeyFilters =
     471           2 :                         v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
     472           2 :                                 opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
     473           2 :         }
     474           2 :         ok := true
     475           2 :         var filterer *sstable.BlockPropertiesFilterer
     476           2 :         var err error
     477           2 :         if opts != nil {
     478           2 :                 ok, filterer, err = c.checkAndIntersectFilters(v, opts.TableFilter,
     479           2 :                         pointKeyFilters, internalOpts.boundLimitedFilter)
     480           2 :         }
     481           2 :         if err != nil {
     482           0 :                 c.unrefValue(v)
     483           0 :                 return nil, nil, err
     484           0 :         }
     485             : 
     486           2 :         provider := dbOpts.objProvider
     487           2 :         // Check if this file is a foreign file.
     488           2 :         objMeta, err := provider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
     489           2 :         if err != nil {
     490           0 :                 return nil, nil, err
     491           0 :         }
     492             : 
     493             :         // Note: This suffers an allocation for virtual sstables.
     494           2 :         cr := createCommonReader(v, file, provider.IsSharedForeign(objMeta))
     495           2 : 
     496           2 :         // NB: range-del iterator does not maintain a reference to the table, nor
     497           2 :         // does it need to read from it after creation.
     498           2 :         rangeDelIter, err := cr.NewRawRangeDelIter()
     499           2 :         if err != nil {
     500           1 :                 c.unrefValue(v)
     501           1 :                 return nil, nil, err
     502           1 :         }
     503             : 
     504             :         // Assert expected bounds in tests.
     505           2 :         if invariants.Enabled && rangeDelIter != nil {
     506           2 :                 cmp := base.DefaultComparer.Compare
     507           2 :                 if dbOpts.opts.Comparer != nil {
     508           2 :                         cmp = dbOpts.opts.Comparer.Compare
     509           2 :                 }
     510             :                 // TODO(radu): we should be using AssertBounds, but it currently fails in
     511             :                 // some cases (#3167).
     512           2 :                 rangeDelIter = keyspan.AssertUserKeyBounds(
     513           2 :                         rangeDelIter, file.SmallestPointKey.UserKey, file.LargestPointKey.UserKey, cmp,
     514           2 :                 )
     515             :         }
     516             : 
     517           2 :         if !ok {
     518           2 :                 c.unrefValue(v)
     519           2 :                 // Return an empty iterator. This iterator has no mutable state, so
     520           2 :                 // using a singleton is fine.
     521           2 :                 // NB: We still return the potentially non-empty rangeDelIter. This
     522           2 :                 // ensures the iterator observes the file's range deletions even if the
     523           2 :                 // block property filters exclude all the file's point keys. The range
     524           2 :                 // deletions may still delete keys lower in the LSM in files that DO
     525           2 :                 // match the active filters.
     526           2 :                 //
     527           2 :                 // The point iterator returned must implement the filteredIter
     528           2 :                 // interface, so that the level iterator surfaces file boundaries when
     529           2 :                 // range deletions are present.
     530           2 :                 return filteredAll, rangeDelIter, err
     531           2 :         }
     532             : 
     533           2 :         var iter sstable.Iterator
     534           2 :         useFilter := true
     535           2 :         if opts != nil {
     536           2 :                 useFilter = manifest.LevelToInt(opts.level) != 6 || opts.UseL6Filters
     537           2 :                 ctx = objiotracing.WithLevel(ctx, manifest.LevelToInt(opts.level))
     538           2 :         }
     539           2 :         tableFormat, err := v.reader.TableFormat()
     540           2 :         if err != nil {
     541           0 :                 return nil, nil, err
     542           0 :         }
     543           2 :         var rp sstable.ReaderProvider
     544           2 :         if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
     545           2 :                 rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
     546           2 :         }
     547             : 
     548           2 :         if objMeta.IsShared() && v.reader.Properties.GlobalSeqNum != 0 {
     549           2 :                 if tableFormat < sstable.TableFormatPebblev4 {
     550           0 :                         return nil, nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
     551           0 :                 }
     552             :                 // The table is shared and ingested.
     553           2 :                 hideObsoletePoints = true
     554             :         }
     555           2 :         var categoryAndQoS sstable.CategoryAndQoS
     556           2 :         if opts != nil {
     557           2 :                 categoryAndQoS = opts.CategoryAndQoS
     558           2 :         }
     559           2 :         if internalOpts.bytesIterated != nil {
     560           2 :                 iter, err = cr.NewCompactionIter(
     561           2 :                         internalOpts.bytesIterated, categoryAndQoS, dbOpts.sstStatsCollector, rp,
     562           2 :                         internalOpts.bufferPool)
     563           2 :         } else {
     564           2 :                 iter, err = cr.NewIterWithBlockPropertyFiltersAndContextEtc(
     565           2 :                         ctx, opts.GetLowerBound(), opts.GetUpperBound(), filterer, hideObsoletePoints, useFilter,
     566           2 :                         internalOpts.stats, categoryAndQoS, dbOpts.sstStatsCollector, rp)
     567           2 :         }
     568           2 :         if err != nil {
     569           1 :                 if rangeDelIter != nil {
     570           1 :                         _ = rangeDelIter.Close()
     571           1 :                 }
     572           1 :                 c.unrefValue(v)
     573           1 :                 return nil, nil, err
     574             :         }
     575             :         // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
     576             :         // care to avoid introducing an allocation here by adding a closure.
     577           2 :         iter.SetCloseHook(v.closeHook)
     578           2 : 
     579           2 :         c.iterCount.Add(1)
     580           2 :         dbOpts.iterCount.Add(1)
     581           2 :         if invariants.RaceEnabled {
     582           0 :                 c.mu.Lock()
     583           0 :                 c.mu.iters[iter] = debug.Stack()
     584           0 :                 c.mu.Unlock()
     585           0 :         }
     586           2 :         return iter, rangeDelIter, nil
     587             : }
     588             : 
     589             : func (c *tableCacheShard) newRangeKeyIter(
     590             :         file *manifest.FileMetadata, opts keyspan.SpanIterOptions, dbOpts *tableCacheOpts,
     591           2 : ) (keyspan.FragmentIterator, error) {
     592           2 :         // Calling findNode gives us the responsibility of decrementing v's
     593           2 :         // refCount. If opening the underlying table resulted in error, then we
     594           2 :         // decrement this straight away. Otherwise, we pass that responsibility to
     595           2 :         // the sstable iterator, which decrements when it is closed.
     596           2 :         v := c.findNode(file, dbOpts)
     597           2 :         if v.err != nil {
     598           0 :                 defer c.unrefValue(v)
     599           0 :                 return nil, v.err
     600           0 :         }
     601             : 
     602           2 :         ok := true
     603           2 :         var err error
     604           2 :         // Don't filter a table's range keys if the file contains RANGEKEYDELs.
     605           2 :         // The RANGEKEYDELs may delete range keys in other levels. Skipping the
     606           2 :         // file's range key blocks may surface deleted range keys below. This is
     607           2 :         // done here, rather than deferring to the block-property collector in order
     608           2 :         // to maintain parity with point keys and the treatment of RANGEDELs.
     609           2 :         if v.reader.Properties.NumRangeKeyDels == 0 {
     610           2 :                 ok, _, err = c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil)
     611           2 :         }
     612           2 :         if err != nil {
     613           0 :                 c.unrefValue(v)
     614           0 :                 return nil, err
     615           0 :         }
     616           2 :         if !ok {
     617           0 :                 c.unrefValue(v)
     618           0 :                 // Return the empty iterator. This iterator has no mutable state, so
     619           0 :                 // using a singleton is fine.
     620           0 :                 return emptyKeyspanIter, err
     621           0 :         }
     622             : 
     623           2 :         var iter keyspan.FragmentIterator
     624           2 :         if file.Virtual {
     625           2 :                 provider := dbOpts.objProvider
     626           2 :                 var objMeta objstorage.ObjectMetadata
     627           2 :                 objMeta, err = provider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
     628           2 :                 if err == nil {
     629           2 :                         virtualReader := sstable.MakeVirtualReader(
     630           2 :                                 v.reader, file.VirtualMeta(), provider.IsSharedForeign(objMeta),
     631           2 :                         )
     632           2 :                         iter, err = virtualReader.NewRawRangeKeyIter()
     633           2 :                 }
     634           2 :         } else {
     635           2 :                 iter, err = v.reader.NewRawRangeKeyIter()
     636           2 :         }
     637             : 
     638             :         // iter is a block iter that holds the entire value of the block in memory.
     639             :         // No need to hold onto a ref of the cache value.
     640           2 :         c.unrefValue(v)
     641           2 : 
     642           2 :         if err != nil {
     643           1 :                 return nil, err
     644           1 :         }
     645             : 
     646           2 :         if iter == nil {
     647           2 :                 // NewRawRangeKeyIter can return nil even if there's no error. However,
     648           2 :                 // the keyspan.LevelIter expects a non-nil iterator if err is nil.
     649           2 :                 return emptyKeyspanIter, nil
     650           2 :         }
     651             : 
     652           2 :         return iter, nil
     653             : }
     654             : 
     655             : type tableCacheShardReaderProvider struct {
     656             :         c      *tableCacheShard
     657             :         file   *manifest.FileMetadata
     658             :         dbOpts *tableCacheOpts
     659             :         v      *tableCacheValue
     660             : }
     661             : 
     662             : var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
     663             : 
     664             : // GetReader implements sstable.ReaderProvider. Note that it is not the
     665             : // responsibility of tableCacheShardReaderProvider to ensure that the file
     666             : // continues to exist. The ReaderProvider is used in iterators where the
     667             : // top-level iterator is pinning the read state and preventing the files from
     668             : // being deleted.
     669             : //
     670             : // The caller must call tableCacheShardReaderProvider.Close.
     671             : //
     672             : // Note that currently the Reader returned here is only used to read value
     673             : // blocks. This reader shouldn't be used for other purposes like reading keys
     674             : // outside of virtual sstable bounds.
     675             : //
     676             : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
     677             : // that the reader isn't used for other purposes.
     678           2 : func (rp *tableCacheShardReaderProvider) GetReader() (*sstable.Reader, error) {
     679           2 :         // Calling findNode gives us the responsibility of decrementing v's
     680           2 :         // refCount.
     681           2 :         v := rp.c.findNode(rp.file, rp.dbOpts)
     682           2 :         if v.err != nil {
     683           0 :                 defer rp.c.unrefValue(v)
     684           0 :                 return nil, v.err
     685           0 :         }
     686           2 :         rp.v = v
     687           2 :         return v.reader, nil
     688             : }
     689             : 
     690             : // Close implements sstable.ReaderProvider.
     691           2 : func (rp *tableCacheShardReaderProvider) Close() {
     692           2 :         rp.c.unrefValue(rp.v)
     693           2 :         rp.v = nil
     694           2 : }
     695             : 
     696             : // getTableProperties return sst table properties for target file
     697             : func (c *tableCacheShard) getTableProperties(
     698             :         file *fileMetadata, dbOpts *tableCacheOpts,
     699           1 : ) (*sstable.Properties, error) {
     700           1 :         // Calling findNode gives us the responsibility of decrementing v's refCount here
     701           1 :         v := c.findNode(file, dbOpts)
     702           1 :         defer c.unrefValue(v)
     703           1 : 
     704           1 :         if v.err != nil {
     705           0 :                 return nil, v.err
     706           0 :         }
     707           1 :         return &v.reader.Properties, nil
     708             : }
     709             : 
// releaseNode removes n from the tableCacheShard and drops the shard's
// reference to n's cached value, if n still has one.
//
// c.mu must be held when calling this.
func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
	// Remove n from c.mu.nodes, the size counters and the ring first, then
	// release its value.
	c.unlinkNode(n)
	c.clearNode(n)
}
     717             : 
     718             : // unlinkNode removes a node from the tableCacheShard, leaving the shard
     719             : // reference in place.
     720             : //
     721             : // c.mu must be held when calling this.
     722           2 : func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
     723           2 :         key := tableCacheKey{n.cacheID, n.fileNum}
     724           2 :         delete(c.mu.nodes, key)
     725           2 : 
     726           2 :         switch n.ptype {
     727           2 :         case tableCacheNodeHot:
     728           2 :                 c.mu.sizeHot--
     729           2 :         case tableCacheNodeCold:
     730           2 :                 c.mu.sizeCold--
     731           2 :         case tableCacheNodeTest:
     732           2 :                 c.mu.sizeTest--
     733             :         }
     734             : 
     735           2 :         if n == c.mu.handHot {
     736           2 :                 c.mu.handHot = c.mu.handHot.prev()
     737           2 :         }
     738           2 :         if n == c.mu.handCold {
     739           2 :                 c.mu.handCold = c.mu.handCold.prev()
     740           2 :         }
     741           2 :         if n == c.mu.handTest {
     742           2 :                 c.mu.handTest = c.mu.handTest.prev()
     743           2 :         }
     744             : 
     745           2 :         if n.unlink() == n {
     746           2 :                 // This was the last entry in the cache.
     747           2 :                 c.mu.handHot = nil
     748           2 :                 c.mu.handCold = nil
     749           2 :                 c.mu.handTest = nil
     750           2 :         }
     751             : 
     752           2 :         n.links.prev = nil
     753           2 :         n.links.next = nil
     754             : }
     755             : 
     756           2 : func (c *tableCacheShard) clearNode(n *tableCacheNode) {
     757           2 :         if v := n.value; v != nil {
     758           2 :                 n.value = nil
     759           2 :                 c.unrefValue(v)
     760           2 :         }
     761             : }
     762             : 
     763             : // unrefValue decrements the reference count for the specified value, releasing
     764             : // it if the reference count fell to 0. Note that the value has a reference if
     765             : // it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
     766             : // the node has already been removed from that map.
     767           2 : func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
     768           2 :         if v.refCount.Add(-1) == 0 {
     769           2 :                 c.releasing.Add(1)
     770           2 :                 c.releasingCh <- v
     771           2 :         }
     772             : }
     773             : 
     774             : // findNode returns the node for the table with the given file number, creating
     775             : // that node if it didn't already exist. The caller is responsible for
     776             : // decrementing the returned node's refCount.
     777           2 : func (c *tableCacheShard) findNode(meta *fileMetadata, dbOpts *tableCacheOpts) *tableCacheValue {
     778           2 :         v := c.findNodeInternal(meta, dbOpts)
     779           2 : 
     780           2 :         // Loading a file before its global sequence number is known (eg,
     781           2 :         // during ingest before entering the commit pipeline) can pollute
     782           2 :         // the cache with incorrect state. In invariant builds, verify
     783           2 :         // that the global sequence number of the returned reader matches.
     784           2 :         if invariants.Enabled {
     785           2 :                 if v.reader != nil && meta.LargestSeqNum == meta.SmallestSeqNum &&
     786           2 :                         v.reader.Properties.GlobalSeqNum != meta.SmallestSeqNum {
     787           0 :                         panic(errors.AssertionFailedf("file %s loaded from table cache with the wrong global sequence number %d",
     788           0 :                                 meta, v.reader.Properties.GlobalSeqNum))
     789             :                 }
     790             :         }
     791           2 :         return v
     792             : }
     793             : 
// findNodeInternal implements findNode without its invariant check. It
// returns the value cached for meta's backing file, creating and loading it if
// necessary. The returned value carries one reference that the caller is
// responsible for releasing.
//
// Lookup order:
//   - A read-locked fast path is tried first.
//   - On a miss, the write lock is taken and the entry is re-checked, since
//     another goroutine may have populated it in between.
//   - The table is loaded only after c.mu has been released.
//   - Lookups that find an existing value wait on v.loaded before returning.
//     NOTE(review): presumably v.load closes it; confirm.
//
// It panics if meta has no references.
func (c *tableCacheShard) findNodeInternal(
	meta *fileMetadata, dbOpts *tableCacheOpts,
) *tableCacheValue {
	if refs := meta.Refs(); refs <= 0 {
		panic(errors.AssertionFailedf("attempting to load file %s with refs=%d from table cache",
			meta, refs))
	}
	// Fast-path for a hit in the cache.
	c.mu.RLock()
	key := tableCacheKey{dbOpts.cacheID, meta.FileBacking.DiskFileNum}
	if n := c.mu.nodes[key]; n != nil && n.value != nil {
		// Fast-path hit.
		//
		// The caller is responsible for decrementing the refCount.
		v := n.value
		v.refCount.Add(1)
		c.mu.RUnlock()
		n.referenced.Store(true)
		c.hits.Add(1)
		// Wait until the value has finished loading before handing it out.
		<-v.loaded
		return v
	}
	c.mu.RUnlock()

	c.mu.Lock()

	// Re-check under the write lock: the entry may have changed since the
	// read lock was released.
	n := c.mu.nodes[key]
	switch {
	case n == nil:
		// Slow-path miss of a non-existent node.
		n = &tableCacheNode{
			fileNum: meta.FileBacking.DiskFileNum,
			ptype:   tableCacheNodeCold,
		}
		c.addNode(n, dbOpts)
		c.mu.sizeCold++

	case n.value != nil:
		// Slow-path hit of a hot or cold node.
		//
		// The caller is responsible for decrementing the refCount.
		v := n.value
		v.refCount.Add(1)
		n.referenced.Store(true)
		c.hits.Add(1)
		c.mu.Unlock()
		<-v.loaded
		return v

	default:
		// Slow-path miss of a test node.
		//
		// The node is still in the map but its value was dropped. It is
		// re-added as a hot node, and coldTarget grows by one, capped at
		// c.size.
		c.unlinkNode(n)
		c.mu.coldTarget++
		if c.mu.coldTarget > c.size {
			c.mu.coldTarget = c.size
		}

		n.referenced.Store(false)
		n.ptype = tableCacheNodeHot
		c.addNode(n, dbOpts)
		c.mu.sizeHot++
	}

	c.misses.Add(1)

	v := &tableCacheValue{
		loaded: make(chan struct{}),
	}
	// The initial count of two gives one reference to the node (values present
	// in c.mu.nodes hold a reference) and one to the caller.
	v.refCount.Store(2)
	// Cache the closure invoked when an iterator is closed. This avoids an
	// allocation on every call to newIters.
	v.closeHook = func(i sstable.Iterator) error {
		if invariants.RaceEnabled {
			c.mu.Lock()
			delete(c.mu.iters, i)
			c.mu.Unlock()
		}
		c.unrefValue(v)
		c.iterCount.Add(-1)
		dbOpts.iterCount.Add(-1)
		return nil
	}
	n.value = v

	c.mu.Unlock()

	// Note adding to the cache lists must complete before we begin loading the
	// table as a failure during load will result in the node being unlinked.
	pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
		v.load(
			loadInfo{
				backingFileNum: meta.FileBacking.DiskFileNum,
				smallestSeqNum: meta.SmallestSeqNum,
				largestSeqNum:  meta.LargestSeqNum,
			}, c, dbOpts)
	})
	return v
}
     892             : 
     893           2 : func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
     894           2 :         c.evictNodes()
     895           2 :         n.cacheID = dbOpts.cacheID
     896           2 :         key := tableCacheKey{n.cacheID, n.fileNum}
     897           2 :         c.mu.nodes[key] = n
     898           2 : 
     899           2 :         n.links.next = n
     900           2 :         n.links.prev = n
     901           2 :         if c.mu.handHot == nil {
     902           2 :                 // First element.
     903           2 :                 c.mu.handHot = n
     904           2 :                 c.mu.handCold = n
     905           2 :                 c.mu.handTest = n
     906           2 :         } else {
     907           2 :                 c.mu.handHot.link(n)
     908           2 :         }
     909             : 
     910           2 :         if c.mu.handCold == c.mu.handHot {
     911           2 :                 c.mu.handCold = c.mu.handCold.prev()
     912           2 :         }
     913             : }
     914             : 
     915           2 : func (c *tableCacheShard) evictNodes() {
     916           2 :         for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
     917           2 :                 c.runHandCold()
     918           2 :         }
     919             : }
     920             : 
     921           2 : func (c *tableCacheShard) runHandCold() {
     922           2 :         n := c.mu.handCold
     923           2 :         if n.ptype == tableCacheNodeCold {
     924           2 :                 if n.referenced.Load() {
     925           2 :                         n.referenced.Store(false)
     926           2 :                         n.ptype = tableCacheNodeHot
     927           2 :                         c.mu.sizeCold--
     928           2 :                         c.mu.sizeHot++
     929           2 :                 } else {
     930           2 :                         c.clearNode(n)
     931           2 :                         n.ptype = tableCacheNodeTest
     932           2 :                         c.mu.sizeCold--
     933           2 :                         c.mu.sizeTest++
     934           2 :                         for c.size < c.mu.sizeTest && c.mu.handTest != nil {
     935           1 :                                 c.runHandTest()
     936           1 :                         }
     937             :                 }
     938             :         }
     939             : 
     940           2 :         c.mu.handCold = c.mu.handCold.next()
     941           2 : 
     942           2 :         for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
     943           2 :                 c.runHandHot()
     944           2 :         }
     945             : }
     946             : 
     947           2 : func (c *tableCacheShard) runHandHot() {
     948           2 :         if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
     949           2 :                 c.runHandTest()
     950           2 :                 if c.mu.handHot == nil {
     951           0 :                         return
     952           0 :                 }
     953             :         }
     954             : 
     955           2 :         n := c.mu.handHot
     956           2 :         if n.ptype == tableCacheNodeHot {
     957           2 :                 if n.referenced.Load() {
     958           2 :                         n.referenced.Store(false)
     959           2 :                 } else {
     960           2 :                         n.ptype = tableCacheNodeCold
     961           2 :                         c.mu.sizeHot--
     962           2 :                         c.mu.sizeCold++
     963           2 :                 }
     964             :         }
     965             : 
     966           2 :         c.mu.handHot = c.mu.handHot.next()
     967             : }
     968             : 
     969           2 : func (c *tableCacheShard) runHandTest() {
     970           2 :         if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
     971           2 :                 c.runHandCold()
     972           2 :                 if c.mu.handTest == nil {
     973           0 :                         return
     974           0 :                 }
     975             :         }
     976             : 
     977           2 :         n := c.mu.handTest
     978           2 :         if n.ptype == tableCacheNodeTest {
     979           2 :                 c.mu.coldTarget--
     980           2 :                 if c.mu.coldTarget < 0 {
     981           1 :                         c.mu.coldTarget = 0
     982           1 :                 }
     983           2 :                 c.unlinkNode(n)
     984           2 :                 c.clearNode(n)
     985             :         }
     986             : 
     987           2 :         c.mu.handTest = c.mu.handTest.next()
     988             : }
     989             : 
     990           2 : func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
     991           2 :         c.mu.Lock()
     992           2 :         key := tableCacheKey{dbOpts.cacheID, fileNum}
     993           2 :         n := c.mu.nodes[key]
     994           2 :         var v *tableCacheValue
     995           2 :         if n != nil {
     996           2 :                 // NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
     997           2 :                 // the tableCacheNode.release() call synchronously below to ensure the
     998           2 :                 // sstable file descriptor is closed before returning. Note that
     999           2 :                 // tableCacheShard.releasing needs to be incremented while holding
    1000           2 :                 // tableCacheShard.mu in order to avoid a race with Close()
    1001           2 :                 c.unlinkNode(n)
    1002           2 :                 v = n.value
    1003           2 :                 if v != nil {
    1004           2 :                         if !allowLeak {
    1005           2 :                                 if t := v.refCount.Add(-1); t != 0 {
    1006           0 :                                         dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
    1007           0 :                                 }
    1008             :                         }
    1009           2 :                         c.releasing.Add(1)
    1010             :                 }
    1011             :         }
    1012             : 
    1013           2 :         c.mu.Unlock()
    1014           2 : 
    1015           2 :         if v != nil {
    1016           2 :                 v.release(c)
    1017           2 :         }
    1018             : 
    1019           2 :         dbOpts.opts.Cache.EvictFile(dbOpts.cacheID, fileNum)
    1020             : }
    1021             : 
    1022             : // removeDB evicts any nodes which have a reference to the DB
    1023             : // associated with dbOpts.cacheID. Make sure that there will
    1024             : // be no more accesses to the files associated with the DB.
    1025           2 : func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
    1026           2 :         var fileNums []base.DiskFileNum
    1027           2 : 
    1028           2 :         c.mu.RLock()
    1029           2 :         // Collect the fileNums which need to be cleaned.
    1030           2 :         var firstNode *tableCacheNode
    1031           2 :         node := c.mu.handHot
    1032           2 :         for node != firstNode {
    1033           2 :                 if firstNode == nil {
    1034           2 :                         firstNode = node
    1035           2 :                 }
    1036             : 
    1037           2 :                 if node.cacheID == dbOpts.cacheID {
    1038           2 :                         fileNums = append(fileNums, node.fileNum)
    1039           2 :                 }
    1040           2 :                 node = node.next()
    1041             :         }
    1042           2 :         c.mu.RUnlock()
    1043           2 : 
    1044           2 :         // Evict all the nodes associated with the DB.
    1045           2 :         // This should synchronously close all the files
    1046           2 :         // associated with the DB.
    1047           2 :         for _, fileNum := range fileNums {
    1048           2 :                 c.evict(fileNum, dbOpts, true)
    1049           2 :         }
    1050             : }
    1051             : 
    1052           2 : func (c *tableCacheShard) Close() error {
    1053           2 :         c.mu.Lock()
    1054           2 :         defer c.mu.Unlock()
    1055           2 : 
    1056           2 :         // Check for leaked iterators. Note that we'll still perform cleanup below in
    1057           2 :         // the case that there are leaked iterators.
    1058           2 :         var err error
    1059           2 :         if v := c.iterCount.Load(); v > 0 {
    1060           1 :                 if !invariants.RaceEnabled {
    1061           1 :                         err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
    1062           1 :                 } else {
    1063           0 :                         var buf bytes.Buffer
    1064           0 :                         for _, stack := range c.mu.iters {
    1065           0 :                                 fmt.Fprintf(&buf, "%s\n", stack)
    1066           0 :                         }
    1067           0 :                         err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
    1068             :                 }
    1069             :         }
    1070             : 
    1071           2 :         for c.mu.handHot != nil {
    1072           0 :                 n := c.mu.handHot
    1073           0 :                 if n.value != nil {
    1074           0 :                         if n.value.refCount.Add(-1) == 0 {
    1075           0 :                                 c.releasing.Add(1)
    1076           0 :                                 c.releasingCh <- n.value
    1077           0 :                         }
    1078             :                 }
    1079           0 :                 c.unlinkNode(n)
    1080             :         }
    1081           2 :         c.mu.nodes = nil
    1082           2 :         c.mu.handHot = nil
    1083           2 :         c.mu.handCold = nil
    1084           2 :         c.mu.handTest = nil
    1085           2 : 
    1086           2 :         // Only shutdown the releasing goroutine if there were no leaked
    1087           2 :         // iterators. If there were leaked iterators, we leave the goroutine running
    1088           2 :         // and the releasingCh open so that a subsequent iterator close can
    1089           2 :         // complete. This behavior is used by iterator leak tests. Leaking the
    1090           2 :         // goroutine for these tests is less bad not closing the iterator which
    1091           2 :         // triggers other warnings about block cache handles not being released.
    1092           2 :         if err != nil {
    1093           1 :                 c.releasing.Wait()
    1094           1 :                 return err
    1095           1 :         }
    1096             : 
    1097           2 :         close(c.releasingCh)
    1098           2 :         c.releasing.Wait()
    1099           2 :         c.releaseLoopExit.Wait()
    1100           2 :         return err
    1101             : }
    1102             : 
    // tableCacheValue is the cached payload for one backing sstable. It holds
    // the reader built by load (or the error load hit) plus the bookkeeping
    // used to signal readiness and to decide when the reader can be released.
    type tableCacheValue struct {
    	// closeHook's use is not visible in this section; presumably it is
    	// invoked when an iterator over reader is closed — confirm at use sites.
    	closeHook func(i sstable.Iterator) error
    	// reader is set by load on success; release closes it if non-nil.
    	reader    *sstable.Reader
    	// err holds the wrapped open/NewReader error recorded by load, if any.
    	err       error
    	// loaded is closed by load once reader/err are final; release blocks on it.
    	loaded    chan struct{}
    	// Reference count for the value. The reader is closed when the reference
    	// count drops to zero.
    	refCount atomic.Int32
    }
    1112             : 
    // loadInfo carries what tableCacheValue.load needs to open a table.
    type loadInfo struct {
    	// backingFileNum identifies the object opened via the objstorage provider
    	// and keys the sstable's block-cache options.
    	backingFileNum base.DiskFileNum
    	// When largestSeqNum == smallestSeqNum, load stores largestSeqNum in the
    	// reader's Properties.GlobalSeqNum.
    	largestSeqNum  uint64
    	smallestSeqNum uint64
    }
    1118             : 
    1119           2 : func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) {
    1120           2 :         // Try opening the file first.
    1121           2 :         var f objstorage.Readable
    1122           2 :         var err error
    1123           2 :         f, err = dbOpts.objProvider.OpenForReading(
    1124           2 :                 context.TODO(), fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
    1125           2 :         )
    1126           2 :         if err == nil {
    1127           2 :                 cacheOpts := private.SSTableCacheOpts(dbOpts.cacheID, loadInfo.backingFileNum).(sstable.ReaderOption)
    1128           2 :                 v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics)
    1129           2 :         }
    1130           2 :         if err != nil {
    1131           1 :                 v.err = errors.Wrapf(
    1132           1 :                         err, "pebble: backing file %s error", errors.Safe(loadInfo.backingFileNum.FileNum()))
    1133           1 :         }
    1134           2 :         if v.err == nil && loadInfo.smallestSeqNum == loadInfo.largestSeqNum {
    1135           2 :                 v.reader.Properties.GlobalSeqNum = loadInfo.largestSeqNum
    1136           2 :         }
    1137           2 :         if v.err != nil {
    1138           1 :                 c.mu.Lock()
    1139           1 :                 defer c.mu.Unlock()
    1140           1 :                 // Lookup the node in the cache again as it might have already been
    1141           1 :                 // removed.
    1142           1 :                 key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
    1143           1 :                 n := c.mu.nodes[key]
    1144           1 :                 if n != nil && n.value == v {
    1145           1 :                         c.releaseNode(n)
    1146           1 :                 }
    1147             :         }
    1148           2 :         close(v.loaded)
    1149             : }
    1150             : 
    1151           2 : func (v *tableCacheValue) release(c *tableCacheShard) {
    1152           2 :         <-v.loaded
    1153           2 :         // Nothing to be done about an error at this point. Close the reader if it is
    1154           2 :         // open.
    1155           2 :         if v.reader != nil {
    1156           2 :                 _ = v.reader.Close()
    1157           2 :         }
    1158           2 :         c.releasing.Done()
    1159             : }
    1160             : 
    // tableCacheNodeType classifies a tableCacheNode as test, cold, or hot.
    // (Presumably these are the page states of the clock-based replacement
    // policy driven by the shard's handHot/handCold/handTest hands — confirm
    // against the eviction code.)
    type tableCacheNodeType int8

    const (
    	tableCacheNodeTest tableCacheNodeType = iota
    	tableCacheNodeCold
    	tableCacheNodeHot
    )

    // String implements fmt.Stringer. It returns "test", "cold", or "hot" for
    // the known node types and "unknown" for any other value.
    func (p tableCacheNodeType) String() string {
    	names := [...]string{
    		tableCacheNodeTest: "test",
    		tableCacheNodeCold: "cold",
    		tableCacheNodeHot:  "hot",
    	}
    	if p < 0 || int(p) >= len(names) {
    		return "unknown"
    	}
    	return names[p]
    }
    1180             : 
    // tableCacheNode is one entry in a table cache shard. Nodes are kept on an
    // intrusive circular doubly-linked list (see link and unlink).
    type tableCacheNode struct {
    	// fileNum is the backing file this node caches.
    	fileNum base.DiskFileNum
    	// value may be nil; Close checks for that before dropping a reference.
    	value   *tableCacheValue

    	// links are the list pointers. A node that is not on any list points to
    	// itself in both directions (see unlink).
    	links struct {
    		next *tableCacheNode
    		prev *tableCacheNode
    	}
    	// ptype is the node's current classification (test, cold, or hot).
    	ptype tableCacheNodeType
    	// referenced is atomically set to indicate that this entry has been accessed
    	// since the last time one of the clock hands swept it.
    	referenced atomic.Bool

    	// Storing the cache id associated with the DB instance here
    	// avoids the need to thread the dbOpts struct through many functions.
    	cacheID uint64
    }
    1198             : 
    1199           2 : func (n *tableCacheNode) next() *tableCacheNode {
    1200           2 :         if n == nil {
    1201           0 :                 return nil
    1202           0 :         }
    1203           2 :         return n.links.next
    1204             : }
    1205             : 
    1206           2 : func (n *tableCacheNode) prev() *tableCacheNode {
    1207           2 :         if n == nil {
    1208           0 :                 return nil
    1209           0 :         }
    1210           2 :         return n.links.prev
    1211             : }
    1212             : 
    1213           2 : func (n *tableCacheNode) link(s *tableCacheNode) {
    1214           2 :         s.links.prev = n.links.prev
    1215           2 :         s.links.prev.links.next = s
    1216           2 :         s.links.next = n
    1217           2 :         s.links.next.links.prev = s
    1218           2 : }
    1219             : 
    1220           2 : func (n *tableCacheNode) unlink() *tableCacheNode {
    1221           2 :         next := n.links.next
    1222           2 :         n.links.prev.links.next = n.links.next
    1223           2 :         n.links.next.links.prev = n.links.prev
    1224           2 :         n.links.prev = n
    1225           2 :         n.links.next = n
    1226           2 :         return next
    1227           2 : }

Generated by: LCOV version 1.14