LCOV - code coverage report
Current view: top level - pebble - table_cache.go (source / functions) Hit Total Coverage
Test: 2024-04-12 08:15Z d253ff7b - tests + meta.lcov Lines: 721 797 90.5 %
Date: 2024-04-12 08:17:19 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "fmt"
      11             :         "io"
      12             :         "runtime/debug"
      13             :         "runtime/pprof"
      14             :         "sync"
      15             :         "sync/atomic"
      16             :         "unsafe"
      17             : 
      18             :         "github.com/cockroachdb/errors"
      19             :         "github.com/cockroachdb/pebble/internal/base"
      20             :         "github.com/cockroachdb/pebble/internal/invariants"
      21             :         "github.com/cockroachdb/pebble/internal/keyspan"
      22             :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      23             :         "github.com/cockroachdb/pebble/internal/manifest"
      24             :         "github.com/cockroachdb/pebble/internal/private"
      25             :         "github.com/cockroachdb/pebble/objstorage"
      26             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      27             :         "github.com/cockroachdb/pebble/sstable"
      28             : )
      29             : 
      30             : var emptyIter = &errorIter{err: nil}
      31             : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
      32             : 
      33             : // tableNewIters creates new iterators (point, range deletion and/or range key)
      34             : // for the given file metadata. Which of the various iterator kinds the user is
      35             : // requesting is specified with the iterKinds bitmap.
      36             : //
      37             : // On success, the requested subset of iters.{point,rangeDel,rangeKey} are
      38             : // populated with iterators.
      39             : //
      40             : // If a point iterator is requested and the operation was successful,
      41             : // iters.point is guaranteed to be non-nil and must be closed when the caller is
      42             : // finished.
      43             : //
      44             : // If a range deletion or range key iterator is requested, the corresponding
      45             : // iterator may be nil if the table does not contain any keys of the
      46             : // corresponding kind. The returned iterSet type provides RangeDeletion() and
      47             : // RangeKey() convenience methods that return non-nil empty iterators that may
      48             : // be used if the caller requires a non-nil iterator.
      49             : //
      50             : // On error, all iterators are nil.
      51             : //
      52             : // The only (non-test) implementation of tableNewIters is
      53             : // tableCacheContainer.newIters().
      54             : type tableNewIters func(
      55             :         ctx context.Context,
      56             :         file *manifest.FileMetadata,
      57             :         opts *IterOptions,
      58             :         internalOpts internalIterOpts,
      59             :         kinds iterKinds,
      60             : ) (iterSet, error)
      61             : 
      62             : // TODO implements the old tableNewIters interface that always attempted to open
      63             : // a point iterator and a range deletion iterator. All call sites should be
      64             : // updated to use `f` directly.
      65             : func (f tableNewIters) TODO(
      66             :         ctx context.Context,
      67             :         file *manifest.FileMetadata,
      68             :         opts *IterOptions,
      69             :         internalOpts internalIterOpts,
      70           2 : ) (internalIterator, keyspan.FragmentIterator, error) {
      71           2 :         iters, err := f(ctx, file, opts, internalOpts, iterPointKeys|iterRangeDeletions)
      72           2 :         if err != nil {
      73           1 :                 return nil, nil, err
      74           1 :         }
      75           2 :         return iters.point, iters.rangeDeletion, nil
      76             : }
      77             : 
      78             : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
      79             : // for the rangedel iterator returned by tableNewIters.
      80             : func tableNewRangeDelIter(
      81             :         ctx context.Context, newIters tableNewIters,
      82           2 : ) keyspanimpl.TableNewSpanIter {
      83           2 :         return func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
      84           2 :                 iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
      85           2 :                 if err != nil {
      86           0 :                         return nil, err
      87           0 :                 }
      88           2 :                 return iters.RangeDeletion(), nil
      89             :         }
      90             : }
      91             : 
      92             : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
      93             : // for the range key iterator returned by tableNewIters.
      94             : func tableNewRangeKeyIter(
      95             :         ctx context.Context, newIters tableNewIters,
      96           2 : ) keyspanimpl.TableNewSpanIter {
      97           2 :         return func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
      98           2 :                 iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
      99           2 :                 if err != nil {
     100           1 :                         return nil, err
     101           1 :                 }
     102           2 :                 return iters.RangeKey(), nil
     103             :         }
     104             : }
     105             : 
     106             : var tableCacheLabels = pprof.Labels("pebble", "table-cache")
     107             : 
     108             : // tableCacheOpts contains the DB-specific fields
     109             : // of a table cache. This is stored in the tableCacheContainer
     110             : // along with the table cache.
     111             : // NB: It is important to make sure that the fields in this
     112             : // struct are read-only. Since the fields here are shared
     113             : // by every single tableCacheShard, if non read-only fields
     114             : // are updated, we could have unnecessary evictions of those
     115             : // fields, and the surrounding fields from the CPU caches.
     116             : type tableCacheOpts struct {
     117             :         // iterCount keeps track of how many iterators are open. It is used to keep
     118             :         // track of leaked iterators on a per-db level.
     119             :         iterCount *atomic.Int32
     120             : 
     121             :         loggerAndTracer   LoggerAndTracer
     122             :         cacheID           uint64
     123             :         objProvider       objstorage.Provider
     124             :         opts              sstable.ReaderOptions
     125             :         filterMetrics     *sstable.FilterMetricsTracker
     126             :         sstStatsCollector *sstable.CategoryStatsCollector
     127             : }
     128             : 
     129             : // tableCacheContainer contains the table cache and
     130             : // fields which are unique to the DB.
     131             : type tableCacheContainer struct {
     132             :         tableCache *TableCache
     133             : 
     134             :         // dbOpts contains fields relevant to the table cache
     135             :         // which are unique to each DB.
     136             :         dbOpts tableCacheOpts
     137             : }
     138             : 
     139             : // newTableCacheContainer will panic if the underlying cache in the table cache
     140             : // doesn't match Options.Cache.
     141             : func newTableCacheContainer(
     142             :         tc *TableCache,
     143             :         cacheID uint64,
     144             :         objProvider objstorage.Provider,
     145             :         opts *Options,
     146             :         size int,
     147             :         sstStatsCollector *sstable.CategoryStatsCollector,
     148           2 : ) *tableCacheContainer {
     149           2 :         // We will release a ref to table cache acquired here when tableCacheContainer.close is called.
     150           2 :         if tc != nil {
     151           1 :                 if tc.cache != opts.Cache {
     152           0 :                         panic("pebble: underlying cache for the table cache and db are different")
     153             :                 }
     154           1 :                 tc.Ref()
     155           2 :         } else {
     156           2 :                 // NewTableCache should create a ref to tc which the container should
     157           2 :                 // drop whenever it is closed.
     158           2 :                 tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
     159           2 :         }
     160             : 
     161           2 :         t := &tableCacheContainer{}
     162           2 :         t.tableCache = tc
     163           2 :         t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
     164           2 :         t.dbOpts.cacheID = cacheID
     165           2 :         t.dbOpts.objProvider = objProvider
     166           2 :         t.dbOpts.opts = opts.MakeReaderOptions()
     167           2 :         t.dbOpts.filterMetrics = &sstable.FilterMetricsTracker{}
     168           2 :         t.dbOpts.iterCount = new(atomic.Int32)
     169           2 :         t.dbOpts.sstStatsCollector = sstStatsCollector
     170           2 :         return t
     171             : }
     172             : 
     173             : // Before calling close, make sure that there will be no further need
     174             : // to access any of the files associated with the store.
     175           2 : func (c *tableCacheContainer) close() error {
     176           2 :         // We want to do some cleanup work here. Check for leaked iterators
     177           2 :         // by the DB using this container. Note that we'll still perform cleanup
     178           2 :         // below in the case that there are leaked iterators.
     179           2 :         var err error
     180           2 :         if v := c.dbOpts.iterCount.Load(); v > 0 {
     181           1 :                 err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
     182           1 :         }
     183             : 
     184             :         // Release nodes here.
     185           2 :         for _, shard := range c.tableCache.shards {
     186           2 :                 if shard != nil {
     187           2 :                         shard.removeDB(&c.dbOpts)
     188           2 :                 }
     189             :         }
     190           2 :         return firstError(err, c.tableCache.Unref())
     191             : }
     192             : 
     193             : func (c *tableCacheContainer) newIters(
     194             :         ctx context.Context,
     195             :         file *manifest.FileMetadata,
     196             :         opts *IterOptions,
     197             :         internalOpts internalIterOpts,
     198             :         kinds iterKinds,
     199           2 : ) (iterSet, error) {
     200           2 :         return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts, kinds)
     201           2 : }
     202             : 
     203             : // getTableProperties returns the properties associated with the backing physical
     204             : // table if the input metadata belongs to a virtual sstable.
     205           1 : func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
     206           1 :         return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
     207           1 : }
     208             : 
     209           2 : func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
     210           2 :         c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
     211           2 : }
     212             : 
     213           2 : func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
     214           2 :         var m CacheMetrics
     215           2 :         for i := range c.tableCache.shards {
     216           2 :                 s := c.tableCache.shards[i]
     217           2 :                 s.mu.RLock()
     218           2 :                 m.Count += int64(len(s.mu.nodes))
     219           2 :                 s.mu.RUnlock()
     220           2 :                 m.Hits += s.hits.Load()
     221           2 :                 m.Misses += s.misses.Load()
     222           2 :         }
     223           2 :         m.Size = m.Count * int64(unsafe.Sizeof(sstable.Reader{}))
     224           2 :         f := c.dbOpts.filterMetrics.Load()
     225           2 :         return m, f
     226             : }
     227             : 
     228             : func (c *tableCacheContainer) estimateSize(
     229             :         meta *fileMetadata, lower, upper []byte,
     230           2 : ) (size uint64, err error) {
     231           2 :         if meta.Virtual {
     232           2 :                 err = c.withVirtualReader(
     233           2 :                         meta.VirtualMeta(),
     234           2 :                         func(r sstable.VirtualReader) (err error) {
     235           2 :                                 size, err = r.EstimateDiskUsage(lower, upper)
     236           2 :                                 return err
     237           2 :                         },
     238             :                 )
     239           2 :         } else {
     240           2 :                 err = c.withReader(
     241           2 :                         meta.PhysicalMeta(),
     242           2 :                         func(r *sstable.Reader) (err error) {
     243           2 :                                 size, err = r.EstimateDiskUsage(lower, upper)
     244           2 :                                 return err
     245           2 :                         },
     246             :                 )
     247             :         }
     248           2 :         if err != nil {
     249           0 :                 return 0, err
     250           0 :         }
     251           2 :         return size, nil
     252             : }
     253             : 
     254             : // createCommonReader creates a Reader for this file.
     255           2 : func createCommonReader(v *tableCacheValue, file *fileMetadata) sstable.CommonReader {
     256           2 :         // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
     257           2 :         var cr sstable.CommonReader = v.reader
     258           2 :         if file.Virtual {
     259           2 :                 virtualReader := sstable.MakeVirtualReader(
     260           2 :                         v.reader, file.VirtualMeta().VirtualReaderParams(v.isShared),
     261           2 :                 )
     262           2 :                 cr = &virtualReader
     263           2 :         }
     264           2 :         return cr
     265             : }
     266             : 
     267             : func (c *tableCacheContainer) withCommonReader(
     268             :         meta *fileMetadata, fn func(sstable.CommonReader) error,
     269           2 : ) error {
     270           2 :         s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
     271           2 :         v := s.findNode(meta.FileBacking, &c.dbOpts)
     272           2 :         defer s.unrefValue(v)
     273           2 :         if v.err != nil {
     274           0 :                 return v.err
     275           0 :         }
     276           2 :         return fn(createCommonReader(v, meta))
     277             : }
     278             : 
     279           2 : func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
     280           2 :         s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
     281           2 :         v := s.findNode(meta.FileBacking, &c.dbOpts)
     282           2 :         defer s.unrefValue(v)
     283           2 :         if v.err != nil {
     284           1 :                 return v.err
     285           1 :         }
     286           2 :         return fn(v.reader)
     287             : }
     288             : 
     289             : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
     290             : func (c *tableCacheContainer) withVirtualReader(
     291             :         meta virtualMeta, fn func(sstable.VirtualReader) error,
     292           2 : ) error {
     293           2 :         s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
     294           2 :         v := s.findNode(meta.FileBacking, &c.dbOpts)
     295           2 :         defer s.unrefValue(v)
     296           2 :         if v.err != nil {
     297           0 :                 return v.err
     298           0 :         }
     299           2 :         provider := c.dbOpts.objProvider
     300           2 :         objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
     301           2 :         if err != nil {
     302           0 :                 return err
     303           0 :         }
     304           2 :         return fn(sstable.MakeVirtualReader(v.reader, meta.VirtualReaderParams(objMeta.IsShared())))
     305             : }
     306             : 
     307           2 : func (c *tableCacheContainer) iterCount() int64 {
     308           2 :         return int64(c.dbOpts.iterCount.Load())
     309           2 : }
     310             : 
     311             : // TableCache is a shareable cache for open sstables.
     312             : type TableCache struct {
     313             :         refs atomic.Int64
     314             : 
     315             :         cache  *Cache
     316             :         shards []*tableCacheShard
     317             : }
     318             : 
     319             : // Ref adds a reference to the table cache. Once NewTableCache returns,
     320             : // the table cache only remains valid if there is at least one reference
     321             : // to it.
     322           1 : func (c *TableCache) Ref() {
     323           1 :         v := c.refs.Add(1)
     324           1 :         // We don't want the reference count to ever go from 0 -> 1,
     325           1 :         // because a reference count of 0 implies that we've closed the cache.
     326           1 :         if v <= 1 {
     327           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     328             :         }
     329             : }
     330             : 
     331             : // Unref removes a reference to the table cache.
     332           2 : func (c *TableCache) Unref() error {
     333           2 :         v := c.refs.Add(-1)
     334           2 :         switch {
     335           1 :         case v < 0:
     336           1 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     337           2 :         case v == 0:
     338           2 :                 var err error
     339           2 :                 for i := range c.shards {
     340           2 :                         // The cache shard is not allocated yet, nothing to close
     341           2 :                         if c.shards[i] == nil {
     342           0 :                                 continue
     343             :                         }
     344           2 :                         err = firstError(err, c.shards[i].Close())
     345             :                 }
     346             : 
     347             :                 // Unref the cache which we create a reference to when the tableCache
     348             :                 // is first instantiated.
     349           2 :                 c.cache.Unref()
     350           2 :                 return err
     351             :         }
     352           1 :         return nil
     353             : }
     354             : 
     355             : // NewTableCache will create a reference to the table cache. It is the caller's responsibility
     356             : // to call tableCache.Unref if they will no longer hold a reference to the table cache.
     357           2 : func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
     358           2 :         if size == 0 {
     359           0 :                 panic("pebble: cannot create a table cache of size 0")
     360           2 :         } else if numShards == 0 {
     361           0 :                 panic("pebble: cannot create a table cache with 0 shards")
     362             :         }
     363             : 
     364           2 :         c := &TableCache{}
     365           2 :         c.cache = cache
     366           2 :         c.cache.Ref()
     367           2 : 
     368           2 :         c.shards = make([]*tableCacheShard, numShards)
     369           2 :         for i := range c.shards {
     370           2 :                 c.shards[i] = &tableCacheShard{}
     371           2 :                 c.shards[i].init(size / len(c.shards))
     372           2 :         }
     373             : 
     374             :         // Start with a single ref to the table cache, owned by the caller.
     375           2 :         c.refs.Store(1)
     376           2 : 
     377           2 :         return c
     378             : }
     379             : 
     380           2 : func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
     381           2 :         return c.shards[uint64(fileNum)%uint64(len(c.shards))]
     382           2 : }
     383             : 
     384             : type tableCacheKey struct {
     385             :         cacheID uint64
     386             :         fileNum base.DiskFileNum
     387             : }
     388             : 
     389             : type tableCacheShard struct {
     390             :         hits      atomic.Int64
     391             :         misses    atomic.Int64
     392             :         iterCount atomic.Int32
     393             : 
     394             :         size int
     395             : 
     396             :         mu struct {
     397             :                 sync.RWMutex
     398             :                 nodes map[tableCacheKey]*tableCacheNode
     399             :                 // The iters map is only created and populated in race builds.
     400             :                 iters map[io.Closer][]byte
     401             : 
     402             :                 handHot  *tableCacheNode
     403             :                 handCold *tableCacheNode
     404             :                 handTest *tableCacheNode
     405             : 
     406             :                 coldTarget int
     407             :                 sizeHot    int
     408             :                 sizeCold   int
     409             :                 sizeTest   int
     410             :         }
     411             :         releasing       sync.WaitGroup
     412             :         releasingCh     chan *tableCacheValue
     413             :         releaseLoopExit sync.WaitGroup
     414             : }
     415             : 
     416           2 : func (c *tableCacheShard) init(size int) {
     417           2 :         c.size = size
     418           2 : 
     419           2 :         c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
     420           2 :         c.mu.coldTarget = size
     421           2 :         c.releasingCh = make(chan *tableCacheValue, 100)
     422           2 :         c.releaseLoopExit.Add(1)
     423           2 :         go c.releaseLoop()
     424           2 : 
     425           2 :         if invariants.RaceEnabled {
     426           0 :                 c.mu.iters = make(map[io.Closer][]byte)
     427           0 :         }
     428             : }
     429             : 
     430           2 : func (c *tableCacheShard) releaseLoop() {
     431           2 :         pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
     432           2 :                 defer c.releaseLoopExit.Done()
     433           2 :                 for v := range c.releasingCh {
     434           2 :                         v.release(c)
     435           2 :                 }
     436             :         })
     437             : }
     438             : 
     439             : // checkAndIntersectFilters checks the specific table and block property filters
     440             : // for intersection with any available table and block-level properties. Returns
     441             : // true for ok if this table should be read by this iterator.
     442             : func (c *tableCacheShard) checkAndIntersectFilters(
     443             :         v *tableCacheValue,
     444             :         tableFilter func(userProps map[string]string) bool,
     445             :         blockPropertyFilters []BlockPropertyFilter,
     446             :         boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
     447             :         syntheticSuffix sstable.SyntheticSuffix,
     448           2 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
     449           2 :         if tableFilter != nil &&
     450           2 :                 !tableFilter(v.reader.Properties.UserProperties) {
     451           0 :                 return false, nil, nil
     452           0 :         }
     453             : 
     454           2 :         if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
     455           2 :                 filterer, err = sstable.IntersectsTable(
     456           2 :                         blockPropertyFilters,
     457           2 :                         boundLimitedFilter,
     458           2 :                         v.reader.Properties.UserProperties,
     459           2 :                         syntheticSuffix,
     460           2 :                 )
     461           2 :                 // NB: IntersectsTable will return a nil filterer if the table-level
     462           2 :                 // properties indicate there's no intersection with the provided filters.
     463           2 :                 if filterer == nil || err != nil {
     464           2 :                         return false, nil, err
     465           2 :                 }
     466             :         }
     467           2 :         return true, filterer, nil
     468             : }
     469             : 
     470             : func (c *tableCacheShard) newIters(
     471             :         ctx context.Context,
     472             :         file *manifest.FileMetadata,
     473             :         opts *IterOptions,
     474             :         internalOpts internalIterOpts,
     475             :         dbOpts *tableCacheOpts,
     476             :         kinds iterKinds,
     477           2 : ) (iterSet, error) {
     478           2 :         // TODO(sumeer): constructing the Reader should also use a plumbed context,
     479           2 :         // since parts of the sstable are read during the construction. The Reader
     480           2 :         // should not remember that context since the Reader can be long-lived.
     481           2 : 
     482           2 :         // Calling findNode gives us the responsibility of decrementing v's
     483           2 :         // refCount. If opening the underlying table resulted in error, then we
     484           2 :         // decrement this straight away. Otherwise, we pass that responsibility to
     485           2 :         // the sstable iterator, which decrements when it is closed.
     486           2 :         v := c.findNode(file.FileBacking, dbOpts)
     487           2 :         if v.err != nil {
     488           1 :                 defer c.unrefValue(v)
     489           1 :                 return iterSet{}, v.err
     490           1 :         }
     491             : 
     492             :         // Note: This suffers an allocation for virtual sstables.
     493           2 :         cr := createCommonReader(v, file)
     494           2 :         var iters iterSet
     495           2 :         var err error
     496           2 :         if kinds.RangeKey() && file.HasRangeKeys {
     497           2 :                 iters.rangeKey, err = c.newRangeKeyIter(v, file, cr, opts.SpanIterOptions())
     498           2 :         }
     499           2 :         if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
     500           2 :                 iters.rangeDeletion, err = c.newRangeDelIter(ctx, file, cr, dbOpts)
     501           2 :         }
     502           2 :         if kinds.Point() && err == nil {
     503           2 :                 iters.point, err = c.newPointIter(ctx, v, file, cr, opts, internalOpts, dbOpts)
     504           2 :         }
     505           2 :         if err != nil {
     506           1 :                 // NB: There's a subtlety here: Because the point iterator is the last
     507           1 :                 // iterator we attempt to create, it's not possible for:
     508           1 :                 //   err != nil && iters.point != nil
     509           1 :                 // If it were possible, we'd need to account for it to avoid double
     510           1 :                 // unref-ing here, once during CloseAll and once during `unrefValue`.
     511           1 :                 iters.CloseAll()
     512           1 :                 c.unrefValue(v)
     513           1 :                 return iterSet{}, err
     514           1 :         }
     515             :         // Only point iterators ever require the reader stay pinned in the cache. If
     516             :         // we're not returning a point iterator to the caller, we need to unref v.
     517           2 :         if iters.point == nil {
     518           2 :                 c.unrefValue(v)
     519           2 :         }
     520           2 :         return iters, nil
     521             : }
     522             : 
     523             : // newPointIter is an internal helper that constructs a point iterator over a
     524             : // sstable. This function is for internal use only, and callers should use
     525             : // newIters instead.
     526             : func (c *tableCacheShard) newPointIter(
     527             :         ctx context.Context,
     528             :         v *tableCacheValue,
     529             :         file *manifest.FileMetadata,
     530             :         cr sstable.CommonReader,
     531             :         opts *IterOptions,
     532             :         internalOpts internalIterOpts,
     533             :         dbOpts *tableCacheOpts,
     534           2 : ) (internalIterator, error) {
     535           2 :         var (
     536           2 :                 hideObsoletePoints bool = false
     537           2 :                 pointKeyFilters    []BlockPropertyFilter
     538           2 :                 filterer           *sstable.BlockPropertiesFilterer
     539           2 :         )
     540           2 :         if opts != nil {
     541           2 :                 // This code is appending (at most one filter) in-place to
     542           2 :                 // opts.PointKeyFilters even though the slice is shared for iterators in
     543           2 :                 // the same iterator tree. This is acceptable since all the following
     544           2 :                 // properties are true:
     545           2 :                 // - The iterator tree is single threaded, so the shared backing for the
     546           2 :                 //   slice is being mutated in a single threaded manner.
     547           2 :                 // - Each shallow copy of the slice has its own notion of length.
     548           2 :                 // - The appended element is always the obsoleteKeyBlockPropertyFilter
     549           2 :                 //   struct, which is stateless, so overwriting that struct when creating
     550           2 :                 //   one sstable iterator is harmless to other sstable iterators that are
     551           2 :                 //   relying on that struct.
     552           2 :                 //
     553           2 :                 // An alternative would be to have different slices for different sstable
     554           2 :                 // iterators, but that requires more work to avoid allocations.
     555           2 :                 //
     556           2 :                 // TODO(bilal): for compaction reads of foreign sstables, we do hide
     557           2 :                 // obsolete points (see sstable.Reader.newCompactionIter) but we don't
     558           2 :                 // apply the obsolete block property filter. We could optimize this by
     559           2 :                 // applying the filter.
     560           2 :                 hideObsoletePoints, pointKeyFilters =
     561           2 :                         v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
     562           2 :                                 opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
     563           2 : 
     564           2 :                 var ok bool
     565           2 :                 var err error
     566           2 :                 ok, filterer, err = c.checkAndIntersectFilters(v, opts.TableFilter,
     567           2 :                         pointKeyFilters, internalOpts.boundLimitedFilter, file.SyntheticSuffix)
     568           2 :                 if err != nil {
     569           0 :                         return nil, err
     570           2 :                 } else if !ok {
     571           2 :                         // No point keys within the table match the filters.
     572           2 :                         return nil, nil
     573           2 :                 }
     574             :         }
     575             : 
     576           2 :         var iter sstable.Iterator
     577           2 :         useFilter := true
     578           2 :         if opts != nil {
     579           2 :                 useFilter = manifest.LevelToInt(opts.level) != 6 || opts.UseL6Filters
     580           2 :                 ctx = objiotracing.WithLevel(ctx, manifest.LevelToInt(opts.level))
     581           2 :         }
     582           2 :         tableFormat, err := v.reader.TableFormat()
     583           2 :         if err != nil {
     584           0 :                 return nil, err
     585           0 :         }
     586           2 :         var rp sstable.ReaderProvider
     587           2 :         if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
     588           2 :                 rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
     589           2 :         }
     590             : 
     591           2 :         if v.isShared && file.SyntheticSeqNum() != 0 {
     592           2 :                 if tableFormat < sstable.TableFormatPebblev4 {
     593           0 :                         return nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
     594           0 :                 }
     595             :                 // The table is shared and ingested.
     596           2 :                 hideObsoletePoints = true
     597             :         }
     598           2 :         transforms := file.IterTransforms()
     599           2 :         transforms.HideObsoletePoints = hideObsoletePoints
     600           2 :         var categoryAndQoS sstable.CategoryAndQoS
     601           2 :         if opts != nil {
     602           2 :                 categoryAndQoS = opts.CategoryAndQoS
     603           2 :         }
     604           2 :         if internalOpts.bytesIterated != nil {
     605           2 :                 iter, err = cr.NewCompactionIter(
     606           2 :                         transforms, internalOpts.bytesIterated, categoryAndQoS, dbOpts.sstStatsCollector, rp,
     607           2 :                         internalOpts.bufferPool)
     608           2 :         } else {
     609           2 :                 iter, err = cr.NewIterWithBlockPropertyFiltersAndContextEtc(
     610           2 :                         ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, useFilter,
     611           2 :                         internalOpts.stats, categoryAndQoS, dbOpts.sstStatsCollector, rp)
     612           2 :         }
     613           2 :         if err != nil {
     614           1 :                 return nil, err
     615           1 :         }
     616             :         // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
     617             :         // care to avoid introducing an allocation here by adding a closure.
     618           2 :         iter.SetCloseHook(v.closeHook)
     619           2 :         c.iterCount.Add(1)
     620           2 :         dbOpts.iterCount.Add(1)
     621           2 :         if invariants.RaceEnabled {
     622           0 :                 c.mu.Lock()
     623           0 :                 c.mu.iters[iter] = debug.Stack()
     624           0 :                 c.mu.Unlock()
     625           0 :         }
     626           2 :         return iter, nil
     627             : }
     628             : 
     629             : // newRangeDelIter is an internal helper that constructs an iterator over a
     630             : // sstable's range deletions. This function is for table-cache internal use
     631             : // only, and callers should use newIters instead.
     632             : func (c *tableCacheShard) newRangeDelIter(
     633             :         ctx context.Context, file *manifest.FileMetadata, cr sstable.CommonReader, dbOpts *tableCacheOpts,
     634           2 : ) (keyspan.FragmentIterator, error) {
     635           2 :         // NB: range-del iterator does not maintain a reference to the table, nor
     636           2 :         // does it need to read from it after creation.
     637           2 :         rangeDelIter, err := cr.NewRawRangeDelIter(file.IterTransforms())
     638           2 :         if err != nil {
     639           1 :                 return nil, err
     640           1 :         }
     641             :         // Assert expected bounds in tests.
     642           2 :         if invariants.Enabled && rangeDelIter != nil {
     643           2 :                 cmp := base.DefaultComparer.Compare
     644           2 :                 if dbOpts.opts.Comparer != nil {
     645           2 :                         cmp = dbOpts.opts.Comparer.Compare
     646           2 :                 }
     647           2 :                 rangeDelIter = keyspan.AssertBounds(
     648           2 :                         rangeDelIter, file.SmallestPointKey, file.LargestPointKey.UserKey, cmp,
     649           2 :                 )
     650             :         }
     651           2 :         return rangeDelIter, nil
     652             : }
     653             : 
     654             : // newRangeKeyIter is an internal helper that constructs an iterator over a
     655             : // sstable's range keys. This function is for table-cache internal use only, and
     656             : // callers should use newIters instead.
     657             : func (c *tableCacheShard) newRangeKeyIter(
     658             :         v *tableCacheValue, file *fileMetadata, cr sstable.CommonReader, opts keyspan.SpanIterOptions,
     659           2 : ) (keyspan.FragmentIterator, error) {
     660           2 :         transforms := file.IterTransforms()
     661           2 :         // Don't filter a table's range keys if the file contains RANGEKEYDELs.
     662           2 :         // The RANGEKEYDELs may delete range keys in other levels. Skipping the
     663           2 :         // file's range key blocks may surface deleted range keys below. This is
     664           2 :         // done here, rather than deferring to the block-property collector in order
     665           2 :         // to maintain parity with point keys and the treatment of RANGEDELs.
     666           2 :         if v.reader.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 {
     667           0 :                 ok, _, err := c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix)
     668           0 :                 if err != nil {
     669           0 :                         return nil, err
     670           0 :                 } else if !ok {
     671           0 :                         return nil, nil
     672           0 :                 }
     673             :         }
     674             :         // TODO(radu): wrap in an AssertBounds.
     675           2 :         return cr.NewRawRangeKeyIter(transforms)
     676             : }
     677             : 
