LCOV - code coverage report
Current view: top level - pebble - table_cache.go (source / functions)
Test: 2023-10-30 08:17Z ed45a776 - meta test only.lcov
Date: 2023-10-30 08:18:28
Coverage (hit / total): Lines: 528 / 757 (69.7 %); Functions: 0 / 0 (-)

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "fmt"
      11             :         "io"
      12             :         "runtime/debug"
      13             :         "runtime/pprof"
      14             :         "sync"
      15             :         "sync/atomic"
      16             :         "unsafe"
      17             : 
      18             :         "github.com/cockroachdb/errors"
      19             :         "github.com/cockroachdb/pebble/internal/base"
      20             :         "github.com/cockroachdb/pebble/internal/invariants"
      21             :         "github.com/cockroachdb/pebble/internal/keyspan"
      22             :         "github.com/cockroachdb/pebble/internal/manifest"
      23             :         "github.com/cockroachdb/pebble/internal/private"
      24             :         "github.com/cockroachdb/pebble/objstorage"
      25             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      26             :         "github.com/cockroachdb/pebble/sstable"
      27             : )
      28             : 
      29             : var emptyIter = &errorIter{err: nil}
      30             : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
      31             : 
      32             : // filteredAll is a singleton internalIterator implementation used when an
      33             : // sstable does contain point keys, but all the keys are filtered by the active
      34             : // PointKeyFilters set in the iterator's IterOptions.
      35             : //
      36             : // filteredAll implements filteredIter, ensuring the level iterator recognizes
      37             : // when it may need to return file boundaries to keep the rangeDelIter open
      38             : // during mergingIter operation.
      39             : var filteredAll = &filteredAllKeysIter{errorIter: errorIter{err: nil}}
      40             : 
      41             : var _ filteredIter = filteredAll
      42             : 
      43             : type filteredAllKeysIter struct {
      44             :         errorIter
      45             : }
      46             : 
      47           1 : func (s *filteredAllKeysIter) MaybeFilteredKeys() bool {
      48           1 :         return true
      49           1 : }
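
A minimal, self-contained sketch (hypothetical names, not Pebble's actual
levelIter code) of how a consumer of this interface can react to
MaybeFilteredKeys: when the point iterator reports that keys may have been
filtered, the caller surfaces the file's boundary keys so that an open
rangeDelIter remains visible to the merging iterator.

        package main

        import "fmt"

        // filteredIter mirrors the contract assumed above: an iterator that
        // can report whether point keys may have been filtered out.
        type filteredIter interface {
                MaybeFilteredKeys() bool
        }

        type allFilteredIter struct{}

        func (allFilteredIter) MaybeFilteredKeys() bool { return true }

        func main() {
                var it any = allFilteredIter{}
                if fi, ok := it.(filteredIter); ok && fi.MaybeFilteredKeys() {
                        // Keys may have been filtered: surface the file's
                        // boundaries so range deletions stay observable.
                        fmt.Println("surface file boundary keys; keep rangeDelIter open")
                }
        }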
      50             : 
      51             : var tableCacheLabels = pprof.Labels("pebble", "table-cache")
      52             : 
      53             : // tableCacheOpts contains the DB-specific fields
      54             : // of a table cache. It is stored in the tableCacheContainer
      55             : // along with the table cache.
      56             : // NB: It is important to make sure that the fields in this
      57             : // struct are read-only. Since the fields here are shared
      58             : // by every single tableCacheShard, updating a non-read-only
      59             : // field would cause unnecessary evictions of that field,
      60             : // and of the surrounding fields, from the CPU caches.
      61             : type tableCacheOpts struct {
      62             :         // iterCount keeps track of how many iterators are open. It is used to keep
      63             :         // track of leaked iterators on a per-db level.
      64             :         iterCount *atomic.Int32
      65             : 
      66             :         loggerAndTracer LoggerAndTracer
      67             :         cacheID         uint64
      68             :         objProvider     objstorage.Provider
      69             :         opts            sstable.ReaderOptions
      70             :         filterMetrics   *sstable.FilterMetricsTracker
      71             : }
      72             : 
      73             : // tableCacheContainer contains the table cache and
      74             : // fields which are unique to the DB.
      75             : type tableCacheContainer struct {
      76             :         tableCache *TableCache
      77             : 
      78             :         // dbOpts contains fields relevant to the table cache
      79             :         // which are unique to each DB.
      80             :         dbOpts tableCacheOpts
      81             : }
      82             : 
      83             : // newTableCacheContainer will panic if the underlying cache in the table cache
      84             : // doesn't match Options.Cache.
      85             : func newTableCacheContainer(
      86             :         tc *TableCache, cacheID uint64, objProvider objstorage.Provider, opts *Options, size int,
      87           1 : ) *tableCacheContainer {
       88           1 :         // We will release the ref to the table cache acquired here when tableCacheContainer.close is called.
      89           1 :         if tc != nil {
      90           0 :                 if tc.cache != opts.Cache {
      91           0 :                         panic("pebble: underlying cache for the table cache and db are different")
      92             :                 }
      93           0 :                 tc.Ref()
      94           1 :         } else {
       95           1 :                 // NewTableCache creates a ref to tc, which the container
       96           1 :                 // drops when it is closed.
      97           1 :                 tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
      98           1 :         }
      99             : 
     100           1 :         t := &tableCacheContainer{}
     101           1 :         t.tableCache = tc
     102           1 :         t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
     103           1 :         t.dbOpts.cacheID = cacheID
     104           1 :         t.dbOpts.objProvider = objProvider
     105           1 :         t.dbOpts.opts = opts.MakeReaderOptions()
     106           1 :         t.dbOpts.filterMetrics = &sstable.FilterMetricsTracker{}
     107           1 :         t.dbOpts.iterCount = new(atomic.Int32)
     108           1 :         return t
     109             : }
     110             : 
     111             : // Before calling close, make sure that there will be no further need
     112             : // to access any of the files associated with the store.
     113           1 : func (c *tableCacheContainer) close() error {
     114           1 :         // We want to do some cleanup work here. Check for leaked iterators
     115           1 :         // by the DB using this container. Note that we'll still perform cleanup
     116           1 :         // below in the case that there are leaked iterators.
     117           1 :         var err error
     118           1 :         if v := c.dbOpts.iterCount.Load(); v > 0 {
     119           0 :                 err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
     120           0 :         }
     121             : 
     122             :         // Release nodes here.
     123           1 :         for _, shard := range c.tableCache.shards {
     124           1 :                 if shard != nil {
     125           1 :                         shard.removeDB(&c.dbOpts)
     126           1 :                 }
     127             :         }
     128           1 :         return firstError(err, c.tableCache.Unref())
     129             : }
     130             : 
     131             : func (c *tableCacheContainer) newIters(
     132             :         ctx context.Context,
     133             :         file *manifest.FileMetadata,
     134             :         opts *IterOptions,
     135             :         internalOpts internalIterOpts,
     136           1 : ) (internalIterator, keyspan.FragmentIterator, error) {
     137           1 :         return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts)
     138           1 : }
     139             : 
     140             : func (c *tableCacheContainer) newRangeKeyIter(
     141             :         file *manifest.FileMetadata, opts keyspan.SpanIterOptions,
     142           1 : ) (keyspan.FragmentIterator, error) {
     143           1 :         return c.tableCache.getShard(file.FileBacking.DiskFileNum).newRangeKeyIter(file, opts, &c.dbOpts)
     144           1 : }
     145             : 
     146             : // getTableProperties returns the properties associated with the backing physical
     147             : // table if the input metadata belongs to a virtual sstable.
     148           0 : func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
     149           0 :         return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
     150           0 : }
     151             : 
     152           1 : func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
     153           1 :         c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
     154           1 : }
     155             : 
     156           1 : func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
     157           1 :         var m CacheMetrics
     158           1 :         for i := range c.tableCache.shards {
     159           1 :                 s := c.tableCache.shards[i]
     160           1 :                 s.mu.RLock()
     161           1 :                 m.Count += int64(len(s.mu.nodes))
     162           1 :                 s.mu.RUnlock()
     163           1 :                 m.Hits += s.hits.Load()
     164           1 :                 m.Misses += s.misses.Load()
     165           1 :         }
     166           1 :         m.Size = m.Count * int64(unsafe.Sizeof(sstable.Reader{}))
     167           1 :         f := c.dbOpts.filterMetrics.Load()
     168           1 :         return m, f
     169             : }
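
The Size reported above is an estimate: the node count multiplied by the
in-memory size of a single sstable.Reader struct via unsafe.Sizeof, which
excludes anything the reader points to. A self-contained sketch of the same
idiom with a hypothetical struct:

        package main

        import (
                "fmt"
                "unsafe"
        )

        type reader struct {
                buf   [64]byte
                count int64
        }

        func main() {
                const nodes = 528
                // Struct size only; heap data behind pointers is not counted.
                fmt.Println(nodes * int64(unsafe.Sizeof(reader{})))
        }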
     170             : 
     171             : func (c *tableCacheContainer) estimateSize(
     172             :         meta *fileMetadata, lower, upper []byte,
     173           1 : ) (size uint64, err error) {
     174           1 :         if meta.Virtual {
     175           0 :                 err = c.withVirtualReader(
     176           0 :                         meta.VirtualMeta(),
     177           0 :                         func(r sstable.VirtualReader) (err error) {
     178           0 :                                 size, err = r.EstimateDiskUsage(lower, upper)
     179           0 :                                 return err
     180           0 :                         },
     181             :                 )
     182           1 :         } else {
     183           1 :                 err = c.withReader(
     184           1 :                         meta.PhysicalMeta(),
     185           1 :                         func(r *sstable.Reader) (err error) {
     186           1 :                                 size, err = r.EstimateDiskUsage(lower, upper)
     187           1 :                                 return err
     188           1 :                         },
     189             :                 )
     190             :         }
     191           1 :         if err != nil {
     192           0 :                 return 0, err
     193           0 :         }
     194           1 :         return size, nil
     195             : }
     196             : 
     197           1 : func createCommonReader(v *tableCacheValue, file *fileMetadata) sstable.CommonReader {
     198           1 :         // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
     199           1 :         var cr sstable.CommonReader = v.reader
     200           1 :         if file.Virtual {
     201           1 :                 virtualReader := sstable.MakeVirtualReader(
     202           1 :                         v.reader, file.VirtualMeta(),
     203           1 :                 )
     204           1 :                 cr = &virtualReader
     205           1 :         }
     206           1 :         return cr
     207             : }
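
A self-contained illustration (hypothetical types) of why the virtual case
in createCommonReader allocates: taking the address of a stack-local wrapper
and storing it in an interface value forces the wrapper to escape to the
heap.

        package main

        import "fmt"

        type commonReader interface{ Name() string }

        type reader struct{}

        func (*reader) Name() string { return "physical" }

        type virtualWrapper struct{ r *reader }

        func (*virtualWrapper) Name() string { return "virtual" }

        func makeCommonReader(r *reader, virtual bool) commonReader {
                var cr commonReader = r
                if virtual {
                        w := virtualWrapper{r: r} // escapes: &w is stored in an interface
                        cr = &w
                }
                return cr
        }

        func main() {
                fmt.Println(makeCommonReader(&reader{}, true).Name())
        }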
     208             : 
     209             : func (c *tableCacheContainer) withCommonReader(
     210             :         meta *fileMetadata, fn func(sstable.CommonReader) error,
     211           1 : ) error {
     212           1 :         s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
     213           1 :         v := s.findNode(meta, &c.dbOpts)
     214           1 :         defer s.unrefValue(v)
     215           1 :         if v.err != nil {
     216           0 :                 return v.err
     217           0 :         }
     218           1 :         return fn(createCommonReader(v, meta))
     219             : }
     220             : 
     221           1 : func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
     222           1 :         s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
     223           1 :         v := s.findNode(meta.FileMetadata, &c.dbOpts)
     224           1 :         defer s.unrefValue(v)
     225           1 :         if v.err != nil {
     226           0 :                 return v.err
     227           0 :         }
     228           1 :         return fn(v.reader)
     229             : }
     230             : 
     231             : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
     232             : func (c *tableCacheContainer) withVirtualReader(
     233             :         meta virtualMeta, fn func(sstable.VirtualReader) error,
     234           1 : ) error {
     235           1 :         s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
     236           1 :         v := s.findNode(meta.FileMetadata, &c.dbOpts)
     237           1 :         defer s.unrefValue(v)
     238           1 :         if v.err != nil {
     239           0 :                 return v.err
     240           0 :         }
     241           1 :         return fn(sstable.MakeVirtualReader(v.reader, meta))
     242             : }
     243             : 
     244           1 : func (c *tableCacheContainer) iterCount() int64 {
     245           1 :         return int64(c.dbOpts.iterCount.Load())
     246           1 : }
     247             : 
     248             : // TableCache is a shareable cache for open sstables.
     249             : type TableCache struct {
     250             :         refs atomic.Int64
     251             : 
     252             :         cache  *Cache
     253             :         shards []*tableCacheShard
     254             : }
     255             : 
     256             : // Ref adds a reference to the table cache. Once tableCache.init returns,
     257             : // the table cache only remains valid if there is at least one reference
     258             : // to it.
     259           0 : func (c *TableCache) Ref() {
     260           0 :         v := c.refs.Add(1)
     261           0 :         // We don't want the reference count to ever go from 0 -> 1,
      262           0 :         // because a reference count of 0 implies that we've closed the cache.
     263           0 :         if v <= 1 {
     264           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     265             :         }
     266             : }
     267             : 
     268             : // Unref removes a reference to the table cache.
     269           1 : func (c *TableCache) Unref() error {
     270           1 :         v := c.refs.Add(-1)
     271           1 :         switch {
     272           0 :         case v < 0:
     273           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     274           1 :         case v == 0:
     275           1 :                 var err error
     276           1 :                 for i := range c.shards {
      277           1 :                         // The cache shard is not allocated yet; there is nothing to close.
     278           1 :                         if c.shards[i] == nil {
     279           0 :                                 continue
     280             :                         }
     281           1 :                         err = firstError(err, c.shards[i].Close())
     282             :                 }
     283             : 
      284             :                 // Unref the cache, to which we created a reference when the
      285             :                 // tableCache was first instantiated.
     286           1 :                 c.cache.Unref()
     287           1 :                 return err
     288             :         }
     289           0 :         return nil
     290             : }
     291             : 
      292             : // NewTableCache will create a reference to the table cache. It is the caller's
      293             : // responsibility to call tableCache.Unref once the reference is no longer needed.
     294           1 : func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
     295           1 :         if size == 0 {
     296           0 :                 panic("pebble: cannot create a table cache of size 0")
     297           1 :         } else if numShards == 0 {
     298           0 :                 panic("pebble: cannot create a table cache with 0 shards")
     299             :         }
     300             : 
     301           1 :         c := &TableCache{}
     302           1 :         c.cache = cache
     303           1 :         c.cache.Ref()
     304           1 : 
     305           1 :         c.shards = make([]*tableCacheShard, numShards)
     306           1 :         for i := range c.shards {
     307           1 :                 c.shards[i] = &tableCacheShard{}
     308           1 :                 c.shards[i].init(size / len(c.shards))
     309           1 :         }
     310             : 
     311             :         // Hold a ref to the cache here.
     312           1 :         c.refs.Store(1)
     313           1 : 
     314           1 :         return c
     315             : }
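
A hedged usage sketch: sharing one TableCache (and one block Cache) between
two Pebble databases. NewCache, NewTableCache, Options.Cache and
Options.TableCache are pebble's exported API; the directory names are
illustrative.

        package main

        import "github.com/cockroachdb/pebble"

        func main() {
                cache := pebble.NewCache(128 << 20) // 128 MiB block cache
                defer cache.Unref()

                // 8 shards with capacity for 1000 open sstables, shared by both DBs.
                tc := pebble.NewTableCache(cache, 8, 1000)
                defer tc.Unref() // drop the ref created by NewTableCache

                db1, err := pebble.Open("demo-db1", &pebble.Options{Cache: cache, TableCache: tc})
                if err != nil {
                        panic(err)
                }
                defer db1.Close()

                db2, err := pebble.Open("demo-db2", &pebble.Options{Cache: cache, TableCache: tc})
                if err != nil {
                        panic(err)
                }
                defer db2.Close()
        }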
     316             : 
     317           1 : func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
     318           1 :         return c.shards[uint64(fileNum.FileNum())%uint64(len(c.shards))]
     319           1 : }
     320             : 
     321             : type tableCacheKey struct {
     322             :         cacheID uint64
     323             :         fileNum base.DiskFileNum
     324             : }
     325             : 
     326             : type tableCacheShard struct {
     327             :         hits      atomic.Int64
     328             :         misses    atomic.Int64
     329             :         iterCount atomic.Int32
     330             : 
     331             :         size int
     332             : 
     333             :         mu struct {
     334             :                 sync.RWMutex
     335             :                 nodes map[tableCacheKey]*tableCacheNode
     336             :                 // The iters map is only created and populated in race builds.
     337             :                 iters map[io.Closer][]byte
     338             : 
     339             :                 handHot  *tableCacheNode
     340             :                 handCold *tableCacheNode
     341             :                 handTest *tableCacheNode
     342             : 
     343             :                 coldTarget int
     344             :                 sizeHot    int
     345             :                 sizeCold   int
     346             :                 sizeTest   int
     347             :         }
     348             :         releasing       sync.WaitGroup
     349             :         releasingCh     chan *tableCacheValue
     350             :         releaseLoopExit sync.WaitGroup
     351             : }
     352             : 
     353           1 : func (c *tableCacheShard) init(size int) {
     354           1 :         c.size = size
     355           1 : 
     356           1 :         c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
     357           1 :         c.mu.coldTarget = size
     358           1 :         c.releasingCh = make(chan *tableCacheValue, 100)
     359           1 :         c.releaseLoopExit.Add(1)
     360           1 :         go c.releaseLoop()
     361           1 : 
     362           1 :         if invariants.RaceEnabled {
     363           0 :                 c.mu.iters = make(map[io.Closer][]byte)
     364           0 :         }
     365             : }
     366             : 
     367           1 : func (c *tableCacheShard) releaseLoop() {
     368           1 :         pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
     369           1 :                 defer c.releaseLoopExit.Done()
     370           1 :                 for v := range c.releasingCh {
     371           0 :                         v.release(c)
     372           0 :                 }
     373             :         })
     374             : }
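
A minimal, generic sketch of the same asynchronous-release pattern
(hypothetical names): final unrefs hand the value to a buffered channel, and
a single goroutine performs the potentially slow cleanup off the caller's
critical path.

        package main

        import (
                "fmt"
                "sync"
        )

        type value struct{ name string }

        func (v *value) release() { fmt.Println("released", v.name) }

        type shard struct {
                releasingCh     chan *value
                releasing       sync.WaitGroup // tracks values queued for release
                releaseLoopExit sync.WaitGroup // tracks the loop goroutine
        }

        func (s *shard) init() {
                s.releasingCh = make(chan *value, 100)
                s.releaseLoopExit.Add(1)
                go func() {
                        defer s.releaseLoopExit.Done()
                        for v := range s.releasingCh {
                                v.release()
                                s.releasing.Done()
                        }
                }()
        }

        func (s *shard) scheduleRelease(v *value) {
                s.releasing.Add(1)
                s.releasingCh <- v
        }

        func (s *shard) close() {
                s.releasing.Wait()   // wait for queued releases to drain
                close(s.releasingCh) // then stop the loop
                s.releaseLoopExit.Wait()
        }

        func main() {
                s := &shard{}
                s.init()
                s.scheduleRelease(&value{name: "000007.sst"})
                s.close()
        }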
     375             : 
     376             : // checkAndIntersectFilters checks the specific table and block property filters
     377             : // for intersection with any available table and block-level properties. Returns
     378             : // true for ok if this table should be read by this iterator.
     379             : func (c *tableCacheShard) checkAndIntersectFilters(
     380             :         v *tableCacheValue,
     381             :         tableFilter func(userProps map[string]string) bool,
     382             :         blockPropertyFilters []BlockPropertyFilter,
     383             :         boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
     384           1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
     385           1 :         if tableFilter != nil &&
     386           1 :                 !tableFilter(v.reader.Properties.UserProperties) {
     387           0 :                 return false, nil, nil
     388           0 :         }
     389             : 
     390           1 :         if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
     391           1 :                 filterer, err = sstable.IntersectsTable(
     392           1 :                         blockPropertyFilters,
     393           1 :                         boundLimitedFilter,
     394           1 :                         v.reader.Properties.UserProperties,
     395           1 :                 )
     396           1 :                 // NB: IntersectsTable will return a nil filterer if the table-level
     397           1 :                 // properties indicate there's no intersection with the provided filters.
     398           1 :                 if filterer == nil || err != nil {
     399           1 :                         return false, nil, err
     400           1 :                 }
     401             :         }
     402           1 :         return true, filterer, nil
     403             : }
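
A short sketch of a caller-supplied table filter matching the tableFilter
signature above; the "my-app.schema-version" user property is purely
illustrative.

        package main

        import "fmt"

        func main() {
                // Read a table only if it records the expected schema version
                // in its user properties.
                tableFilter := func(userProps map[string]string) bool {
                        return userProps["my-app.schema-version"] == "2"
                }
                fmt.Println(tableFilter(map[string]string{"my-app.schema-version": "2"})) // true
        }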
     404             : 
     405             : func (c *tableCacheShard) newIters(
     406             :         ctx context.Context,
     407             :         file *manifest.FileMetadata,
     408             :         opts *IterOptions,
     409             :         internalOpts internalIterOpts,
     410             :         dbOpts *tableCacheOpts,
     411           1 : ) (internalIterator, keyspan.FragmentIterator, error) {
     412           1 :         // TODO(sumeer): constructing the Reader should also use a plumbed context,
     413           1 :         // since parts of the sstable are read during the construction. The Reader
     414           1 :         // should not remember that context since the Reader can be long-lived.
     415           1 : 
     416           1 :         // Calling findNode gives us the responsibility of decrementing v's
     417           1 :         // refCount. If opening the underlying table resulted in error, then we
     418           1 :         // decrement this straight away. Otherwise, we pass that responsibility to
     419           1 :         // the sstable iterator, which decrements when it is closed.
     420           1 :         v := c.findNode(file, dbOpts)
     421           1 :         if v.err != nil {
     422           0 :                 defer c.unrefValue(v)
     423           0 :                 return nil, nil, v.err
     424           0 :         }
     425             : 
     426           1 :         hideObsoletePoints := false
     427           1 :         var pointKeyFilters []BlockPropertyFilter
     428           1 :         if opts != nil {
     429           1 :                 // This code is appending (at most one filter) in-place to
     430           1 :                 // opts.PointKeyFilters even though the slice is shared for iterators in
     431           1 :                 // the same iterator tree. This is acceptable since all the following
     432           1 :                 // properties are true:
     433           1 :                 // - The iterator tree is single threaded, so the shared backing for the
     434           1 :                 //   slice is being mutated in a single threaded manner.
     435           1 :                 // - Each shallow copy of the slice has its own notion of length.
     436           1 :                 // - The appended element is always the obsoleteKeyBlockPropertyFilter
     437           1 :                 //   struct, which is stateless, so overwriting that struct when creating
     438           1 :                 //   one sstable iterator is harmless to other sstable iterators that are
     439           1 :                 //   relying on that struct.
     440           1 :                 //
     441           1 :                 // An alternative would be to have different slices for different sstable
     442           1 :                 // iterators, but that requires more work to avoid allocations.
     443           1 :                 hideObsoletePoints, pointKeyFilters =
     444           1 :                         v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
     445           1 :                                 opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
     446           1 :         }
     447           1 :         ok := true
     448           1 :         var filterer *sstable.BlockPropertiesFilterer
     449           1 :         var err error
     450           1 :         if opts != nil {
     451           1 :                 ok, filterer, err = c.checkAndIntersectFilters(v, opts.TableFilter,
     452           1 :                         pointKeyFilters, internalOpts.boundLimitedFilter)
     453           1 :         }
     454           1 :         if err != nil {
     455           0 :                 c.unrefValue(v)
     456           0 :                 return nil, nil, err
     457           0 :         }
     458             : 
     459             :         // Note: This suffers an allocation for virtual sstables.
     460           1 :         cr := createCommonReader(v, file)
     461           1 : 
     462           1 :         provider := dbOpts.objProvider
     463           1 :         // Check if this file is a foreign file.
     464           1 :         objMeta, err := provider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
      465           1 :         if err != nil {
                      :                 c.unrefValue(v)
      466           0 :                 return nil, nil, err
      467           0 :         }
     468             : 
     469             :         // NB: range-del iterator does not maintain a reference to the table, nor
     470             :         // does it need to read from it after creation.
     471           1 :         rangeDelIter, err := cr.NewRawRangeDelIter()
     472           1 :         if err != nil {
     473           0 :                 c.unrefValue(v)
     474           0 :                 return nil, nil, err
     475           0 :         }
     476             : 
     477           1 :         if !ok {
     478           1 :                 c.unrefValue(v)
     479           1 :                 // Return an empty iterator. This iterator has no mutable state, so
     480           1 :                 // using a singleton is fine.
     481           1 :                 // NB: We still return the potentially non-empty rangeDelIter. This
     482           1 :                 // ensures the iterator observes the file's range deletions even if the
     483           1 :                 // block property filters exclude all the file's point keys. The range
     484           1 :                 // deletions may still delete keys lower in the LSM in files that DO
     485           1 :                 // match the active filters.
     486           1 :                 //
     487           1 :                 // The point iterator returned must implement the filteredIter
     488           1 :                 // interface, so that the level iterator surfaces file boundaries when
     489           1 :                 // range deletions are present.
     490           1 :                 return filteredAll, rangeDelIter, err
     491           1 :         }
     492             : 
     493           1 :         var iter sstable.Iterator
     494           1 :         useFilter := true
     495           1 :         if opts != nil {
     496           1 :                 useFilter = manifest.LevelToInt(opts.level) != 6 || opts.UseL6Filters
     497           1 :                 ctx = objiotracing.WithLevel(ctx, manifest.LevelToInt(opts.level))
     498           1 :         }
     499           1 :         tableFormat, err := v.reader.TableFormat()
      500           1 :         if err != nil {
                      :                 if rangeDelIter != nil {
                      :                         _ = rangeDelIter.Close()
                      :                 }
                      :                 c.unrefValue(v)
      501           0 :                 return nil, nil, err
      502           0 :         }
     503           1 :         var rp sstable.ReaderProvider
     504           1 :         if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
     505           1 :                 rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
     506           1 :         }
     507             : 
     508           1 :         if provider.IsSharedForeign(objMeta) {
     509           0 :                 if tableFormat < sstable.TableFormatPebblev4 {
      510           0 :                         if rangeDelIter != nil {
                      :                                 _ = rangeDelIter.Close()
                      :                         }
                      :                         c.unrefValue(v)
                      :                         return nil, nil, errors.New("pebble: shared foreign sstable has a lower table format than expected")
     511           0 :                 }
     512           0 :                 hideObsoletePoints = true
     513             :         }
     514           1 :         if internalOpts.bytesIterated != nil {
     515           1 :                 iter, err = cr.NewCompactionIter(internalOpts.bytesIterated, rp, internalOpts.bufferPool)
     516           1 :         } else {
     517           1 :                 iter, err = cr.NewIterWithBlockPropertyFiltersAndContextEtc(
     518           1 :                         ctx, opts.GetLowerBound(), opts.GetUpperBound(), filterer, hideObsoletePoints, useFilter,
     519           1 :                         internalOpts.stats, rp)
     520           1 :         }
     521           1 :         if err != nil {
     522           0 :                 if rangeDelIter != nil {
     523           0 :                         _ = rangeDelIter.Close()
     524           0 :                 }
     525           0 :                 c.unrefValue(v)
     526           0 :                 return nil, nil, err
     527             :         }
     528             :         // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
     529             :         // care to avoid introducing an allocation here by adding a closure.
     530           1 :         iter.SetCloseHook(v.closeHook)
     531           1 : 
     532           1 :         c.iterCount.Add(1)
     533           1 :         dbOpts.iterCount.Add(1)
     534           1 :         if invariants.RaceEnabled {
     535           0 :                 c.mu.Lock()
     536           0 :                 c.mu.iters[iter] = debug.Stack()
     537           0 :                 c.mu.Unlock()
     538           0 :         }
     539           1 :         return iter, rangeDelIter, nil
     540             : }
     541             : 
     542             : func (c *tableCacheShard) newRangeKeyIter(
     543             :         file *manifest.FileMetadata, opts keyspan.SpanIterOptions, dbOpts *tableCacheOpts,
     544           1 : ) (keyspan.FragmentIterator, error) {
     545           1 :         // Calling findNode gives us the responsibility of decrementing v's
     546           1 :         // refCount. If opening the underlying table resulted in error, then we
     547           1 :         // decrement this straight away. Otherwise, we pass that responsibility to
     548           1 :         // the sstable iterator, which decrements when it is closed.
     549           1 :         v := c.findNode(file, dbOpts)
     550           1 :         if v.err != nil {
     551           0 :                 defer c.unrefValue(v)
     552           0 :                 return nil, v.err
     553           0 :         }
     554             : 
     555           1 :         ok := true
     556           1 :         var err error
     557           1 :         // Don't filter a table's range keys if the file contains RANGEKEYDELs.
     558           1 :         // The RANGEKEYDELs may delete range keys in other levels. Skipping the
     559           1 :         // file's range key blocks may surface deleted range keys below. This is
     560           1 :         // done here, rather than deferring to the block-property collector in order
     561           1 :         // to maintain parity with point keys and the treatment of RANGEDELs.
     562           1 :         if v.reader.Properties.NumRangeKeyDels == 0 {
     563           1 :                 ok, _, err = c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil)
     564           1 :         }
     565           1 :         if err != nil {
     566           0 :                 c.unrefValue(v)
     567           0 :                 return nil, err
     568           0 :         }
     569           1 :         if !ok {
     570           0 :                 c.unrefValue(v)
     571           0 :                 // Return the empty iterator. This iterator has no mutable state, so
     572           0 :                 // using a singleton is fine.
     573           0 :                 return emptyKeyspanIter, err
     574           0 :         }
     575             : 
     576           1 :         var iter keyspan.FragmentIterator
     577           1 :         if file.Virtual {
     578           1 :                 virtualReader := sstable.MakeVirtualReader(
     579           1 :                         v.reader, file.VirtualMeta(),
     580           1 :                 )
     581           1 :                 iter, err = virtualReader.NewRawRangeKeyIter()
     582           1 :         } else {
     583           1 :                 iter, err = v.reader.NewRawRangeKeyIter()
     584           1 :         }
     585             : 
     586             :         // iter is a block iter that holds the entire value of the block in memory.
     587             :         // No need to hold onto a ref of the cache value.
     588           1 :         c.unrefValue(v)
     589           1 : 
     590           1 :         if err != nil {
     591           0 :                 return nil, err
     592           0 :         }
     593             : 
     594           1 :         if iter == nil {
     595           0 :                 // NewRawRangeKeyIter can return nil even if there's no error. However,
     596           0 :                 // the keyspan.LevelIter expects a non-nil iterator if err is nil.
     597           0 :                 return emptyKeyspanIter, nil
     598           0 :         }
     599             : 
     600           1 :         return iter, nil
     601             : }
     602             : 
     603             : type tableCacheShardReaderProvider struct {
     604             :         c      *tableCacheShard
     605             :         file   *manifest.FileMetadata
     606             :         dbOpts *tableCacheOpts
     607             :         v      *tableCacheValue
     608             : }
     609             : 
     610             : var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
     611             : 
     612             : // GetReader implements sstable.ReaderProvider. Note that it is not the
     613             : // responsibility of tableCacheShardReaderProvider to ensure that the file
     614             : // continues to exist. The ReaderProvider is used in iterators where the
     615             : // top-level iterator is pinning the read state and preventing the files from
     616             : // being deleted.
     617             : //
     618             : // The caller must call tableCacheShardReaderProvider.Close.
     619             : //
     620             : // Note that currently the Reader returned here is only used to read value
     621             : // blocks. This reader shouldn't be used for other purposes like reading keys
     622             : // outside of virtual sstable bounds.
     623             : //
     624             : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
     625             : // that the reader isn't used for other purposes.
     626           1 : func (rp *tableCacheShardReaderProvider) GetReader() (*sstable.Reader, error) {
     627           1 :         // Calling findNode gives us the responsibility of decrementing v's
     628           1 :         // refCount.
     629           1 :         v := rp.c.findNode(rp.file, rp.dbOpts)
     630           1 :         if v.err != nil {
     631           0 :                 defer rp.c.unrefValue(v)
     632           0 :                 return nil, v.err
     633           0 :         }
     634           1 :         rp.v = v
     635           1 :         return v.reader, nil
     636             : }
     637             : 
     638             : // Close implements sstable.ReaderProvider.
     639           1 : func (rp *tableCacheShardReaderProvider) Close() {
     640           1 :         rp.c.unrefValue(rp.v)
     641           1 :         rp.v = nil
     642           1 : }
     643             : 
      644             : // getTableProperties returns the sstable properties for the target file.
     645             : func (c *tableCacheShard) getTableProperties(
     646             :         file *fileMetadata, dbOpts *tableCacheOpts,
     647           0 : ) (*sstable.Properties, error) {
     648           0 :         // Calling findNode gives us the responsibility of decrementing v's refCount here
     649           0 :         v := c.findNode(file, dbOpts)
     650           0 :         defer c.unrefValue(v)
     651           0 : 
     652           0 :         if v.err != nil {
     653           0 :                 return nil, v.err
     654           0 :         }
     655           0 :         return &v.reader.Properties, nil
     656             : }
     657             : 
     658             : // releaseNode releases a node from the tableCacheShard.
     659             : //
     660             : // c.mu must be held when calling this.
     661           0 : func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
     662           0 :         c.unlinkNode(n)
     663           0 :         c.clearNode(n)
     664           0 : }
     665             : 
     666             : // unlinkNode removes a node from the tableCacheShard, leaving the shard
     667             : // reference in place.
     668             : //
     669             : // c.mu must be held when calling this.
     670           1 : func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
     671           1 :         key := tableCacheKey{n.cacheID, n.fileNum}
     672           1 :         delete(c.mu.nodes, key)
     673           1 : 
     674           1 :         switch n.ptype {
     675           0 :         case tableCacheNodeHot:
     676           0 :                 c.mu.sizeHot--
     677           1 :         case tableCacheNodeCold:
     678           1 :                 c.mu.sizeCold--
     679           0 :         case tableCacheNodeTest:
     680           0 :                 c.mu.sizeTest--
     681             :         }
     682             : 
     683           1 :         if n == c.mu.handHot {
     684           1 :                 c.mu.handHot = c.mu.handHot.prev()
     685           1 :         }
     686           1 :         if n == c.mu.handCold {
     687           1 :                 c.mu.handCold = c.mu.handCold.prev()
     688           1 :         }
     689           1 :         if n == c.mu.handTest {
     690           1 :                 c.mu.handTest = c.mu.handTest.prev()
     691           1 :         }
     692             : 
     693           1 :         if n.unlink() == n {
     694           1 :                 // This was the last entry in the cache.
     695           1 :                 c.mu.handHot = nil
     696           1 :                 c.mu.handCold = nil
     697           1 :                 c.mu.handTest = nil
     698           1 :         }
     699             : 
     700           1 :         n.links.prev = nil
     701           1 :         n.links.next = nil
     702             : }
     703             : 
     704           0 : func (c *tableCacheShard) clearNode(n *tableCacheNode) {
     705           0 :         if v := n.value; v != nil {
     706           0 :                 n.value = nil
     707           0 :                 c.unrefValue(v)
     708           0 :         }
     709             : }
     710             : 
     711             : // unrefValue decrements the reference count for the specified value, releasing
     712             : // it if the reference count fell to 0. Note that the value has a reference if
     713             : // it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
     714             : // the node has already been removed from that map.
     715           1 : func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
     716           1 :         if v.refCount.Add(-1) == 0 {
     717           0 :                 c.releasing.Add(1)
     718           0 :                 c.releasingCh <- v
     719           0 :         }
     720             : }
     721             : 
     722             : // findNode returns the node for the table with the given file number, creating
     723             : // that node if it didn't already exist. The caller is responsible for
     724             : // decrementing the returned node's refCount.
     725             : func (c *tableCacheShard) findNode(
     726             :         meta *fileMetadata, dbOpts *tableCacheOpts,
     727           1 : ) (v *tableCacheValue) {
      728           1 :         // Loading a file before its global sequence number is known (e.g.,
     729           1 :         // during ingest before entering the commit pipeline) can pollute
     730           1 :         // the cache with incorrect state. In invariant builds, verify
     731           1 :         // that the global sequence number of the returned reader matches.
     732           1 :         if invariants.Enabled {
     733           1 :                 defer func() {
     734           1 :                         if v.reader != nil && meta.LargestSeqNum == meta.SmallestSeqNum &&
     735           1 :                                 v.reader.Properties.GlobalSeqNum != meta.SmallestSeqNum {
     736           0 :                                 panic(errors.AssertionFailedf("file %s loaded from table cache with the wrong global sequence number %d",
     737           0 :                                         meta, v.reader.Properties.GlobalSeqNum))
     738             :                         }
     739             :                 }()
     740             :         }
     741           1 :         if refs := meta.Refs(); refs <= 0 {
     742           0 :                 panic(errors.AssertionFailedf("attempting to load file %s with refs=%d from table cache",
     743           0 :                         meta, refs))
     744             :         }
     745             : 
     746             :         // Fast-path for a hit in the cache.
     747           1 :         c.mu.RLock()
     748           1 :         key := tableCacheKey{dbOpts.cacheID, meta.FileBacking.DiskFileNum}
     749           1 :         if n := c.mu.nodes[key]; n != nil && n.value != nil {
     750           1 :                 // Fast-path hit.
     751           1 :                 //
     752           1 :                 // The caller is responsible for decrementing the refCount.
     753           1 :                 v = n.value
     754           1 :                 v.refCount.Add(1)
     755           1 :                 c.mu.RUnlock()
     756           1 :                 n.referenced.Store(true)
     757           1 :                 c.hits.Add(1)
     758           1 :                 <-v.loaded
     759           1 :                 return v
     760           1 :         }
     761           1 :         c.mu.RUnlock()
     762           1 : 
     763           1 :         c.mu.Lock()
     764           1 : 
     765           1 :         n := c.mu.nodes[key]
     766           1 :         switch {
     767           1 :         case n == nil:
     768           1 :                 // Slow-path miss of a non-existent node.
     769           1 :                 n = &tableCacheNode{
     770           1 :                         fileNum: meta.FileBacking.DiskFileNum,
     771           1 :                         ptype:   tableCacheNodeCold,
     772           1 :                 }
     773           1 :                 c.addNode(n, dbOpts)
     774           1 :                 c.mu.sizeCold++
     775             : 
     776           1 :         case n.value != nil:
     777           1 :                 // Slow-path hit of a hot or cold node.
     778           1 :                 //
     779           1 :                 // The caller is responsible for decrementing the refCount.
     780           1 :                 v = n.value
     781           1 :                 v.refCount.Add(1)
     782           1 :                 n.referenced.Store(true)
     783           1 :                 c.hits.Add(1)
     784           1 :                 c.mu.Unlock()
     785           1 :                 <-v.loaded
     786           1 :                 return v
     787             : 
     788           0 :         default:
     789           0 :                 // Slow-path miss of a test node.
     790           0 :                 c.unlinkNode(n)
     791           0 :                 c.mu.coldTarget++
     792           0 :                 if c.mu.coldTarget > c.size {
     793           0 :                         c.mu.coldTarget = c.size
     794           0 :                 }
     795             : 
     796           0 :                 n.referenced.Store(false)
     797           0 :                 n.ptype = tableCacheNodeHot
     798           0 :                 c.addNode(n, dbOpts)
     799           0 :                 c.mu.sizeHot++
     800             :         }
     801             : 
     802           1 :         c.misses.Add(1)
     803           1 : 
     804           1 :         v = &tableCacheValue{
     805           1 :                 loaded: make(chan struct{}),
     806           1 :         }
     807           1 :         v.refCount.Store(2)
     808           1 :         // Cache the closure invoked when an iterator is closed. This avoids an
     809           1 :         // allocation on every call to newIters.
     810           1 :         v.closeHook = func(i sstable.Iterator) error {
     811           1 :                 if invariants.RaceEnabled {
     812           0 :                         c.mu.Lock()
     813           0 :                         delete(c.mu.iters, i)
     814           0 :                         c.mu.Unlock()
     815           0 :                 }
     816           1 :                 c.unrefValue(v)
     817           1 :                 c.iterCount.Add(-1)
     818           1 :                 dbOpts.iterCount.Add(-1)
     819           1 :                 return nil
     820             :         }
     821           1 :         n.value = v
     822           1 : 
     823           1 :         c.mu.Unlock()
     824           1 : 
     825           1 :         // Note adding to the cache lists must complete before we begin loading the
     826           1 :         // table as a failure during load will result in the node being unlinked.
     827           1 :         pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
     828           1 :                 v.load(
     829           1 :                         loadInfo{
     830           1 :                                 backingFileNum: meta.FileBacking.DiskFileNum,
     831           1 :                                 smallestSeqNum: meta.SmallestSeqNum,
     832           1 :                                 largestSeqNum:  meta.LargestSeqNum,
     833           1 :                         }, c, dbOpts)
     834           1 :         })
     835           1 :         return v
     836             : }
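
A self-contained sketch (hypothetical names) of findNode's lock discipline:
a shared-lock fast path for hits, then a re-check under the exclusive lock,
because another goroutine may have inserted the entry in the window between
the two. The real findNode additionally publishes a loaded channel so the
actual table load happens outside the lock.

        package main

        import (
                "fmt"
                "sync"
        )

        type cache struct {
                mu    sync.RWMutex
                nodes map[string]string
        }

        func (c *cache) find(key string, load func() string) string {
                // Fast path: shared lock for the common hit case.
                c.mu.RLock()
                v, ok := c.nodes[key]
                c.mu.RUnlock()
                if ok {
                        return v
                }

                // Slow path: take the exclusive lock and re-check.
                c.mu.Lock()
                if v, ok := c.nodes[key]; ok {
                        c.mu.Unlock()
                        return v
                }
                v = load()
                c.nodes[key] = v
                c.mu.Unlock()
                return v
        }

        func main() {
                c := &cache{nodes: map[string]string{}}
                fmt.Println(c.find("000007.sst", func() string { return "miss: load" }))
                fmt.Println(c.find("000007.sst", func() string { return "unreachable" }))
        }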
     837             : 
     838           1 : func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
     839           1 :         c.evictNodes()
     840           1 :         n.cacheID = dbOpts.cacheID
     841           1 :         key := tableCacheKey{n.cacheID, n.fileNum}
     842           1 :         c.mu.nodes[key] = n
     843           1 : 
     844           1 :         n.links.next = n
     845           1 :         n.links.prev = n
     846           1 :         if c.mu.handHot == nil {
     847           1 :                 // First element.
     848           1 :                 c.mu.handHot = n
     849           1 :                 c.mu.handCold = n
     850           1 :                 c.mu.handTest = n
     851           1 :         } else {
     852           1 :                 c.mu.handHot.link(n)
     853           1 :         }
     854             : 
     855           1 :         if c.mu.handCold == c.mu.handHot {
     856           1 :                 c.mu.handCold = c.mu.handCold.prev()
     857           1 :         }
     858             : }
     859             : 
     860           1 : func (c *tableCacheShard) evictNodes() {
     861           1 :         for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
     862           0 :                 c.runHandCold()
     863           0 :         }
     864             : }
     865             : 
     866           0 : func (c *tableCacheShard) runHandCold() {
     867           0 :         n := c.mu.handCold
     868           0 :         if n.ptype == tableCacheNodeCold {
     869           0 :                 if n.referenced.Load() {
     870           0 :                         n.referenced.Store(false)
     871           0 :                         n.ptype = tableCacheNodeHot
     872           0 :                         c.mu.sizeCold--
     873           0 :                         c.mu.sizeHot++
     874           0 :                 } else {
     875           0 :                         c.clearNode(n)
     876           0 :                         n.ptype = tableCacheNodeTest
     877           0 :                         c.mu.sizeCold--
     878           0 :                         c.mu.sizeTest++
     879           0 :                         for c.size < c.mu.sizeTest && c.mu.handTest != nil {
     880           0 :                                 c.runHandTest()
     881           0 :                         }
     882             :                 }
     883             :         }
     884             : 
     885           0 :         c.mu.handCold = c.mu.handCold.next()
     886           0 : 
     887           0 :         for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
     888           0 :                 c.runHandHot()
     889           0 :         }
     890             : }
     891             : 
     892           0 : func (c *tableCacheShard) runHandHot() {
     893           0 :         if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
     894           0 :                 c.runHandTest()
     895           0 :                 if c.mu.handHot == nil {
     896           0 :                         return
     897           0 :                 }
     898             :         }
     899             : 
     900           0 :         n := c.mu.handHot
     901           0 :         if n.ptype == tableCacheNodeHot {
     902           0 :                 if n.referenced.Load() {
     903           0 :                         n.referenced.Store(false)
     904           0 :                 } else {
     905           0 :                         n.ptype = tableCacheNodeCold
     906           0 :                         c.mu.sizeHot--
     907           0 :                         c.mu.sizeCold++
     908           0 :                 }
     909             :         }
     910             : 
     911           0 :         c.mu.handHot = c.mu.handHot.next()
     912             : }
     913             : 
     914           0 : func (c *tableCacheShard) runHandTest() {
     915           0 :         if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
     916           0 :                 c.runHandCold()
     917           0 :                 if c.mu.handTest == nil {
     918           0 :                         return
     919           0 :                 }
     920             :         }
     921             : 
     922           0 :         n := c.mu.handTest
     923           0 :         if n.ptype == tableCacheNodeTest {
     924           0 :                 c.mu.coldTarget--
     925           0 :                 if c.mu.coldTarget < 0 {
     926           0 :                         c.mu.coldTarget = 0
     927           0 :                 }
     928           0 :                 c.unlinkNode(n)
     929           0 :                 c.clearNode(n)
     930             :         }
     931             : 
     932           0 :         c.mu.handTest = c.mu.handTest.next()
     933             : }
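
The hot/cold/test node types and the three hands above follow a
CLOCK-Pro-style replacement policy: a referenced cold node gets a second
chance and is promoted to hot, an evicted cold node lingers as a
metadata-only test node, and a hit on a test node grows coldTarget. A
stripped-down, self-contained sketch of just the second-chance step that
runHandCold applies (hypothetical types; test nodes omitted):

        package main

        import "fmt"

        type node struct {
                name       string
                referenced bool
                hot        bool
        }

        // runHandCold applies the second-chance rule at the cold hand: a
        // referenced cold node is promoted to hot; an unreferenced one is
        // evicted (the real code also demotes it to a test node).
        func runHandCold(ring []*node, hand int) int {
                n := ring[hand]
                if !n.hot {
                        if n.referenced {
                                n.referenced = false
                                n.hot = true
                        } else {
                                fmt.Println("evict", n.name)
                        }
                }
                return (hand + 1) % len(ring) // advance the hand
        }

        func main() {
                ring := []*node{{name: "a", referenced: true}, {name: "b"}}
                hand := 0
                hand = runHandCold(ring, hand) // "a" promoted to hot
                _ = runHandCold(ring, hand)    // "b" evicted
        }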
     934             : 
     935           1 : func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
     936           1 :         c.mu.Lock()
     937           1 :         key := tableCacheKey{dbOpts.cacheID, fileNum}
     938           1 :         n := c.mu.nodes[key]
     939           1 :         var v *tableCacheValue
     940           1 :         if n != nil {
     941           1 :                 // NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
      942           1 :                 // the tableCacheValue.release() call synchronously below to ensure the
     943           1 :                 // sstable file descriptor is closed before returning. Note that
     944           1 :                 // tableCacheShard.releasing needs to be incremented while holding
      945           1 :                 // tableCacheShard.mu in order to avoid a race with Close().
     946           1 :                 c.unlinkNode(n)
     947           1 :                 v = n.value
     948           1 :                 if v != nil {
     949           1 :                         if !allowLeak {
     950           1 :                                 if t := v.refCount.Add(-1); t != 0 {
     951           0 :                                         dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
     952           0 :                                 }
     953             :                         }
     954           1 :                         c.releasing.Add(1)
     955             :                 }
     956             :         }
     957             : 
     958           1 :         c.mu.Unlock()
     959           1 : 
     960           1 :         if v != nil {
     961           1 :                 v.release(c)
     962           1 :         }
     963             : 
     964           1 :         dbOpts.opts.Cache.EvictFile(dbOpts.cacheID, fileNum)
     965             : }
     966             : 
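A subtlety in evict above: c.releasing.Add(1) runs while c.mu is still
held, and the potentially slow v.release(c) runs only after unlocking.
Because Close (below) acquires c.mu before waiting on c.releasing, it
cannot miss a release registered this way. A stripped-down sketch of the
idiom, with hypothetical names (not pebble's API):

        package main

        import (
                "fmt"
                "sync"
        )

        type shard struct {
                mu        sync.Mutex
                releasing sync.WaitGroup
        }

        // evictOne registers the pending release while mu is held, then
        // performs the slow close outside the lock.
        func (s *shard) evictOne(closeFn func()) {
                s.mu.Lock()
                s.releasing.Add(1)
                s.mu.Unlock()

                closeFn() // e.g. closing a file descriptor; may block
                s.releasing.Done()
        }

        // shutdown waits for every release registered before it acquired
        // mu; the Add above is guaranteed to be visible to it.
        func (s *shard) shutdown() {
                s.mu.Lock()
                s.mu.Unlock()
                s.releasing.Wait()
        }

        func main() {
                var s shard
                done := make(chan struct{})
                go func() {
                        s.evictOne(func() { fmt.Println("file closed") })
                        close(done)
                }()
                <-done
                s.shutdown()
                fmt.Println("shard closed")
        }
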
     967             : // removeDB evicts any nodes which have a reference to the DB
     968             : // associated with dbOpts.cacheID. The caller must ensure that
     969             : // there will be no more accesses to the files associated with the DB.
     970           1 : func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
     971           1 :         var fileNums []base.DiskFileNum
     972           1 : 
     973           1 :         c.mu.RLock()
     974           1 :         // Collect the fileNums which need to be cleaned.
     975           1 :         var firstNode *tableCacheNode
     976           1 :         node := c.mu.handHot
     977           1 :         for node != firstNode {
     978           1 :                 if firstNode == nil {
     979           1 :                         firstNode = node
     980           1 :                 }
     981             : 
     982           1 :                 if node.cacheID == dbOpts.cacheID {
     983           1 :                         fileNums = append(fileNums, node.fileNum)
     984           1 :                 }
     985           1 :                 node = node.next()
     986             :         }
     987           1 :         c.mu.RUnlock()
     988           1 : 
     989           1 :         // Evict all the nodes associated with the DB.
     990           1 :         // This should synchronously close all the files
     991           1 :         // associated with the DB.
     992           1 :         for _, fileNum := range fileNums {
     993           1 :                 c.evict(fileNum, dbOpts, true)
     994           1 :         }
     995             : }
     996             : 
     997           1 : func (c *tableCacheShard) Close() error {
     998           1 :         c.mu.Lock()
     999           1 :         defer c.mu.Unlock()
    1000           1 : 
    1001           1 :         // Check for leaked iterators. Note that we still perform the cleanup
    1002           1 :         // below even if there are leaked iterators.
    1003           1 :         var err error
    1004           1 :         if v := c.iterCount.Load(); v > 0 {
    1005           0 :                 if !invariants.RaceEnabled {
    1006           0 :                         err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
    1007           0 :                 } else {
    1008           0 :                         var buf bytes.Buffer
    1009           0 :                         for _, stack := range c.mu.iters {
    1010           0 :                                 fmt.Fprintf(&buf, "%s\n", stack)
    1011           0 :                         }
    1012           0 :                         err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
    1013             :                 }
    1014             :         }
    1015             : 
    1016           1 :         for c.mu.handHot != nil {
    1017           0 :                 n := c.mu.handHot
    1018           0 :                 if n.value != nil {
    1019           0 :                         if n.value.refCount.Add(-1) == 0 {
    1020           0 :                                 c.releasing.Add(1)
    1021           0 :                                 c.releasingCh <- n.value
    1022           0 :                         }
    1023             :                 }
    1024           0 :                 c.unlinkNode(n)
    1025             :         }
    1026           1 :         c.mu.nodes = nil
    1027           1 :         c.mu.handHot = nil
    1028           1 :         c.mu.handCold = nil
    1029           1 :         c.mu.handTest = nil
    1030           1 : 
    1031           1 :         // Only shut down the releasing goroutine if there were no leaked
    1032           1 :         // iterators. If there were leaked iterators, we leave the goroutine running
    1033           1 :         // and the releasingCh open so that a subsequent iterator close can
    1034           1 :         // complete. This behavior is used by iterator leak tests. Leaking the
    1035           1 :         // goroutine for these tests is less bad than not closing the iterator,
    1036           1 :         // which triggers other warnings about block cache handles not being released.
    1037           1 :         if err != nil {
    1038           0 :                 c.releasing.Wait()
    1039           0 :                 return err
    1040           0 :         }
    1041             : 
    1042           1 :         close(c.releasingCh)
    1043           1 :         c.releasing.Wait()
    1044           1 :         c.releaseLoopExit.Wait()
    1045           1 :         return err
    1046             : }
    1047             : 
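The tail of Close (close(c.releasingCh), then waiting for in-flight
releases, then for the release loop itself) is the standard Go pattern for
draining a worker goroutine. A self-contained sketch of that shutdown
sequence under hypothetical names (not pebble's API):

        package main

        import (
                "fmt"
                "sync"
        )

        func main() {
                work := make(chan int)
                var inFlight, loopExit sync.WaitGroup

                loopExit.Add(1)
                go func() {
                        defer loopExit.Done()
                        // The loop exits once work is closed and drained.
                        for w := range work {
                                fmt.Println("released", w)
                                inFlight.Done()
                        }
                }()

                for i := 0; i < 3; i++ {
                        inFlight.Add(1) // registered before the send
                        work <- i
                }

                close(work)     // signal shutdown; queued items still drain
                inFlight.Wait() // every queued release has completed
                loopExit.Wait() // the release loop has returned
        }
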
    1048             : type tableCacheValue struct {
    1049             :         closeHook func(i sstable.Iterator) error
    1050             :         reader    *sstable.Reader
    1051             :         err       error
    1052             :         loaded    chan struct{}
    1053             :         // Reference count for the value. The reader is closed when the reference
    1054             :         // count drops to zero.
    1055             :         refCount atomic.Int32
    1056             : }
    1057             : 
    1058             : type loadInfo struct {
    1059             :         backingFileNum base.DiskFileNum
    1060             :         largestSeqNum  uint64
    1061             :         smallestSeqNum uint64
    1062             : }
    1063             : 
    1064           1 : func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) {
    1065           1 :         // Try opening the file first.
    1066           1 :         var f objstorage.Readable
    1067           1 :         var err error
    1068           1 :         f, err = dbOpts.objProvider.OpenForReading(
    1069           1 :                 context.TODO(), fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
    1070           1 :         )
    1071           1 :         if err == nil {
    1072           1 :                 cacheOpts := private.SSTableCacheOpts(dbOpts.cacheID, loadInfo.backingFileNum).(sstable.ReaderOption)
    1073           1 :                 v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics)
    1074           1 :         }
    1075           1 :         if err != nil {
    1076           0 :                 v.err = errors.Wrapf(
    1077           0 :                         err, "pebble: backing file %s error", errors.Safe(loadInfo.backingFileNum.FileNum()))
    1078           0 :         }
    1079           1 :         if v.err == nil && loadInfo.smallestSeqNum == loadInfo.largestSeqNum {
    1080           1 :                 v.reader.Properties.GlobalSeqNum = loadInfo.largestSeqNum
    1081           1 :         }
    1082           1 :         if v.err != nil {
    1083           0 :                 c.mu.Lock()
    1084           0 :                 defer c.mu.Unlock()
    1085           0 :                 // Look up the node in the cache again, as it might have already
    1086           0 :                 // been removed.
    1087           0 :                 key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
    1088           0 :                 n := c.mu.nodes[key]
    1089           0 :                 if n != nil && n.value == v {
    1090           0 :                         c.releaseNode(n)
    1091           0 :                 }
    1092             :         }
    1093           1 :         close(v.loaded)
    1094             : }
    1095             : 
    1096           1 : func (v *tableCacheValue) release(c *tableCacheShard) {
    1097           1 :         <-v.loaded
    1098           1 :         // Nothing to be done about an error at this point. Close the reader if it is
    1099           1 :         // open.
    1100           1 :         if v.reader != nil {
    1101           1 :                 _ = v.reader.Close()
    1102           1 :         }
    1103           1 :         c.releasing.Done()
    1104             : }
    1105             : 
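load and release form a one-shot handshake around the loaded channel: load
always closes it, on both the success and the error path, and release
blocks on it before inspecting the fields load wrote. Since closing a
channel happens-before a receive from it completes, release is guaranteed
to observe reader and err. A minimal sketch of the idiom, with hypothetical
names:

        package main

        import (
                "errors"
                "fmt"
        )

        type value struct {
                reader string // stand-in for *sstable.Reader
                err    error
                loaded chan struct{}
        }

        // load publishes its result exactly once: the deferred close runs
        // whether or not opening succeeded.
        func (v *value) load(fail bool) {
                defer close(v.loaded)
                if fail {
                        v.err = errors.New("open failed")
                        return
                }
                v.reader = "open sstable reader"
        }

        // release waits for load to finish before deciding whether there
        // is anything to close.
        func (v *value) release() {
                <-v.loaded
                if v.err != nil {
                        fmt.Println("nothing to close:", v.err)
                        return
                }
                fmt.Println("closing", v.reader)
        }

        func main() {
                v := &value{loaded: make(chan struct{})}
                go v.load(false)
                v.release() // prints "closing open sstable reader"
        }
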
    1106             : type tableCacheNodeType int8
    1107             : 
    1108             : const (
    1109             :         tableCacheNodeTest tableCacheNodeType = iota
    1110             :         tableCacheNodeCold
    1111             :         tableCacheNodeHot
    1112             : )
    1113             : 
    1114           0 : func (p tableCacheNodeType) String() string {
    1115           0 :         switch p {
    1116           0 :         case tableCacheNodeTest:
    1117           0 :                 return "test"
    1118           0 :         case tableCacheNodeCold:
    1119           0 :                 return "cold"
    1120           0 :         case tableCacheNodeHot:
    1121           0 :                 return "hot"
    1122             :         }
    1123           0 :         return "unknown"
    1124             : }
    1125             : 
    1126             : type tableCacheNode struct {
    1127             :         fileNum base.DiskFileNum
    1128             :         value   *tableCacheValue
    1129             : 
    1130             :         links struct {
    1131             :                 next *tableCacheNode
    1132             :                 prev *tableCacheNode
    1133             :         }
    1134             :         ptype tableCacheNodeType
    1135             :         // referenced is atomically set to indicate that this entry has been accessed
    1136             :         // since the last time one of the clock hands swept it.
    1137             :         referenced atomic.Bool
    1138             : 
    1139             :         // Storing the cache id associated with the DB instance here
    1140             :         // avoids the need to thread the dbOpts struct through many functions.
    1141             :         cacheID uint64
    1142             : }
    1143             : 
    1144           1 : func (n *tableCacheNode) next() *tableCacheNode {
    1145           1 :         if n == nil {
    1146           0 :                 return nil
    1147           0 :         }
    1148           1 :         return n.links.next
    1149             : }
    1150             : 
    1151           1 : func (n *tableCacheNode) prev() *tableCacheNode {
    1152           1 :         if n == nil {
    1153           0 :                 return nil
    1154           0 :         }
    1155           1 :         return n.links.prev
    1156             : }
    1157             : 
    1158           1 : func (n *tableCacheNode) link(s *tableCacheNode) {
    1159           1 :         s.links.prev = n.links.prev
    1160           1 :         s.links.prev.links.next = s
    1161           1 :         s.links.next = n
    1162           1 :         s.links.next.links.prev = s
    1163           1 : }
    1164             : 
    1165           1 : func (n *tableCacheNode) unlink() *tableCacheNode {
    1166           1 :         next := n.links.next
    1167           1 :         n.links.prev.links.next = n.links.next
    1168           1 :         n.links.next.links.prev = n.links.prev
    1169           1 :         n.links.prev = n
    1170           1 :         n.links.next = n
    1171           1 :         return next
    1172           1 : }
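
link and unlink maintain the circular, sentinel-free list that the clock
hands sweep: a node entering the ring starts out linked to itself, link
splices s in immediately before n, and unlink returns the removed node to
the self-linked state. Iteration starts at any hand and stops on returning
to the first node seen, as in removeDB above. A small standalone sketch
with a simplified node type (not pebble's):

        package main

        import "fmt"

        type node struct {
                id         int
                next, prev *node
        }

        // link splices s into the ring immediately before n, mirroring
        // tableCacheNode.link.
        func (n *node) link(s *node) {
                s.prev = n.prev
                s.prev.next = s
                s.next = n
                s.next.prev = s
        }

        func main() {
                // A ring begins life as a single self-linked node.
                ring := &node{id: 0}
                ring.next, ring.prev = ring, ring

                ring.link(&node{id: 1})
                ring.link(&node{id: 2})

                // Walk until we return to the first node seen, the same
                // termination condition removeDB uses on the clock ring.
                var first *node
                for n := ring; n != first; n = n.next {
                        if first == nil {
                                first = n
                        }
                        fmt.Println(n.id) // 0, 1, 2
                }
        }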

Generated by: LCOV version 1.14