LCOV - code coverage report
Current view: top level - pebble - table_cache.go (source / functions) Hit Total Coverage
Test: 2024-11-16 08:16Z 9ed54bc4 - tests only.lcov Lines: 727 803 90.5 %
Date: 2024-11-16 08:17:17 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "fmt"
      11             :         "io"
      12             :         "runtime/debug"
      13             :         "runtime/pprof"
      14             :         "sync"
      15             :         "sync/atomic"
      16             :         "unsafe"
      17             : 
      18             :         "github.com/cockroachdb/errors"
      19             :         "github.com/cockroachdb/pebble/internal/base"
      20             :         "github.com/cockroachdb/pebble/internal/cache"
      21             :         "github.com/cockroachdb/pebble/internal/invariants"
      22             :         "github.com/cockroachdb/pebble/internal/keyspan"
      23             :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      24             :         "github.com/cockroachdb/pebble/internal/manifest"
      25             :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      26             :         "github.com/cockroachdb/pebble/objstorage"
      27             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      28             :         "github.com/cockroachdb/pebble/sstable"
      29             : )
      30             : 
      31             : var emptyIter = &errorIter{err: nil}
      32             : var emptyKeyspanIter = &errorKeyspanIter{err: nil}
      33             : 
      34             : // tableNewIters creates new iterators (point, range deletion and/or range key)
      35             : // for the given file metadata. Which of the various iterator kinds the user is
      36             : // requesting is specified with the iterKinds bitmap.
      37             : //
      38             : // On success, the requested subset of iters.{point,rangeDel,rangeKey} are
      39             : // populated with iterators.
      40             : //
      41             : // If a point iterator is requested and the operation was successful,
      42             : // iters.point is guaranteed to be non-nil and must be closed when the caller is
      43             : // finished.
      44             : //
      45             : // If a range deletion or range key iterator is requested, the corresponding
      46             : // iterator may be nil if the table does not contain any keys of the
      47             : // corresponding kind. The returned iterSet type provides RangeDeletion() and
      48             : // RangeKey() convenience methods that return non-nil empty iterators that may
      49             : // be used if the caller requires a non-nil iterator.
      50             : //
      51             : // On error, all iterators are nil.
      52             : //
      53             : // The only (non-test) implementation of tableNewIters is
      54             : // tableCacheContainer.newIters().
      55             : type tableNewIters func(
      56             :         ctx context.Context,
      57             :         file *manifest.FileMetadata,
      58             :         opts *IterOptions,
      59             :         internalOpts internalIterOpts,
      60             :         kinds iterKinds,
      61             : ) (iterSet, error)
      62             : 
      63             : // tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
      64             : // for the rangedel iterator returned by tableNewIters.
      65           1 : func tableNewRangeDelIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
      66           1 :         return func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
      67           1 :                 iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
      68           1 :                 if err != nil {
      69           0 :                         return nil, err
      70           0 :                 }
      71           1 :                 return iters.RangeDeletion(), nil
      72             :         }
      73             : }
      74             : 
      75             : // tableNewRangeKeyIter takes a tableNewIters and returns a TableNewSpanIter
      76             : // for the range key iterator returned by tableNewIters.
      77           1 : func tableNewRangeKeyIter(newIters tableNewIters) keyspanimpl.TableNewSpanIter {
      78           1 :         return func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
      79           1 :                 iters, err := newIters(ctx, file, nil, internalIterOpts{}, iterRangeKeys)
      80           1 :                 if err != nil {
      81           1 :                         return nil, err
      82           1 :                 }
      83           1 :                 return iters.RangeKey(), nil
      84             :         }
      85             : }
      86             : 
      87             : var tableCacheLabels = pprof.Labels("pebble", "table-cache")
      88             : 
      89             : // tableCacheOpts contains the db specific fields
      90             : // of a table cache. This is stored in the tableCacheContainer
      91             : // along with the table cache.
      92             : // NB: It is important to make sure that the fields in this
      93             : // struct are read-only. Since the fields here are shared
      94             : // by every single tableCacheShard, if non read-only fields
      95             : // are updated, we could have unnecessary evictions of those
      96             : // fields, and the surrounding fields from the CPU caches.
      97             : type tableCacheOpts struct {
      98             :         // iterCount keeps track of how many iterators are open. It is used to keep
      99             :         // track of leaked iterators on a per-db level.
     100             :         iterCount *atomic.Int32
     101             : 
     102             :         loggerAndTracer   LoggerAndTracer
     103             :         cache             *cache.Cache
     104             :         cacheID           cache.ID
     105             :         objProvider       objstorage.Provider
     106             :         readerOpts        sstable.ReaderOptions
     107             :         sstStatsCollector *sstable.CategoryStatsCollector
     108             : }
     109             : 
     110             : // tableCacheContainer contains the table cache and
     111             : // fields which are unique to the DB.
     112             : type tableCacheContainer struct {
     113             :         tableCache *TableCache
     114             : 
     115             :         // dbOpts contains fields relevant to the table cache
     116             :         // which are unique to each DB.
     117             :         dbOpts tableCacheOpts
     118             : }
     119             : 
     120             : // newTableCacheContainer will panic if the underlying cache in the table cache
     121             : // doesn't match Options.Cache.
     122             : func newTableCacheContainer(
     123             :         tc *TableCache,
     124             :         cacheID cache.ID,
     125             :         objProvider objstorage.Provider,
     126             :         opts *Options,
     127             :         size int,
     128             :         sstStatsCollector *sstable.CategoryStatsCollector,
     129           1 : ) *tableCacheContainer {
     130           1 :         // We will release a ref to table cache acquired here when tableCacheContainer.close is called.
     131           1 :         if tc != nil {
     132           1 :                 if tc.cache != opts.Cache {
     133           0 :                         panic("pebble: underlying cache for the table cache and db are different")
     134             :                 }
     135           1 :                 tc.Ref()
     136           1 :         } else {
     137           1 :                 // NewTableCache should create a ref to tc which the container should
     138           1 :                 // drop whenever it is closed.
     139           1 :                 tc = NewTableCache(opts.Cache, opts.Experimental.TableCacheShards, size)
     140           1 :         }
     141             : 
     142           1 :         t := &tableCacheContainer{}
     143           1 :         t.tableCache = tc
     144           1 :         t.dbOpts.loggerAndTracer = opts.LoggerAndTracer
     145           1 :         t.dbOpts.cache = opts.Cache
     146           1 :         t.dbOpts.cacheID = cacheID
     147           1 :         t.dbOpts.objProvider = objProvider
     148           1 :         t.dbOpts.readerOpts = opts.MakeReaderOptions()
     149           1 :         t.dbOpts.readerOpts.FilterMetricsTracker = &sstable.FilterMetricsTracker{}
     150           1 :         t.dbOpts.iterCount = new(atomic.Int32)
     151           1 :         t.dbOpts.sstStatsCollector = sstStatsCollector
     152           1 :         return t
     153             : }
     154             : 
     155             : // Before calling close, make sure that there will be no further need
     156             : // to access any of the files associated with the store.
     157           1 : func (c *tableCacheContainer) close() error {
     158           1 :         // We want to do some cleanup work here. Check for leaked iterators
     159           1 :         // by the DB using this container. Note that we'll still perform cleanup
     160           1 :         // below in the case that there are leaked iterators.
     161           1 :         var err error
     162           1 :         if v := c.dbOpts.iterCount.Load(); v > 0 {
     163           1 :                 err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
     164           1 :         }
     165             : 
     166             :         // Release nodes here.
     167           1 :         for _, shard := range c.tableCache.shards {
     168           1 :                 if shard != nil {
     169           1 :                         shard.removeDB(&c.dbOpts)
     170           1 :                 }
     171             :         }
     172           1 :         return firstError(err, c.tableCache.Unref())
     173             : }
     174             : 
     175             : func (c *tableCacheContainer) newIters(
     176             :         ctx context.Context,
     177             :         file *manifest.FileMetadata,
     178             :         opts *IterOptions,
     179             :         internalOpts internalIterOpts,
     180             :         kinds iterKinds,
     181           1 : ) (iterSet, error) {
     182           1 :         return c.tableCache.getShard(file.FileBacking.DiskFileNum).newIters(ctx, file, opts, internalOpts, &c.dbOpts, kinds)
     183           1 : }
     184             : 
     185             : // getTableProperties returns the properties associated with the backing physical
     186             : // table if the input metadata belongs to a virtual sstable.
     187           1 : func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) {
     188           1 :         return c.tableCache.getShard(file.FileBacking.DiskFileNum).getTableProperties(file, &c.dbOpts)
     189           1 : }
     190             : 
     191           1 : func (c *tableCacheContainer) evict(fileNum base.DiskFileNum) {
     192           1 :         c.tableCache.getShard(fileNum).evict(fileNum, &c.dbOpts, false)
     193           1 : }
     194             : 
     195           1 : func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) {
     196           1 :         var m CacheMetrics
     197           1 :         for i := range c.tableCache.shards {
     198           1 :                 s := c.tableCache.shards[i]
     199           1 :                 s.mu.RLock()
     200           1 :                 m.Count += int64(len(s.mu.nodes))
     201           1 :                 s.mu.RUnlock()
     202           1 :                 m.Hits += s.hits.Load()
     203           1 :                 m.Misses += s.misses.Load()
     204           1 :         }
     205           1 :         m.Size = m.Count * int64(unsafe.Sizeof(tableCacheNode{})+unsafe.Sizeof(tableCacheValue{})+unsafe.Sizeof(sstable.Reader{}))
     206           1 :         f := c.dbOpts.readerOpts.FilterMetricsTracker.Load()
     207           1 :         return m, f
     208             : }
     209             : 
     210             : func (c *tableCacheContainer) estimateSize(
     211             :         meta *fileMetadata, lower, upper []byte,
     212           1 : ) (size uint64, err error) {
     213           1 :         c.withCommonReader(meta, func(cr sstable.CommonReader) error {
     214           1 :                 size, err = cr.EstimateDiskUsage(lower, upper)
     215           1 :                 return err
     216           1 :         })
     217           1 :         return size, err
     218             : }
     219             : 
     220             : // createCommonReader creates a Reader for this file.
     221           1 : func createCommonReader(v *tableCacheValue, file *fileMetadata) sstable.CommonReader {
     222           1 :         // TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
     223           1 :         var cr sstable.CommonReader = v.reader
     224           1 :         if file.Virtual {
     225           1 :                 virtualReader := sstable.MakeVirtualReader(
     226           1 :                         v.reader, file.VirtualMeta().VirtualReaderParams(v.isShared),
     227           1 :                 )
     228           1 :                 cr = &virtualReader
     229           1 :         }
     230           1 :         return cr
     231             : }
     232             : 
     233             : func (c *tableCacheContainer) withCommonReader(
     234             :         meta *fileMetadata, fn func(sstable.CommonReader) error,
     235           1 : ) error {
     236           1 :         s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
     237           1 :         v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
     238           1 :         defer s.unrefValue(v)
     239           1 :         if v.err != nil {
     240           0 :                 return v.err
     241           0 :         }
     242           1 :         return fn(createCommonReader(v, meta))
     243             : }
     244             : 
     245           1 : func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
     246           1 :         s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
     247           1 :         v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
     248           1 :         defer s.unrefValue(v)
     249           1 :         if v.err != nil {
     250           1 :                 return v.err
     251           1 :         }
     252           1 :         return fn(v.reader)
     253             : }
     254             : 
     255             : // withVirtualReader fetches a VirtualReader associated with a virtual sstable.
     256             : func (c *tableCacheContainer) withVirtualReader(
     257             :         meta virtualMeta, fn func(sstable.VirtualReader) error,
     258           1 : ) error {
     259           1 :         s := c.tableCache.getShard(meta.FileBacking.DiskFileNum)
     260           1 :         v := s.findNode(context.TODO(), meta.FileBacking, &c.dbOpts)
     261           1 :         defer s.unrefValue(v)
     262           1 :         if v.err != nil {
     263           0 :                 return v.err
     264           0 :         }
     265           1 :         provider := c.dbOpts.objProvider
     266           1 :         objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
     267           1 :         if err != nil {
     268           0 :                 return err
     269           0 :         }
     270           1 :         return fn(sstable.MakeVirtualReader(v.reader, meta.VirtualReaderParams(objMeta.IsShared())))
     271             : }
     272             : 
     273           1 : func (c *tableCacheContainer) iterCount() int64 {
     274           1 :         return int64(c.dbOpts.iterCount.Load())
     275           1 : }
     276             : 
     277             : // TableCache is a shareable cache for open sstables.
     278             : type TableCache struct {
     279             :         refs atomic.Int64
     280             : 
     281             :         cache  *Cache
     282             :         shards []*tableCacheShard
     283             : }
     284             : 
     285             : // Ref adds a reference to the table cache. Once tableCache.init returns,
     286             : // the table cache only remains valid if there is at least one reference
     287             : // to it.
     288           1 : func (c *TableCache) Ref() {
     289           1 :         v := c.refs.Add(1)
     290           1 :         // We don't want the reference count to ever go from 0 -> 1,
     291           1 :         // cause a reference count of 0 implies that we've closed the cache.
     292           1 :         if v <= 1 {
     293           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     294             :         }
     295             : }
     296             : 
     297             : // Unref removes a reference to the table cache.
     298           1 : func (c *TableCache) Unref() error {
     299           1 :         v := c.refs.Add(-1)
     300           1 :         switch {
     301           1 :         case v < 0:
     302           1 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     303           1 :         case v == 0:
     304           1 :                 var err error
     305           1 :                 for i := range c.shards {
     306           1 :                         // The cache shard is not allocated yet, nothing to close
     307           1 :                         if c.shards[i] == nil {
     308           0 :                                 continue
     309             :                         }
     310           1 :                         err = firstError(err, c.shards[i].Close())
     311             :                 }
     312             : 
     313             :                 // Unref the cache which we create a reference to when the tableCache
     314             :                 // is first instantiated.
     315           1 :                 c.cache.Unref()
     316           1 :                 return err
     317             :         }
     318           1 :         return nil
     319             : }
     320             : 
     321             : // NewTableCache will create a reference to the table cache. It is the callers responsibility
     322             : // to call tableCache.Unref if they will no longer hold a reference to the table cache.
     323           1 : func NewTableCache(cache *Cache, numShards int, size int) *TableCache {
     324           1 :         if size == 0 {
     325           0 :                 panic("pebble: cannot create a table cache of size 0")
     326           1 :         } else if numShards == 0 {
     327           0 :                 panic("pebble: cannot create a table cache with 0 shards")
     328             :         }
     329             : 
     330           1 :         c := &TableCache{}
     331           1 :         c.cache = cache
     332           1 :         c.cache.Ref()
     333           1 : 
     334           1 :         c.shards = make([]*tableCacheShard, numShards)
     335           1 :         for i := range c.shards {
     336           1 :                 c.shards[i] = &tableCacheShard{}
     337           1 :                 c.shards[i].init(size / len(c.shards))
     338           1 :         }
     339             : 
     340             :         // Hold a ref to the cache here.
     341           1 :         c.refs.Store(1)
     342           1 : 
     343           1 :         return c
     344             : }
     345             : 
     346           1 : func (c *TableCache) getShard(fileNum base.DiskFileNum) *tableCacheShard {
     347           1 :         return c.shards[uint64(fileNum)%uint64(len(c.shards))]
     348           1 : }
     349             : 
     350             : type tableCacheKey struct {
     351             :         cacheID cache.ID
     352             :         fileNum base.DiskFileNum
     353             : }
     354             : 
     355             : type tableCacheShard struct {
     356             :         hits      atomic.Int64
     357             :         misses    atomic.Int64
     358             :         iterCount atomic.Int32
     359             : 
     360             :         size int
     361             : 
     362             :         mu struct {
     363             :                 sync.RWMutex
     364             :                 nodes map[tableCacheKey]*tableCacheNode
     365             :                 // The iters map is only created and populated in race builds.
     366             :                 iters map[io.Closer][]byte
     367             : 
     368             :                 handHot  *tableCacheNode
     369             :                 handCold *tableCacheNode
     370             :                 handTest *tableCacheNode
     371             : 
     372             :                 coldTarget int
     373             :                 sizeHot    int
     374             :                 sizeCold   int
     375             :                 sizeTest   int
     376             :         }
     377             :         releasing       sync.WaitGroup
     378             :         releasingCh     chan *tableCacheValue
     379             :         releaseLoopExit sync.WaitGroup
     380             : }
     381             : 
     382           1 : func (c *tableCacheShard) init(size int) {
     383           1 :         c.size = size
     384           1 : 
     385           1 :         c.mu.nodes = make(map[tableCacheKey]*tableCacheNode)
     386           1 :         c.mu.coldTarget = size
     387           1 :         c.releasingCh = make(chan *tableCacheValue, 100)
     388           1 :         c.releaseLoopExit.Add(1)
     389           1 :         go c.releaseLoop()
     390           1 : 
     391           1 :         if invariants.RaceEnabled {
     392           0 :                 c.mu.iters = make(map[io.Closer][]byte)
     393           0 :         }
     394             : }
     395             : 
     396           1 : func (c *tableCacheShard) releaseLoop() {
     397           1 :         pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
     398           1 :                 defer c.releaseLoopExit.Done()
     399           1 :                 for v := range c.releasingCh {
     400           1 :                         v.release(c)
     401           1 :                 }
     402             :         })
     403             : }
     404             : 
     405             : // checkAndIntersectFilters checks the specific table and block property filters
     406             : // for intersection with any available table and block-level properties. Returns
     407             : // true for ok if this table should be read by this iterator.
     408             : func (c *tableCacheShard) checkAndIntersectFilters(
     409             :         v *tableCacheValue,
     410             :         blockPropertyFilters []BlockPropertyFilter,
     411             :         boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter,
     412             :         syntheticSuffix sstable.SyntheticSuffix,
     413           1 : ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) {
     414           1 :         if boundLimitedFilter != nil || len(blockPropertyFilters) > 0 {
     415           1 :                 filterer, err = sstable.IntersectsTable(
     416           1 :                         blockPropertyFilters,
     417           1 :                         boundLimitedFilter,
     418           1 :                         v.reader.Properties.UserProperties,
     419           1 :                         syntheticSuffix,
     420           1 :                 )
     421           1 :                 // NB: IntersectsTable will return a nil filterer if the table-level
     422           1 :                 // properties indicate there's no intersection with the provided filters.
     423           1 :                 if filterer == nil || err != nil {
     424           1 :                         return false, nil, err
     425           1 :                 }
     426             :         }
     427           1 :         return true, filterer, nil
     428             : }
     429             : 
     430             : func (c *tableCacheShard) newIters(
     431             :         ctx context.Context,
     432             :         file *manifest.FileMetadata,
     433             :         opts *IterOptions,
     434             :         internalOpts internalIterOpts,
     435             :         dbOpts *tableCacheOpts,
     436             :         kinds iterKinds,
     437           1 : ) (iterSet, error) {
     438           1 :         // TODO(sumeer): constructing the Reader should also use a plumbed context,
     439           1 :         // since parts of the sstable are read during the construction. The Reader
     440           1 :         // should not remember that context since the Reader can be long-lived.
     441           1 : 
     442           1 :         // Calling findNode gives us the responsibility of decrementing v's
     443           1 :         // refCount. If opening the underlying table resulted in error, then we
     444           1 :         // decrement this straight away. Otherwise, we pass that responsibility to
     445           1 :         // the sstable iterator, which decrements when it is closed.
     446           1 :         v := c.findNode(ctx, file.FileBacking, dbOpts)
     447           1 :         if v.err != nil {
     448           1 :                 defer c.unrefValue(v)
     449           1 :                 return iterSet{}, v.err
     450           1 :         }
     451             : 
     452             :         // Note: This suffers an allocation for virtual sstables.
     453           1 :         cr := createCommonReader(v, file)
     454           1 :         var iters iterSet
     455           1 :         var err error
     456           1 :         if kinds.RangeKey() && file.HasRangeKeys {
     457           1 :                 iters.rangeKey, err = c.newRangeKeyIter(ctx, v, file, cr, opts.SpanIterOptions())
     458           1 :         }
     459           1 :         if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
     460           1 :                 iters.rangeDeletion, err = c.newRangeDelIter(ctx, file, cr, dbOpts)
     461           1 :         }
     462           1 :         if kinds.Point() && err == nil {
     463           1 :                 iters.point, err = c.newPointIter(ctx, v, file, cr, opts, internalOpts, dbOpts)
     464           1 :         }
     465           1 :         if err != nil {
     466           1 :                 // NB: There's a subtlety here: Because the point iterator is the last
     467           1 :                 // iterator we attempt to create, it's not possible for:
     468           1 :                 //   err != nil && iters.point != nil
     469           1 :                 // If it were possible, we'd need to account for it to avoid double
     470           1 :                 // unref-ing here, once during CloseAll and once during `unrefValue`.
     471           1 :                 iters.CloseAll()
     472           1 :                 c.unrefValue(v)
     473           1 :                 return iterSet{}, err
     474           1 :         }
     475             :         // Only point iterators ever require the reader stay pinned in the cache. If
     476             :         // we're not returning a point iterator to the caller, we need to unref v.
     477           1 :         if iters.point == nil {
     478           1 :                 c.unrefValue(v)
     479           1 :         }
     480           1 :         return iters, nil
     481             : }
     482             : 
     483             : // For flushable ingests, we decide whether to use the bloom filter base on
     484             : // size.
     485             : const filterBlockSizeLimitForFlushableIngests = 64 * 1024
     486             : 
     487             : // newPointIter is an internal helper that constructs a point iterator over a
     488             : // sstable. This function is for internal use only, and callers should use
     489             : // newIters instead.
     490             : func (c *tableCacheShard) newPointIter(
     491             :         ctx context.Context,
     492             :         v *tableCacheValue,
     493             :         file *manifest.FileMetadata,
     494             :         cr sstable.CommonReader,
     495             :         opts *IterOptions,
     496             :         internalOpts internalIterOpts,
     497             :         dbOpts *tableCacheOpts,
     498           1 : ) (internalIterator, error) {
     499           1 :         var (
     500           1 :                 hideObsoletePoints bool = false
     501           1 :                 pointKeyFilters    []BlockPropertyFilter
     502           1 :                 filterer           *sstable.BlockPropertiesFilterer
     503           1 :         )
     504           1 :         if opts != nil {
     505           1 :                 // This code is appending (at most one filter) in-place to
     506           1 :                 // opts.PointKeyFilters even though the slice is shared for iterators in
     507           1 :                 // the same iterator tree. This is acceptable since all the following
     508           1 :                 // properties are true:
     509           1 :                 // - The iterator tree is single threaded, so the shared backing for the
     510           1 :                 //   slice is being mutated in a single threaded manner.
     511           1 :                 // - Each shallow copy of the slice has its own notion of length.
     512           1 :                 // - The appended element is always the obsoleteKeyBlockPropertyFilter
     513           1 :                 //   struct, which is stateless, so overwriting that struct when creating
     514           1 :                 //   one sstable iterator is harmless to other sstable iterators that are
     515           1 :                 //   relying on that struct.
     516           1 :                 //
     517           1 :                 // An alternative would be to have different slices for different sstable
     518           1 :                 // iterators, but that requires more work to avoid allocations.
     519           1 :                 //
     520           1 :                 // TODO(bilal): for compaction reads of foreign sstables, we do hide
     521           1 :                 // obsolete points (see sstable.Reader.newCompactionIter) but we don't
     522           1 :                 // apply the obsolete block property filter. We could optimize this by
     523           1 :                 // applying the filter.
     524           1 :                 hideObsoletePoints, pointKeyFilters =
     525           1 :                         v.reader.TryAddBlockPropertyFilterForHideObsoletePoints(
     526           1 :                                 opts.snapshotForHideObsoletePoints, file.LargestSeqNum, opts.PointKeyFilters)
     527           1 : 
     528           1 :                 var ok bool
     529           1 :                 var err error
     530           1 :                 ok, filterer, err = c.checkAndIntersectFilters(v, pointKeyFilters,
     531           1 :                         internalOpts.boundLimitedFilter, file.SyntheticPrefixAndSuffix.Suffix())
     532           1 :                 if err != nil {
     533           0 :                         return nil, err
     534           1 :                 } else if !ok {
     535           1 :                         // No point keys within the table match the filters.
     536           1 :                         return nil, nil
     537           1 :                 }
     538             :         }
     539             : 
     540           1 :         var iter sstable.Iterator
     541           1 :         filterBlockSizeLimit := sstable.AlwaysUseFilterBlock
     542           1 :         if opts != nil {
     543           1 :                 // By default, we don't use block filters for L6 and restrict the size for
     544           1 :                 // flushable ingests, as these blocks can be very big.
     545           1 :                 if !opts.UseL6Filters {
     546           1 :                         if opts.layer == manifest.Level(6) {
     547           1 :                                 filterBlockSizeLimit = sstable.NeverUseFilterBlock
     548           1 :                         } else if opts.layer.IsFlushableIngests() {
     549           1 :                                 filterBlockSizeLimit = filterBlockSizeLimitForFlushableIngests
     550           1 :                         }
     551             :                 }
     552           1 :                 if opts.layer.IsSet() && !opts.layer.IsFlushableIngests() {
     553           1 :                         ctx = objiotracing.WithLevel(ctx, opts.layer.Level())
     554           1 :                 }
     555             :         }
     556           1 :         tableFormat, err := v.reader.TableFormat()
     557           1 :         if err != nil {
     558           0 :                 return nil, err
     559           0 :         }
     560             : 
     561           1 :         if v.isShared && file.SyntheticSeqNum() != 0 {
     562           1 :                 if tableFormat < sstable.TableFormatPebblev4 {
     563           0 :                         return nil, errors.New("pebble: shared ingested sstable has a lower table format than expected")
     564           0 :                 }
     565             :                 // The table is shared and ingested.
     566           1 :                 hideObsoletePoints = true
     567             :         }
     568           1 :         transforms := file.IterTransforms()
     569           1 :         transforms.HideObsoletePoints = hideObsoletePoints
     570           1 :         iterStatsAccum := internalOpts.iterStatsAccumulator
     571           1 :         if iterStatsAccum == nil && opts != nil && dbOpts.sstStatsCollector != nil {
     572           1 :                 iterStatsAccum = dbOpts.sstStatsCollector.Accumulator(
     573           1 :                         uint64(uintptr(unsafe.Pointer(v.reader))), opts.Category)
     574           1 :         }
     575           1 :         if internalOpts.compaction {
     576           1 :                 iter, err = cr.NewCompactionIter(transforms, iterStatsAccum, &v.readerProvider, internalOpts.bufferPool)
     577           1 :         } else {
     578           1 :                 iter, err = cr.NewPointIter(
     579           1 :                         ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, filterBlockSizeLimit,
     580           1 :                         internalOpts.stats, iterStatsAccum, &v.readerProvider)
     581           1 :         }
     582           1 :         if err != nil {
     583           1 :                 return nil, err
     584           1 :         }
     585             :         // NB: v.closeHook takes responsibility for calling unrefValue(v) here. Take
     586             :         // care to avoid introducing an allocation here by adding a closure.
     587           1 :         iter.SetCloseHook(v.closeHook)
     588           1 :         c.iterCount.Add(1)
     589           1 :         dbOpts.iterCount.Add(1)
     590           1 :         if invariants.RaceEnabled {
     591           0 :                 c.mu.Lock()
     592           0 :                 c.mu.iters[iter] = debug.Stack()
     593           0 :                 c.mu.Unlock()
     594           0 :         }
     595           1 :         return iter, nil
     596             : }
     597             : 
     598             : // newRangeDelIter is an internal helper that constructs an iterator over a
     599             : // sstable's range deletions. This function is for table-cache internal use
     600             : // only, and callers should use newIters instead.
     601             : func (c *tableCacheShard) newRangeDelIter(
     602             :         ctx context.Context, file *manifest.FileMetadata, cr sstable.CommonReader, dbOpts *tableCacheOpts,
     603           1 : ) (keyspan.FragmentIterator, error) {
     604           1 :         // NB: range-del iterator does not maintain a reference to the table, nor
     605           1 :         // does it need to read from it after creation.
     606           1 :         rangeDelIter, err := cr.NewRawRangeDelIter(ctx, file.FragmentIterTransforms())
     607           1 :         if err != nil {
     608           1 :                 return nil, err
     609           1 :         }
     610             :         // Assert expected bounds in tests.
     611           1 :         if invariants.Sometimes(50) && rangeDelIter != nil {
     612           1 :                 cmp := base.DefaultComparer.Compare
     613           1 :                 if dbOpts.readerOpts.Comparer != nil {
     614           1 :                         cmp = dbOpts.readerOpts.Comparer.Compare
     615           1 :                 }
     616           1 :                 rangeDelIter = keyspan.AssertBounds(
     617           1 :                         rangeDelIter, file.SmallestPointKey, file.LargestPointKey.UserKey, cmp,
     618           1 :                 )
     619             :         }
     620           1 :         return rangeDelIter, nil
     621             : }
     622             : 
     623             : // newRangeKeyIter is an internal helper that constructs an iterator over a
     624             : // sstable's range keys. This function is for table-cache internal use only, and
     625             : // callers should use newIters instead.
     626             : func (c *tableCacheShard) newRangeKeyIter(
     627             :         ctx context.Context,
     628             :         v *tableCacheValue,
     629             :         file *fileMetadata,
     630             :         cr sstable.CommonReader,
     631             :         opts keyspan.SpanIterOptions,
     632           1 : ) (keyspan.FragmentIterator, error) {
     633           1 :         transforms := file.FragmentIterTransforms()
     634           1 :         // Don't filter a table's range keys if the file contains RANGEKEYDELs.
     635           1 :         // The RANGEKEYDELs may delete range keys in other levels. Skipping the
     636           1 :         // file's range key blocks may surface deleted range keys below. This is
     637           1 :         // done here, rather than deferring to the block-property collector in order
     638           1 :         // to maintain parity with point keys and the treatment of RANGEDELs.
     639           1 :         if v.reader.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 {
     640           0 :                 ok, _, err := c.checkAndIntersectFilters(v, opts.RangeKeyFilters, nil, transforms.SyntheticSuffix())
     641           0 :                 if err != nil {
     642           0 :                         return nil, err
     643           0 :                 } else if !ok {
     644           0 :                         return nil, nil
     645           0 :                 }
     646             :         }
     647             :         // TODO(radu): wrap in an AssertBounds.
     648           1 :         return cr.NewRawRangeKeyIter(ctx, transforms)
     649             : }
     650             : 
     651             : // tableCacheShardReaderProvider implements sstable.ReaderProvider for a
     652             : // specific table.
     653             : type tableCacheShardReaderProvider struct {
     654             :         c              *tableCacheShard
     655             :         dbOpts         *tableCacheOpts
     656             :         backingFileNum base.DiskFileNum
     657             : 
     658             :         mu struct {
     659             :                 sync.Mutex
     660             :                 // v is the result of findNode. Whenever it is not null, we hold a refcount
     661             :                 // on the tableCacheValue.
     662             :                 v *tableCacheValue
     663             :                 // refCount is the number of GetReader() calls that have not received a
     664             :                 // corresponding Close().
     665             :                 refCount int
     666             :         }
     667             : }
     668             : 
     669             : var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
     670             : 
     671             : func (rp *tableCacheShardReaderProvider) init(
     672             :         c *tableCacheShard, dbOpts *tableCacheOpts, backingFileNum base.DiskFileNum,
     673           1 : ) {
     674           1 :         rp.c = c
     675           1 :         rp.dbOpts = dbOpts
     676           1 :         rp.backingFileNum = backingFileNum
     677           1 :         rp.mu.v = nil
     678           1 :         rp.mu.refCount = 0
     679           1 : }
     680             : 
     681             : // GetReader implements sstable.ReaderProvider. Note that it is not the
     682             : // responsibility of tableCacheShardReaderProvider to ensure that the file
     683             : // continues to exist. The ReaderProvider is used in iterators where the
     684             : // top-level iterator is pinning the read state and preventing the files from
     685             : // being deleted.
     686             : //
     687             : // The caller must call tableCacheShardReaderProvider.Close.
     688             : //
     689             : // Note that currently the Reader returned here is only used to read value
     690             : // blocks. This reader shouldn't be used for other purposes like reading keys
     691             : // outside of virtual sstable bounds.
     692             : //
     693             : // TODO(bananabrick): We could return a wrapper over the Reader to ensure
     694             : // that the reader isn't used for other purposes.
     695           1 : func (rp *tableCacheShardReaderProvider) GetReader(ctx context.Context) (*sstable.Reader, error) {
     696           1 :         rp.mu.Lock()
     697           1 :         defer rp.mu.Unlock()
     698           1 : 
     699           1 :         if rp.mu.v != nil {
     700           0 :                 rp.mu.refCount++
     701           0 :                 return rp.mu.v.reader, nil
     702           0 :         }
     703             : 
     704             :         // Calling findNodeInternal gives us the responsibility of decrementing v's
     705             :         // refCount. Note that if the table is no longer in the cache,
     706             :         // findNodeInternal will need to do IO to initialize a new Reader. We hold
     707             :         // rp.mu during this time so that concurrent GetReader calls block until the
     708             :         // Reader is created.
     709           1 :         v := rp.c.findNodeInternal(ctx, rp.backingFileNum, rp.dbOpts)
     710           1 :         if v.err != nil {
     711           0 :                 defer rp.c.unrefValue(v)
     712           0 :                 return nil, v.err
     713           0 :         }
     714           1 :         rp.mu.v = v
     715           1 :         rp.mu.refCount = 1
     716           1 :         return v.reader, nil
     717             : }
     718             : 
     719             : // Close implements sstable.ReaderProvider.
     720           1 : func (rp *tableCacheShardReaderProvider) Close() {
     721           1 :         rp.mu.Lock()
     722           1 :         defer rp.mu.Unlock()
     723           1 :         rp.mu.refCount--
     724           1 :         if rp.mu.refCount <= 0 {
     725           1 :                 if rp.mu.refCount < 0 {
     726           0 :                         panic("pebble: sstable.ReaderProvider misuse")
     727             :                 }
     728           1 :                 rp.c.unrefValue(rp.mu.v)
     729           1 :                 rp.mu.v = nil
     730             :         }
     731             : }
     732             : 
     733             : // getTableProperties return sst table properties for target file
     734             : func (c *tableCacheShard) getTableProperties(
     735             :         file *fileMetadata, dbOpts *tableCacheOpts,
     736           1 : ) (*sstable.Properties, error) {
     737           1 :         // Calling findNode gives us the responsibility of decrementing v's refCount here
     738           1 :         v := c.findNode(context.TODO(), file.FileBacking, dbOpts)
     739           1 :         defer c.unrefValue(v)
     740           1 : 
     741           1 :         if v.err != nil {
     742           0 :                 return nil, v.err
     743           0 :         }
     744           1 :         return &v.reader.Properties, nil
     745             : }
     746             : 
     747             : // releaseNode releases a node from the tableCacheShard.
     748             : //
     749             : // c.mu must be held when calling this.
     750           1 : func (c *tableCacheShard) releaseNode(n *tableCacheNode) {
     751           1 :         c.unlinkNode(n)
     752           1 :         c.clearNode(n)
     753           1 : }
     754             : 
     755             : // unlinkNode removes a node from the tableCacheShard, leaving the shard
     756             : // reference in place.
     757             : //
     758             : // c.mu must be held when calling this.
     759           1 : func (c *tableCacheShard) unlinkNode(n *tableCacheNode) {
     760           1 :         key := tableCacheKey{n.cacheID, n.fileNum}
     761           1 :         delete(c.mu.nodes, key)
     762           1 : 
     763           1 :         switch n.ptype {
     764           1 :         case tableCacheNodeHot:
     765           1 :                 c.mu.sizeHot--
     766           1 :         case tableCacheNodeCold:
     767           1 :                 c.mu.sizeCold--
     768           1 :         case tableCacheNodeTest:
     769           1 :                 c.mu.sizeTest--
     770             :         }
     771             : 
     772           1 :         if n == c.mu.handHot {
     773           1 :                 c.mu.handHot = c.mu.handHot.prev()
     774           1 :         }
     775           1 :         if n == c.mu.handCold {
     776           1 :                 c.mu.handCold = c.mu.handCold.prev()
     777           1 :         }
     778           1 :         if n == c.mu.handTest {
     779           1 :                 c.mu.handTest = c.mu.handTest.prev()
     780           1 :         }
     781             : 
     782           1 :         if n.unlink() == n {
     783           1 :                 // This was the last entry in the cache.
     784           1 :                 c.mu.handHot = nil
     785           1 :                 c.mu.handCold = nil
     786           1 :                 c.mu.handTest = nil
     787           1 :         }
     788             : 
     789           1 :         n.links.prev = nil
     790           1 :         n.links.next = nil
     791             : }
     792             : 
     793           1 : func (c *tableCacheShard) clearNode(n *tableCacheNode) {
     794           1 :         if v := n.value; v != nil {
     795           1 :                 n.value = nil
     796           1 :                 c.unrefValue(v)
     797           1 :         }
     798             : }
     799             : 
     800             : // unrefValue decrements the reference count for the specified value, releasing
     801             : // it if the reference count fell to 0. Note that the value has a reference if
     802             : // it is present in tableCacheShard.mu.nodes, so a reference count of 0 means
     803             : // the node has already been removed from that map.
     804           1 : func (c *tableCacheShard) unrefValue(v *tableCacheValue) {
     805           1 :         if v.refCount.Add(-1) == 0 {
     806           1 :                 c.releasing.Add(1)
     807           1 :                 c.releasingCh <- v
     808           1 :         }
     809             : }
     810             : 
     811             : // findNode returns the node for the table with the given file number, creating
     812             : // that node if it didn't already exist. The caller is responsible for
     813             : // decrementing the returned node's refCount.
     814             : func (c *tableCacheShard) findNode(
     815             :         ctx context.Context, b *fileBacking, dbOpts *tableCacheOpts,
     816           1 : ) *tableCacheValue {
     817           1 :         // The backing must have a positive refcount (otherwise it could be deleted at any time).
     818           1 :         b.MustHaveRefs()
     819           1 :         // Caution! Here fileMetadata can be a physical or virtual table. Table cache
     820           1 :         // readers are associated with the physical backings. All virtual tables with
     821           1 :         // the same backing will use the same reader from the cache; so no information
     822           1 :         // that can differ among these virtual tables can be passed to findNodeInternal.
     823           1 :         backingFileNum := b.DiskFileNum
     824           1 : 
     825           1 :         return c.findNodeInternal(ctx, backingFileNum, dbOpts)
     826           1 : }
     827             : 
     828             : func (c *tableCacheShard) findNodeInternal(
     829             :         ctx context.Context, backingFileNum base.DiskFileNum, dbOpts *tableCacheOpts,
     830           1 : ) *tableCacheValue {
     831           1 :         // Fast-path for a hit in the cache.
     832           1 :         c.mu.RLock()
     833           1 :         key := tableCacheKey{dbOpts.cacheID, backingFileNum}
     834           1 :         if n := c.mu.nodes[key]; n != nil && n.value != nil {
     835           1 :                 // Fast-path hit.
     836           1 :                 //
     837           1 :                 // The caller is responsible for decrementing the refCount.
     838           1 :                 v := n.value
     839           1 :                 v.refCount.Add(1)
     840           1 :                 c.mu.RUnlock()
     841           1 :                 n.referenced.Store(true)
     842           1 :                 c.hits.Add(1)
     843           1 :                 <-v.loaded
     844           1 :                 return v
     845           1 :         }
     846           1 :         c.mu.RUnlock()
     847           1 : 
     848           1 :         c.mu.Lock()
     849           1 : 
     850           1 :         n := c.mu.nodes[key]
     851           1 :         switch {
     852           1 :         case n == nil:
     853           1 :                 // Slow-path miss of a non-existent node.
     854           1 :                 n = &tableCacheNode{
     855           1 :                         fileNum: backingFileNum,
     856           1 :                         ptype:   tableCacheNodeCold,
     857           1 :                 }
     858           1 :                 c.addNode(n, dbOpts)
     859           1 :                 c.mu.sizeCold++
     860             : 
     861           1 :         case n.value != nil:
     862           1 :                 // Slow-path hit of a hot or cold node.
     863           1 :                 //
     864           1 :                 // The caller is responsible for decrementing the refCount.
     865           1 :                 v := n.value
     866           1 :                 v.refCount.Add(1)
     867           1 :                 n.referenced.Store(true)
     868           1 :                 c.hits.Add(1)
     869           1 :                 c.mu.Unlock()
     870           1 :                 <-v.loaded
     871           1 :                 return v
     872             : 
     873           1 :         default:
     874           1 :                 // Slow-path miss of a test node.
     875           1 :                 c.unlinkNode(n)
     876           1 :                 c.mu.coldTarget++
     877           1 :                 if c.mu.coldTarget > c.size {
     878           1 :                         c.mu.coldTarget = c.size
     879           1 :                 }
     880             : 
     881           1 :                 n.referenced.Store(false)
     882           1 :                 n.ptype = tableCacheNodeHot
     883           1 :                 c.addNode(n, dbOpts)
     884           1 :                 c.mu.sizeHot++
     885             :         }
     886             : 
     887           1 :         c.misses.Add(1)
     888           1 : 
     889           1 :         v := &tableCacheValue{
     890           1 :                 loaded: make(chan struct{}),
     891           1 :         }
     892           1 :         v.readerProvider.init(c, dbOpts, backingFileNum)
     893           1 :         v.refCount.Store(2)
     894           1 :         // Cache the closure invoked when an iterator is closed. This avoids an
     895           1 :         // allocation on every call to newIters.
     896           1 :         v.closeHook = func(i sstable.Iterator) error {
     897           1 :                 if invariants.RaceEnabled {
     898           0 :                         c.mu.Lock()
     899           0 :                         delete(c.mu.iters, i)
     900           0 :                         c.mu.Unlock()
     901           0 :                 }
     902           1 :                 c.unrefValue(v)
     903           1 :                 c.iterCount.Add(-1)
     904           1 :                 dbOpts.iterCount.Add(-1)
     905           1 :                 return nil
     906             :         }
     907           1 :         n.value = v
     908           1 : 
     909           1 :         c.mu.Unlock()
     910           1 : 
     911           1 :         // Note adding to the cache lists must complete before we begin loading the
     912           1 :         // table as a failure during load will result in the node being unlinked.
     913           1 :         pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
     914           1 :                 v.load(ctx, backingFileNum, c, dbOpts)
     915           1 :         })
     916           1 :         return v
     917             : }
     918             : 
     919           1 : func (c *tableCacheShard) addNode(n *tableCacheNode, dbOpts *tableCacheOpts) {
     920           1 :         c.evictNodes()
     921           1 :         n.cacheID = dbOpts.cacheID
     922           1 :         key := tableCacheKey{n.cacheID, n.fileNum}
     923           1 :         c.mu.nodes[key] = n
     924           1 : 
     925           1 :         n.links.next = n
     926           1 :         n.links.prev = n
     927           1 :         if c.mu.handHot == nil {
     928           1 :                 // First element.
     929           1 :                 c.mu.handHot = n
     930           1 :                 c.mu.handCold = n
     931           1 :                 c.mu.handTest = n
     932           1 :         } else {
     933           1 :                 c.mu.handHot.link(n)
     934           1 :         }
     935             : 
     936           1 :         if c.mu.handCold == c.mu.handHot {
     937           1 :                 c.mu.handCold = c.mu.handCold.prev()
     938           1 :         }
     939             : }
     940             : 
     941           1 : func (c *tableCacheShard) evictNodes() {
     942           1 :         for c.size <= c.mu.sizeHot+c.mu.sizeCold && c.mu.handCold != nil {
     943           1 :                 c.runHandCold()
     944           1 :         }
     945             : }
     946             : 
     947           1 : func (c *tableCacheShard) runHandCold() {
     948           1 :         n := c.mu.handCold
     949           1 :         if n.ptype == tableCacheNodeCold {
     950           1 :                 if n.referenced.Load() {
     951           1 :                         n.referenced.Store(false)
     952           1 :                         n.ptype = tableCacheNodeHot
     953           1 :                         c.mu.sizeCold--
     954           1 :                         c.mu.sizeHot++
     955           1 :                 } else {
     956           1 :                         c.clearNode(n)
     957           1 :                         n.ptype = tableCacheNodeTest
     958           1 :                         c.mu.sizeCold--
     959           1 :                         c.mu.sizeTest++
     960           1 :                         for c.size < c.mu.sizeTest && c.mu.handTest != nil {
     961           1 :                                 c.runHandTest()
     962           1 :                         }
     963             :                 }
     964             :         }
     965             : 
     966           1 :         c.mu.handCold = c.mu.handCold.next()
     967           1 : 
     968           1 :         for c.size-c.mu.coldTarget <= c.mu.sizeHot && c.mu.handHot != nil {
     969           1 :                 c.runHandHot()
     970           1 :         }
     971             : }
     972             : 
     973           1 : func (c *tableCacheShard) runHandHot() {
     974           1 :         if c.mu.handHot == c.mu.handTest && c.mu.handTest != nil {
     975           1 :                 c.runHandTest()
     976           1 :                 if c.mu.handHot == nil {
     977           0 :                         return
     978           0 :                 }
     979             :         }
     980             : 
     981           1 :         n := c.mu.handHot
     982           1 :         if n.ptype == tableCacheNodeHot {
     983           1 :                 if n.referenced.Load() {
     984           1 :                         n.referenced.Store(false)
     985           1 :                 } else {
     986           1 :                         n.ptype = tableCacheNodeCold
     987           1 :                         c.mu.sizeHot--
     988           1 :                         c.mu.sizeCold++
     989           1 :                 }
     990             :         }
     991             : 
     992           1 :         c.mu.handHot = c.mu.handHot.next()
     993             : }
     994             : 
     995           1 : func (c *tableCacheShard) runHandTest() {
     996           1 :         if c.mu.sizeCold > 0 && c.mu.handTest == c.mu.handCold && c.mu.handCold != nil {
     997           1 :                 c.runHandCold()
     998           1 :                 if c.mu.handTest == nil {
     999           0 :                         return
    1000           0 :                 }
    1001             :         }
    1002             : 
    1003           1 :         n := c.mu.handTest
    1004           1 :         if n.ptype == tableCacheNodeTest {
    1005           1 :                 c.mu.coldTarget--
    1006           1 :                 if c.mu.coldTarget < 0 {
    1007           1 :                         c.mu.coldTarget = 0
    1008           1 :                 }
    1009           1 :                 c.unlinkNode(n)
    1010           1 :                 c.clearNode(n)
    1011             :         }
    1012             : 
    1013           1 :         c.mu.handTest = c.mu.handTest.next()
    1014             : }
    1015             : 
    1016           1 : func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts, allowLeak bool) {
    1017           1 :         c.mu.Lock()
    1018           1 :         key := tableCacheKey{dbOpts.cacheID, fileNum}
    1019           1 :         n := c.mu.nodes[key]
    1020           1 :         var v *tableCacheValue
    1021           1 :         if n != nil {
    1022           1 :                 // NB: This is equivalent to tableCacheShard.releaseNode(), but we perform
    1023           1 :                 // the tableCacheNode.release() call synchronously below to ensure the
    1024           1 :                 // sstable file descriptor is closed before returning. Note that
    1025           1 :                 // tableCacheShard.releasing needs to be incremented while holding
    1026           1 :                 // tableCacheShard.mu in order to avoid a race with Close()
    1027           1 :                 c.unlinkNode(n)
    1028           1 :                 v = n.value
    1029           1 :                 if v != nil {
    1030           1 :                         if !allowLeak {
    1031           1 :                                 if t := v.refCount.Add(-1); t != 0 {
    1032           0 :                                         dbOpts.loggerAndTracer.Fatalf("sstable %s: refcount is not zero: %d\n%s", fileNum, t, debug.Stack())
    1033           0 :                                 }
    1034             :                         }
    1035           1 :                         c.releasing.Add(1)
    1036             :                 }
    1037             :         }
    1038             : 
    1039           1 :         c.mu.Unlock()
    1040           1 : 
    1041           1 :         if v != nil {
    1042           1 :                 v.release(c)
    1043           1 :         }
    1044             : 
    1045           1 :         dbOpts.cache.EvictFile(dbOpts.cacheID, fileNum)
    1046             : }
    1047             : 
    1048             : // removeDB evicts any nodes which have a reference to the DB
    1049             : // associated with dbOpts.cacheID. Make sure that there will
    1050             : // be no more accesses to the files associated with the DB.
    1051           1 : func (c *tableCacheShard) removeDB(dbOpts *tableCacheOpts) {
    1052           1 :         var fileNums []base.DiskFileNum
    1053           1 : 
    1054           1 :         c.mu.RLock()
    1055           1 :         // Collect the fileNums which need to be cleaned.
    1056           1 :         var firstNode *tableCacheNode
    1057           1 :         node := c.mu.handHot
    1058           1 :         for node != firstNode {
    1059           1 :                 if firstNode == nil {
    1060           1 :                         firstNode = node
    1061           1 :                 }
    1062             : 
    1063           1 :                 if node.cacheID == dbOpts.cacheID {
    1064           1 :                         fileNums = append(fileNums, node.fileNum)
    1065           1 :                 }
    1066           1 :                 node = node.next()
    1067             :         }
    1068           1 :         c.mu.RUnlock()
    1069           1 : 
    1070           1 :         // Evict all the nodes associated with the DB.
    1071           1 :         // This should synchronously close all the files
    1072           1 :         // associated with the DB.
    1073           1 :         for _, fileNum := range fileNums {
    1074           1 :                 c.evict(fileNum, dbOpts, true)
    1075           1 :         }
    1076             : }
    1077             : 
    1078           1 : func (c *tableCacheShard) Close() error {
    1079           1 :         c.mu.Lock()
    1080           1 :         defer c.mu.Unlock()
    1081           1 : 
    1082           1 :         // Check for leaked iterators. Note that we'll still perform cleanup below in
    1083           1 :         // the case that there are leaked iterators.
    1084           1 :         var err error
    1085           1 :         if v := c.iterCount.Load(); v > 0 {
    1086           1 :                 if !invariants.RaceEnabled {
    1087           1 :                         err = errors.Errorf("leaked iterators: %d", errors.Safe(v))
    1088           1 :                 } else {
    1089           0 :                         var buf bytes.Buffer
    1090           0 :                         for _, stack := range c.mu.iters {
    1091           0 :                                 fmt.Fprintf(&buf, "%s\n", stack)
    1092           0 :                         }
    1093           0 :                         err = errors.Errorf("leaked iterators: %d\n%s", errors.Safe(v), buf.String())
    1094             :                 }
    1095             :         }
    1096             : 
    1097           1 :         for c.mu.handHot != nil {
    1098           0 :                 n := c.mu.handHot
    1099           0 :                 if n.value != nil {
    1100           0 :                         if n.value.refCount.Add(-1) == 0 {
    1101           0 :                                 c.releasing.Add(1)
    1102           0 :                                 c.releasingCh <- n.value
    1103           0 :                         }
    1104             :                 }
    1105           0 :                 c.unlinkNode(n)
    1106             :         }
    1107           1 :         c.mu.nodes = nil
    1108           1 :         c.mu.handHot = nil
    1109           1 :         c.mu.handCold = nil
    1110           1 :         c.mu.handTest = nil
    1111           1 : 
    1112           1 :         // Only shutdown the releasing goroutine if there were no leaked
    1113           1 :         // iterators. If there were leaked iterators, we leave the goroutine running
    1114           1 :         // and the releasingCh open so that a subsequent iterator close can
    1115           1 :         // complete. This behavior is used by iterator leak tests. Leaking the
    1116           1 :         // goroutine for these tests is less bad not closing the iterator which
    1117           1 :         // triggers other warnings about block cache handles not being released.
    1118           1 :         if err != nil {
    1119           1 :                 c.releasing.Wait()
    1120           1 :                 return err
    1121           1 :         }
    1122             : 
    1123           1 :         close(c.releasingCh)
    1124           1 :         c.releasing.Wait()
    1125           1 :         c.releaseLoopExit.Wait()
    1126           1 :         return err
    1127             : }
    1128             : 
    1129             : type tableCacheValue struct {
    1130             :         closeHook func(i sstable.Iterator) error
    1131             :         reader    *sstable.Reader
    1132             :         err       error
    1133             :         loaded    chan struct{}
    1134             :         // Reference count for the value. The reader is closed when the reference
    1135             :         // count drops to zero.
    1136             :         refCount atomic.Int32
    1137             :         isShared bool
    1138             : 
    1139             :         // readerProvider is embedded here so that we only allocate it once as long as
    1140             :         // the table stays in the cache. Its state is not always logically tied to
    1141             :         // this specific tableCacheValue - if a table goes out of the cache and then
    1142             :         // comes back in, the readerProvider in a now-defunct tableCacheValue can
    1143             :         // still be used and will internally refer to the new tableCacheValue.
    1144             :         readerProvider tableCacheShardReaderProvider
    1145             : }
    1146             : 
    1147             : func (v *tableCacheValue) load(
    1148             :         ctx context.Context, backingFileNum base.DiskFileNum, c *tableCacheShard, dbOpts *tableCacheOpts,
    1149           1 : ) {
    1150           1 :         // Try opening the file first.
    1151           1 :         var f objstorage.Readable
    1152           1 :         var err error
    1153           1 :         f, err = dbOpts.objProvider.OpenForReading(
    1154           1 :                 ctx, fileTypeTable, backingFileNum, objstorage.OpenOptions{MustExist: true},
    1155           1 :         )
    1156           1 :         if err == nil {
    1157           1 :                 o := dbOpts.readerOpts
    1158           1 :                 o.SetInternalCacheOpts(sstableinternal.CacheOptions{
    1159           1 :                         Cache:   dbOpts.cache,
    1160           1 :                         CacheID: dbOpts.cacheID,
    1161           1 :                         FileNum: backingFileNum,
    1162           1 :                 })
    1163           1 :                 v.reader, err = sstable.NewReader(ctx, f, o)
    1164           1 :         }
    1165           1 :         if err == nil {
    1166           1 :                 var objMeta objstorage.ObjectMetadata
    1167           1 :                 objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, backingFileNum)
    1168           1 :                 v.isShared = objMeta.IsShared()
    1169           1 :         }
    1170           1 :         if err != nil {
    1171           1 :                 v.err = errors.Wrapf(
    1172           1 :                         err, "pebble: backing file %s error", backingFileNum)
    1173           1 :         }
    1174           1 :         if v.err != nil {
    1175           1 :                 c.mu.Lock()
    1176           1 :                 defer c.mu.Unlock()
    1177           1 :                 // Lookup the node in the cache again as it might have already been
    1178           1 :                 // removed.
    1179           1 :                 key := tableCacheKey{dbOpts.cacheID, backingFileNum}
    1180           1 :                 n := c.mu.nodes[key]
    1181           1 :                 if n != nil && n.value == v {
    1182           1 :                         c.releaseNode(n)
    1183           1 :                 }
    1184             :         }
    1185           1 :         close(v.loaded)
    1186             : }
    1187             : 
    1188           1 : func (v *tableCacheValue) release(c *tableCacheShard) {
    1189           1 :         <-v.loaded
    1190           1 :         // Nothing to be done about an error at this point. Close the reader if it is
    1191           1 :         // open.
    1192           1 :         if v.reader != nil {
    1193           1 :                 _ = v.reader.Close()
    1194           1 :         }
    1195           1 :         c.releasing.Done()
    1196             : }
    1197             : 
    1198             : type tableCacheNodeType int8
    1199             : 
    1200             : const (
    1201             :         tableCacheNodeTest tableCacheNodeType = iota
    1202             :         tableCacheNodeCold
    1203             :         tableCacheNodeHot
    1204             : )
    1205             : 
    1206           0 : func (p tableCacheNodeType) String() string {
    1207           0 :         switch p {
    1208           0 :         case tableCacheNodeTest:
    1209           0 :                 return "test"
    1210           0 :         case tableCacheNodeCold:
    1211           0 :                 return "cold"
    1212           0 :         case tableCacheNodeHot:
    1213           0 :                 return "hot"
    1214             :         }
    1215           0 :         return "unknown"
    1216             : }
    1217             : 
    1218             : type tableCacheNode struct {
    1219             :         fileNum base.DiskFileNum
    1220             :         value   *tableCacheValue
    1221             : 
    1222             :         links struct {
    1223             :                 next *tableCacheNode
    1224             :                 prev *tableCacheNode
    1225             :         }
    1226             :         ptype tableCacheNodeType
    1227             :         // referenced is atomically set to indicate that this entry has been accessed
    1228             :         // since the last time one of the clock hands swept it.
    1229             :         referenced atomic.Bool
    1230             : 
    1231             :         // Storing the cache id associated with the DB instance here
    1232             :         // avoids the need to thread the dbOpts struct through many functions.
    1233             :         cacheID cache.ID
    1234             : }
    1235             : 
    1236           1 : func (n *tableCacheNode) next() *tableCacheNode {
    1237           1 :         if n == nil {
    1238           0 :                 return nil
    1239           0 :         }
    1240           1 :         return n.links.next
    1241             : }
    1242             : 
    1243           1 : func (n *tableCacheNode) prev() *tableCacheNode {
    1244           1 :         if n == nil {
    1245           0 :                 return nil
    1246           0 :         }
    1247           1 :         return n.links.prev
    1248             : }
    1249             : 
    1250           1 : func (n *tableCacheNode) link(s *tableCacheNode) {
    1251           1 :         s.links.prev = n.links.prev
    1252           1 :         s.links.prev.links.next = s
    1253           1 :         s.links.next = n
    1254           1 :         s.links.next.links.prev = s
    1255           1 : }
    1256             : 
    1257           1 : func (n *tableCacheNode) unlink() *tableCacheNode {
    1258           1 :         next := n.links.next
    1259           1 :         n.links.prev.links.next = n.links.next
    1260           1 :         n.links.next.links.prev = n.links.prev
    1261           1 :         n.links.prev = n
    1262           1 :         n.links.next = n
    1263           1 :         return next
    1264           1 : }
    1265             : 
    1266             : // iterSet holds a set of iterators of various key kinds, all constructed over
    1267             : // the same data structure (eg, an sstable). A subset of the fields may be
    1268             : // populated depending on the `iterKinds` passed to newIters.
    1269             : type iterSet struct {
    1270             :         point         internalIterator
    1271             :         rangeDeletion keyspan.FragmentIterator
    1272             :         rangeKey      keyspan.FragmentIterator
    1273             : }
    1274             : 
    1275             : // TODO(jackson): Consider adding methods for fast paths that check whether an
    1276             : // iterator of a particular kind is nil, so that these call sites don't need to
    1277             : // reach into the struct's fields directly.
    1278             : 
    1279             : // Point returns the contained point iterator. If there is no point iterator,
    1280             : // Point returns a non-nil empty point iterator.
    1281           1 : func (s *iterSet) Point() internalIterator {
    1282           1 :         if s.point == nil {
    1283           1 :                 return emptyIter
    1284           1 :         }
    1285           1 :         return s.point
    1286             : }
    1287             : 
    1288             : // RangeDeletion returns the contained range deletion iterator. If there is no
    1289             : // range deletion iterator, RangeDeletion returns a non-nil empty keyspan
    1290             : // iterator.
    1291           1 : func (s *iterSet) RangeDeletion() keyspan.FragmentIterator {
    1292           1 :         if s.rangeDeletion == nil {
    1293           1 :                 return emptyKeyspanIter
    1294           1 :         }
    1295           1 :         return s.rangeDeletion
    1296             : }
    1297             : 
    1298             : // RangeKey returns the contained range key iterator. If there is no range key
    1299             : // iterator, RangeKey returns a non-nil empty keyspan iterator.
    1300           1 : func (s *iterSet) RangeKey() keyspan.FragmentIterator {
    1301           1 :         if s.rangeKey == nil {
    1302           0 :                 return emptyKeyspanIter
    1303           0 :         }
    1304           1 :         return s.rangeKey
    1305             : }
    1306             : 
    1307             : // CloseAll closes all of the held iterators. If CloseAll is called, then Close
    1308             : // must be not be called on the constituent iterators.
    1309           1 : func (s *iterSet) CloseAll() error {
    1310           1 :         var err error
    1311           1 :         if s.point != nil {
    1312           1 :                 err = s.point.Close()
    1313           1 :                 s.point = nil
    1314           1 :         }
    1315           1 :         if s.rangeDeletion != nil {
    1316           1 :                 s.rangeDeletion.Close()
    1317           1 :                 s.rangeDeletion = nil
    1318           1 :         }
    1319           1 :         if s.rangeKey != nil {
    1320           1 :                 s.rangeKey.Close()
    1321           1 :                 s.rangeKey = nil
    1322           1 :         }
    1323           1 :         return err
    1324             : }
    1325             : 
    1326             : // iterKinds is a bitmap indicating a set of kinds of iterators. Callers may
    1327             : // bitwise-OR iterPointKeys, iterRangeDeletions and/or iterRangeKeys together to
    1328             : // represent a set of desired iterator kinds.
    1329             : type iterKinds uint8
    1330             : 
    1331           1 : func (t iterKinds) Point() bool         { return (t & iterPointKeys) != 0 }
    1332           1 : func (t iterKinds) RangeDeletion() bool { return (t & iterRangeDeletions) != 0 }
    1333           1 : func (t iterKinds) RangeKey() bool      { return (t & iterRangeKeys) != 0 }
    1334             : 
    1335             : const (
    1336             :         iterPointKeys iterKinds = 1 << iota
    1337             :         iterRangeDeletions
    1338             :         iterRangeKeys
    1339             : )

Generated by: LCOV version 1.14