// tableCacheShardReaderProvider is the sstable.ReaderProvider implementation
// backed by a table cache shard. Its fields are:
//   - c: the shard used by GetReader (via findNode) and by Close (via
//     unrefValue);
//   - file: the file metadata whose FileBacking is looked up in the shard;
//   - dbOpts: the per-DB table cache options passed through to findNode;
//   - v: the cache value obtained by the most recent successful GetReader; it
//     is unreferenced and reset to nil by Close.
type tableCacheShardReaderProvider struct {
	c      *tableCacheShard
	file   *manifest.FileMetadata
	dbOpts *tableCacheOpts
	v      *tableCacheValue
}

// Compile-time check that *tableCacheShardReaderProvider satisfies
// sstable.ReaderProvider.
var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
     686             : 
     687             : // GetReader implements sstable.ReaderProvider. Note that it is not the
     688             : // responsibility of tableCacheShardReaderProvider to ensure that the file
     689             : // continues to exist. The ReaderProvider is used in iterators where the
     690             : // top-level iterator is pinning the read state and preventing the files from
     691             : // being deleted.
     692             : //
     693             : // The caller must call tableCacheShardReaderProvider.Close.
     694             : //
     695             : // Note that currently the Reader returned here is only used to read value
     696             : // blocks. This reader shouldn't be used for other purposes like reading keys
     697             : // outside of virtual sstable bounds.
     698             : //
     699             : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
     700             : // that the reader isn't used for other purposes.
     701           2 : func (rp *tableCacheShardReaderProvider) GetReader() (*sstable.Reader, error) {
     702           2 :         // Calling findNode gives us the responsibility of decrementing v's
     703           2 :         // refCount.
     704           2 :         v := rp.c.findNode(rp.file.FileBacking, rp.dbOpts)
     705           2 :         if v.err != nil {
     706           0 :                 defer rp.c.unrefValue(v)
     707           0 :                 return nil, v.err
     708           0 :         }
     709           2 :         rp.v = v
     710           2 :         return v.reader, nil
     711             : }
     712             : 
     713             : // Close implements sstable.ReaderProvider.
     714           2 : func (rp *tableCacheShardReaderProvider) Close() {
     715           2 :         rp.c.unrefValue(rp.v)
     716           2 :         rp.v = nil
     717           2 : }
     718             : 
     719             : // getTableProperties return sst table properties for target file
     720             : func (c *tableCacheShard) getTableProperties(
     721             :         file *fileMetadata, dbOpts *tableCacheOpts,
     722           1 : ) (*sstable.Properties, error) {
     723           1 :         // Calling findNode gives us the responsibility of decrementing v's refCount here
     724           1 :         v := c.findNode(file.FileBacking, dbOpts)
     725           1 :         defer c.unrefValue(v)
     726           1 : 
     727           1 :         if v.err != nil {
     728           0 :                 return nil, v.err
     729           0 :         }
     730           1 :         return &v.reader.Properties, nil
     731             : }
     732             : 
// releaseNode releases a node from the tableCacheShard: the node is first
// unlinked from the shard (unlinkNode) and then its value, if any, is detached
// and unreferenced (clearNode).
//
// c.mu must be held when calling this.
func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
	c.unlinkNode(n)
	c.clearNode(n)
}
     740             : 
// unlinkNode removes a node from the tableCacheShard, leaving the shard
// reference in place. Concretely it deletes the node from c.mu.nodes, adjusts
// the size counter matching the node's type, moves any hand that points at the
// node, and detaches the node from the linked list. The node's value is not
// touched; see clearNode for that.
//
// c.mu must be held when calling this.
func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
	key := tableCacheKey{n.cacheID, n.fileNum}
	delete(c.mu.nodes, key)

	// Keep the per-type size accounting in sync with the removal.
	switch n.ptype {
	case tableCacheNodeHot:
		c.mu.sizeHot--
	case tableCacheNodeCold:
		c.mu.sizeCold--
	case tableCacheNodeTest:
		c.mu.sizeTest--
	}

	// Any hand currently pointing at n is moved back to n's predecessor so
	// that it does not keep pointing at the node being removed.
	if n == c.mu.handHot {
		c.mu.handHot = c.mu.handHot.prev()
	}
	if n == c.mu.handCold {
		c.mu.handCold = c.mu.handCold.prev()
	}
	if n == c.mu.handTest {
		c.mu.handTest = c.mu.handTest.prev()
	}

	if n.unlink() == n {
		// This was the last entry in the cache.
		c.mu.handHot = nil
		c.mu.handCold = nil
		c.mu.handTest = nil
	}

	// Clear the node's own links so it no longer refers to list neighbours.
	n.links.prev = nil
	n.links.next = nil
}
     778             : 
     779           2 : func (c *tableCacheShard) clearNode(n *tableCacheNode) {
     780           2 :         if v := n.value; v != nil {
     781           2 :                 n.value = nil
     782           2 :                 c.unrefValue(v)
     783           2 :         }
     784             : }
     785             : 
     786             : // unrefValue decrements the reference count for the specified value, releasing
     787             : // it if the reference count fell to 0. Note that the value has a reference if
     788             : // it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
     789             : // the node has already been removed from that map.
     790           2 : func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
     791           2 :         if v.refCount.Add(-1) == 0 {
     792           2 :                 c.releasing.Add(1)
     793           2 :                 c.releasingCh <- v
     794           2 :         }
     795             : }
     796             : 
     797             : // findNode returns the node for the table with the given file number, creating
     798             : // that node if it didn't already exist. The caller is responsible for
     799             : // decrementing the returned node's refCount.
     800           2 : func (c *tableCacheShard) findNode(b *fileBacking, dbOpts *tableCacheOpts) *tableCacheValue {
     801           2 :         // The backing must have a positive refcount (otherwise it could be deleted at any time).
     802           2 :         b.MustHaveRefs()
     803           2 :         // Caution! Here fileMetadata can be a physical or virtual table. Table cache
     804           2 :         // readers are associated with the physical backings. All virtual tables with
     805           2 :         // the same backing will use the same reader from the cache; so no information
     806           2 :         // that can differ among these virtual tables can be plumbed into loadInfo.
     807           2 :         info := loadInfo{
     808           2 :                 backingFileNum: b.DiskFileNum,
     809           2 :         }
     810           2 : 
     811           2 :         return c.findNodeInternal(info, dbOpts)
     812           2 : }
     813             : 
// findNodeInternal looks up the cache value for (dbOpts.cacheID,
// loadInfo.backingFileNum) and returns it with a reference taken on behalf of
// the caller, who must eventually decrement the refCount.
//
// There are three outcomes:
//   - Hit (under the read lock, or re-checked under the write lock): the
//     existing value is referenced, the node is marked as referenced, and the
//     call waits on v.loaded before returning.
//   - Miss with no node: a new cold node is added to the shard.
//   - Miss on a node that exists but has no value (a test node): the node is
//     re-added as a hot node and coldTarget is increased, capped at c.size.
//
// On either kind of miss a fresh value is created, attached to the node, and
// loaded via v.load after c.mu has been released.
func (c *tableCacheShard) findNodeInternal(
	loadInfo loadInfo, dbOpts *tableCacheOpts,
) *tableCacheValue {
	// Fast-path for a hit in the cache.
	c.mu.RLock()
	key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
	if n := c.mu.nodes[key]; n != nil && n.value != nil {
		// Fast-path hit.
		//
		// The caller is responsible for decrementing the refCount.
		v := n.value
		// The reference is taken while the read lock is still held.
		v.refCount.Add(1)
		c.mu.RUnlock()
		n.referenced.Store(true)
		c.hits.Add(1)
		// Wait on v.loaded before handing the value out (presumably until the
		// load started by the creating goroutine has completed).
		<-v.loaded
		return v
	}
	c.mu.RUnlock()

	c.mu.Lock()

	// Look the node up again: the state may have changed between dropping the
	// read lock and acquiring the write lock.
	n := c.mu.nodes[key]
	switch {
	case n == nil:
		// Slow-path miss of a non-existent node.
		n = &tableCacheNode{
			fileNum: loadInfo.backingFileNum,
			ptype:   tableCacheNodeCold,
		}
		c.addNode(n, dbOpts)
		c.mu.sizeCold++

	case n.value != nil:
		// Slow-path hit of a hot or cold node.
		//
		// The caller is responsible for decrementing the refCount.
		v := n.value
		v.refCount.Add(1)
		n.referenced.Store(true)
		c.hits.Add(1)
		c.mu.Unlock()
		<-v.loaded
		return v

	default:
		// Slow-path miss of a test node.
		c.unlinkNode(n)
		c.mu.coldTarget++
		if c.mu.coldTarget > c.size {
			c.mu.coldTarget = c.size
		}

		n.referenced.Store(false)
		n.ptype = tableCacheNodeHot
		c.addNode(n, dbOpts)
		c.mu.sizeHot++
	}

	c.misses.Add(1)

	v := &tableCacheValue{
		loaded: make(chan struct{}),
	}
	// Start with two references (presumably one held through the cache node
	// and one for the caller of this function).
	v.refCount.Store(2)
	// Cache the closure invoked when an iterator is closed. This avoids an
	// allocation on every call to newIters.
	v.closeHook = func(i sstable.Iterator) error {
		if invariants.RaceEnabled {
			c.mu.Lock()
			delete(c.mu.iters, i)
			c.mu.Unlock()
		}
		c.unrefValue(v)
		c.iterCount.Add(-1)
		dbOpts.iterCount.Add(-1)
		return nil
	}
	n.value = v

	c.mu.Unlock()

	// Note adding to the cache lists must complete before we begin loading the
	// table as a failure during load will result in the node being unlinked.
	pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
		v.load(loadInfo, c, dbOpts)
	})
	return v
}
     903             : 
     904           2 : func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
     905           2 :         c.evictNodes()
     906           2 :         n.cacheID = dbOpts.cacheID
     907           2 :         key := tableCacheKey{n.cacheID, n.fileNum}
     908           2 :         c.mu.nodes[key] = n
     909           2 : 
     910           2 :         n.links.next = n
     911           2 :         n.links.prev = n
     912           2 :         if c.mu.handHot == nil {
     913           2 :                 // First element.
     914           2 :                 c.mu.handHot = n
     915           2 :                 c.mu.handCold = n
     916           2 :                 c.mu.handTest = n
     917           2 :         } else {
     918           2 :                 c.mu.handHot.link(n)
     919           2 :         }
     920             : 
     921           2 :         if c.mu.handCold == c.mu.handHot {
     922           2 :                 c.mu.handCold = c.mu.handCold.prev()
     923           2 :         }
     924             : }
     925             : 
     926           2 : func (c *tableCacheShard) evictNodes() {
     927           2 :         for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
     928           2 :                 c.runHandCold()
     929           2 :         }
     930             : }
     931             : 
// runHandCold processes the node under the cold hand and then advances the
// hand by one node.
//
// If the node is cold and has been referenced, it is promoted to hot (and its
// referenced flag is reset). If it is cold and unreferenced, its value is
// cleared and it becomes a test node; test nodes are then trimmed with
// runHandTest while their count exceeds c.size. Nodes of any other type are
// skipped. Finally the hot hand is run while sizeHot is at least
// c.size - coldTarget.
func (c *tableCacheShard) runHandCold() {
	n := c.mu.handCold
	if n.ptype == tableCacheNodeCold {
		if n.referenced.Load() {
			// Referenced since it was last examined: promote cold -> hot.
			n.referenced.Store(false)
			n.ptype = tableCacheNodeHot
			c.mu.sizeCold--
			c.mu.sizeHot++
		} else {
			// Not referenced: drop the value but keep the node around as a
			// test node.
			c.clearNode(n)
			n.ptype = tableCacheNodeTest
			c.mu.sizeCold--
			c.mu.sizeTest++
			for c.size < c.mu.sizeTest && c.mu.handTest != nil {
				c.runHandTest()
			}
		}
	}

	c.mu.handCold = c.mu.handCold.next()

	// Demote hot nodes while the hot set is at or above its share of the
	// cache (c.size minus the cold target).
	for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
		c.runHandHot()
	}
}
     957             : 
// runHandHot processes the node under the hot hand and then advances the hand
// by one node.
//
// When the hot hand coincides with the test hand, the test hand is run first;
// if that leaves no hot hand, the function returns. A hot node that has been
// referenced merely has its referenced flag reset; an unreferenced hot node is
// demoted to cold. Nodes of any other type are skipped.
func (c *tableCacheShard) runHandHot() {
	if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
		c.runHandTest()
		// runHandTest can unlink nodes; if the list became empty the hands
		// are nil and there is nothing left to do.
		if c.mu.handHot == nil {
			return
		}
	}

	n := c.mu.handHot
	if n.ptype == tableCacheNodeHot {
		if n.referenced.Load() {
			n.referenced.Store(false)
		} else {
			// Unreferenced: demote hot -> cold.
			n.ptype = tableCacheNodeCold
			c.mu.sizeHot--
			c.mu.sizeCold++
		}
	}

	c.mu.handHot = c.mu.handHot.next()
}
     979             : 
// runHandTest processes the node under the test hand and then advances the
// hand by one node.
//
// When there are cold nodes and the test hand coincides with the cold hand,
// the cold hand is run first; if that leaves no test hand, the function
// returns. A test node under the hand is removed from the shard (unlinked and
// cleared) and coldTarget is decreased, with a floor of 0. Nodes of any other
// type are skipped.
func (c *tableCacheShard) runHandTest() {
	if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
		c.runHandCold()
		// runHandCold can end up unlinking nodes; bail out if the hands were
		// reset to nil.
		if c.mu.handTest == nil {
			return
		}
	}

	n := c.mu.handTest
	if n.ptype == tableCacheNodeTest {
		c.mu.coldTarget--
		if c.mu.coldTarget < 0 {
			c.mu.coldTarget = 0
		}
		c.unlinkNode(n)
		c.clearNode(n)
	}

	c.mu.handTest = c.mu.handTest.next()
}
    1000             : 
// evict removes the node for (dbOpts.cacheID, fileNum) from the shard, if one
// exists, releases its value synchronously, and finally evicts the file from
// dbOpts.opts.Cache.
//
// Unless allowLeak is true, the reference dropped here is expected to be the
// last one: if the refcount is still non-zero afterwards,
// dbOpts.loggerAndTracer.Fatalf is called. With allowLeak set, the refcount is
// neither decremented nor checked and the value is released regardless.
func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
	c.mu.Lock()
	key := tableCacheKey{dbOpts.cacheID, fileNum}
	n := c.mu.nodes[key]
	var v *tableCacheValue
	if n != nil {
		// NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
		// the value's release (v.release(c) below) synchronously to ensure the
		// sstable file descriptor is closed before returning. Note that
		// tableCacheShard.releasing needs to be incremented while holding
		// tableCacheShard.mu in order to avoid a race with Close()
		c.unlinkNode(n)
		v = n.value
		if v != nil {
			if !allowLeak {
				if t := v.refCount.Add(-1); t != 0 {
					dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
				}
			}
			c.releasing.Add(1)
		}
	}

	c.mu.Unlock()

	// Release outside of c.mu.
	if v != nil {
		v.release(c)
	}

	dbOpts.opts.Cache.EvictFile(dbOpts.cacheID, fileNum)
}
    1032             : 
    1033             : // removeDB evicts any nodes which have a reference to the DB
    1034             : // associated with dbOpts.cacheID. Make sure that there will
    1035             : // be no more accesses to the files associated with the DB.
    1036           2 : func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
    1037           2 :         var fileNums []base.DiskFileNum
    1038           2 : 
    1039           2 :         c.mu.RLock()
    1040           2 :         // Collect the fileNums which need to be cleaned.
    1041           2 :         var firstNode *tableCacheNode
    1042           2 :         node := c.mu.handHot
    1043           2 :         for node != firstNode {
    1044           2 :                 if firstNode == nil {
    1045           2 :                         firstNode = node
    1046           2 :                 }
    1047             : 
    1048           2 :                 if node.cacheID == dbOpts.cacheID {
    1049           2 :                         fileNums = append(fileNums, node.fileNum)
    1050           2 :                 }
    1051           2 :                 node = node.next()
    1052             :         }
    1053           2 :         c.mu.RUnlock()
    1054           2 : 
    1055           2 :         // Evict all the nodes associated with the DB.
    1056           2 :         // This should synchronously close all the files
    1057           2 :         // associated with the DB.
    1058           2 :         for _, fileNum := range fileNums {
    1059           2 :                 c.evict(fileNum, dbOpts, true)
    1060           2 :         }
    1061             : }
    1062             : 
    1063           2 : func (c *tableCacheShard) Close() error {
    1064           2 :         c.mu.Lock()
    1065           2 :         defer c.mu.Unlock()
    1066           2 : 
    1067           2 :         // Check for leaked iterators. Note that we'll still perform cleanup below in
    1068           2 :         // the case that there are leaked iterators.
    1069           2 :         var err error
    1070           2 :         if v := c.iterCount.Load(); v > 0 {
    1071           1 :                 if !invariants.RaceEnabled {
    1072           1 :                         err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
    1073           1 :                 } else {
    1074           0 :                         var buf bytes.Buffer
    1075           0 :                         for _, stack := range c.mu.iters {
    1076           0 :                                 fmt.Fprintf(&buf, "%s\n", stack)
    1077           0 :                         }
    1078           0 :                         err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
    1079             :                 }
    1080             :         }
    1081             : 
    1082           2 :         for c.mu.handHot != nil {
    1083           0 :                 n := c.mu.handHot
    1084           0 :                 if n.value != nil {
    1085           0 :                         if n.value.refCount.Add(-1) == 0 {
    1086           0 :                                 c.releasing.Add(1)
    1087           0 :                                 c.releasingCh <- n.value
    1088           0 :                         }
    1089             :                 }
    1090           0 :                 c.unlinkNode(n)
    1091             :         }
    1092           2 :         c.mu.nodes = nil
    1093           2 :         c.mu.handHot = nil
    1094           2 :         c.mu.handCold = nil
    1095           2 :         c.mu.handTest = nil
    1096           2 : 
    1097           2 :         // Only shut down the releasing goroutine if there were no leaked
    1098           2 :         // iterators. If there were leaked iterators, we leave the goroutine running
    1099           2 :         // and the releasingCh open so that a subsequent iterator close can
    1100           2 :         // complete. This behavior is used by iterator leak tests. Leaking the
    1101           2 :         // goroutine for these tests is less bad than not closing the iterator,
    1102           2 :         // which triggers other warnings about block cache handles not being released.
    1103           2 :         if err != nil {
    1104           1 :                 c.releasing.Wait()
    1105           1 :                 return err
    1106           1 :         }
    1107             : 
    1108           2 :         close(c.releasingCh)
    1109           2 :         c.releasing.Wait()
    1110           2 :         c.releaseLoopExit.Wait()
    1111           2 :         return err
    1112             : }
    1113             : 
    1114             : type tableCacheValue struct {
    1115             :         closeHook func(i sstable.Iterator) error
    1116             :         reader    *sstable.Reader
    1117             :         err       error
    1118             :         isShared  bool
    1119             :         loaded    chan struct{}
    1120             :         // Reference count for the value. The reader is closed when the reference
    1121             :         // count drops to zero.
    1122             :         refCount atomic.Int32
    1123             : }
    1124             : 
    1125             : // loadInfo contains the information needed to populate a new cache entry.
    1126             : type loadInfo struct {
    1127             :         backingFileNum base.DiskFileNum
    1128             : }
    1129             : 
    1130           2 : func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) {
    1131           2 :         // Try opening the file first.
    1132           2 :         var f objstorage.Readable
    1133           2 :         var err error
    1134           2 :         f, err = dbOpts.objProvider.OpenForReading(
    1135           2 :                 context.TODO(), fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
    1136           2 :         )
    1137           2 :         if err == nil {
    1138           2 :                 cacheOpts := private.SSTableCacheOpts(dbOpts.cacheID, loadInfo.backingFileNum).(sstable.ReaderOption)
    1139           2 :                 v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics)
    1140           2 :         }
    1141           2 :         if err == nil {
    1142           2 :                 var objMeta objstorage.ObjectMetadata
    1143           2 :                 objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, loadInfo.backingFileNum)
    1144           2 :                 v.isShared = objMeta.IsShared()
    1145           2 :         }
    1146           2 :         if err != nil {
    1147           1 :                 v.err = errors.Wrapf(
    1148           1 :                         err, "pebble: backing file %s error", loadInfo.backingFileNum)
    1149           1 :         }
    1150           2 :         if v.err != nil {
    1151           1 :                 c.mu.Lock()
    1152           1 :                 defer c.mu.Unlock()
    1153           1 :                 // Lookup the node in the cache again as it might have already been
    1154           1 :                 // removed.
    1155           1 :                 key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
    1156           1 :                 n := c.mu.nodes[key]
    1157           1 :                 if n != nil && n.value == v {
    1158           1 :                         c.releaseNode(n)
    1159           1 :                 }
    1160             :         }
    1161           2 :         close(v.loaded)
    1162             : }
    1163             : 
    1164           2 : func (v *tableCacheValue) release(c *tableCacheShard) {
    1165           2 :         <-v.loaded
    1166           2 :         // Nothing to be done about an error at this point. Close the reader if it is
    1167           2 :         // open.
    1168           2 :         if v.reader != nil {
    1169           2 :                 _ = v.reader.Close()
    1170           2 :         }
    1171           2 :         c.releasing.Done()
    1172             : }
    1173             : 
    1174             : type tableCacheNodeType int8
    1175             : 
    1176             : const (
    1177             :         tableCacheNodeTest tableCacheNodeType = iota
    1178             :         tableCacheNodeCold
    1179             :         tableCacheNodeHot
    1180             : )
    1181             : 
    1182           0 : func (p tableCacheNodeType) String() string {
    1183           0 :         switch p {
    1184           0 :         case tableCacheNodeTest:
    1185           0 :                 return "test"
    1186           0 :         case tableCacheNodeCold:
    1187           0 :                 return "cold"
    1188           0 :         case tableCacheNodeHot:
    1189           0 :                 return "hot"
    1190             :         }
    1191           0 :         return "unknown"
    1192             : }
    1193             : 
    1194             : type tableCacheNode struct {
    1195             :         fileNum base.DiskFileNum
    1196             :         value   *tableCacheValue
    1197             : 
    1198             :         links struct {
    1199             :                 next *tableCacheNode
    1200             :                 prev *tableCacheNode
    1201             :         }
    1202             :         ptype tableCacheNodeType
    1203             :         // referenced is atomically set to indicate that this entry has been accessed
    1204             :         // since the last time one of the clock hands swept it.
    1205             :         referenced atomic.Bool
    1206             : 
    1207             :         // Storing the cache id associated with the DB instance here
    1208             :         // avoids the need to thread the dbOpts struct through many functions.
    1209             :         cacheID uint64
    1210             : }
    1211             : 
    1212           2 : func (n *tableCacheNode) next() *tableCacheNode {
    1213           2 :         if n == nil {
    1214           0 :                 return nil
    1215           0 :         }
    1216           2 :         return n.links.next
    1217             : }
    1218             : 
    1219           2 : func (n *tableCacheNode) prev() *tableCacheNode {
    1220           2 :         if n == nil {
    1221           0 :                 return nil
    1222           0 :         }
    1223           2 :         return n.links.prev
    1224             : }
    1225             : 
    1226           2 : func (n *tableCacheNode) link(s *tableCacheNode) {
    1227           2 :         s.links.prev = n.links.prev
    1228           2 :         s.links.prev.links.next = s
    1229           2 :         s.links.next = n
    1230           2 :         s.links.next.links.prev = s
    1231           2 : }
    1232             : 
    1233           2 : func (n *tableCacheNode) unlink() *tableCacheNode {
    1234           2 :         next := n.links.next
    1235           2 :         n.links.prev.links.next = n.links.next
    1236           2 :         n.links.next.links.prev = n.links.prev
    1237           2 :         n.links.prev = n
    1238           2 :         n.links.next = n
    1239           2 :         return next
    1240           2 : }
    1241             : 
    1242             : // iterSet holds a set of iterators of various key kinds, all constructed over
    1243             : // the same data structure (eg, an sstable). A subset of the fields may be
    1244             : // populated depending on the `iterKinds` passed to newIters.
    1245             : type iterSet struct {
    1246             :         point         internalIterator
    1247             :         rangeDeletion keyspan.FragmentIterator
    1248             :         rangeKey      keyspan.FragmentIterator
    1249             : }
    1250             : 
    1251             : // TODO(jackson): Consider adding methods for fast paths that check whether an
    1252             : // iterator of a particular kind is nil, so that these call sites don't need to
    1253             : // reach into the struct's fields directly.
    1254             : 
    1255             : // Point returns the contained point iterator. If there is no point iterator,
    1256             : // Point returns a non-nil empty point iterator.
    1257           2 : func (s *iterSet) Point() internalIterator {
    1258           2 :         if s.point == nil {
    1259           2 :                 return emptyIter
    1260           2 :         }
    1261           2 :         return s.point
    1262             : }
    1263             : 
    1264             : // RangeDeletion returns the contained range deletion iterator. If there is no
    1265             : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
    1266             : // iterator.
    1267           2 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
    1268           2 :         if s.rangeDeletion == nil {
    1269           2 :                 return emptyKeyspanIter
    1270           2 :         }
    1271           2 :         return s.rangeDeletion
    1272             : }
    1273             : 
    1274             : // RangeKey returns the contained range key iterator. If there is no range key
    1275             : // iterator, RangeKey returns a non-nil empty keyspan iterator.
    1276           2 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
    1277           2 :         if s.rangeKey == nil {
    1278           2 :                 return emptyKeyspanIter
    1279           2 :         }
    1280           2 :         return s.rangeKey
    1281             : }
    1282             : 
    1283             : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
    1284             : // must not be called on the constituent iterators.
    1285           1 : func (s *iterSet) CloseAll() error {
    1286           1 :         var err error
    1287           1 :         if s.point != nil {
    1288           1 :                 err = s.point.Close()
    1289           1 :         }
    1290           1 :         if s.rangeDeletion != nil {
    1291           1 :                 err = firstError(err, s.rangeDeletion.Close())
    1292           1 :         }
    1293           1 :         if s.rangeKey != nil {
    1294           0 :                 err = firstError(err, s.rangeKey.Close())
    1295           0 :         }
    1296           1 :         return err
    1297             : }
    1298             : 
    1299             : // iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
    1300             : // bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
    1301             : // represent a set of desired iterator kinds.
    1302             : type iterKinds uint8
    1303             : 
    1304           2 : func (t iterKinds) Point() bool         { return (t & iterPointKeys) != 0 }
    1305           2 : func (t iterKinds) RangeDeletion() bool { return (t & iterRangeDeletions) != 0 }
    1306           2 : func (t iterKinds) RangeKey() bool      { return (t & iterRangeKeys) != 0 }
    1307             : 
    1308             : const (
    1309             :         iterPointKeys iterKinds = 1 << iota
    1310             :         iterRangeDeletions
    1311             :         iterRangeKeys
    1312             : )

Generated by: LCOV version 1.14