LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider/sharedcache - shared_cache.go (source / functions) Hit Total Coverage
Test: 2024-05-27 08:16Z 542915b2 - tests + meta.lcov Lines: 426 485 87.8 %
Date: 2024-05-27 08:16:58 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sharedcache
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "io"
      11             :         "math/bits"
      12             :         "sync"
      13             :         "sync/atomic"
      14             :         "time"
      15             : 
      16             :         "github.com/cockroachdb/errors"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/internal/invariants"
      19             :         "github.com/cockroachdb/pebble/objstorage/remote"
      20             :         "github.com/cockroachdb/pebble/vfs"
      21             :         "github.com/prometheus/client_golang/prometheus"
      22             : )
      23             : 
// IOBuckets and ChannelWriteBuckets are the bucket boundaries of the latency
// histograms kept by the cache. They are exported so that package pebble can
// re-export them, which in turn lets CRDB export the cache metrics using the
// same buckets.
//
// Both ranges end at 10s; ChannelWriteBuckets starts at 1µs instead of 1ms
// because it is used for the histogram that measures a channel write (see
// Open).
var (
	IOBuckets           = prometheus.ExponentialBucketsRange(float64(time.Millisecond*1), float64(10*time.Second), 50)
	ChannelWriteBuckets = prometheus.ExponentialBucketsRange(float64(time.Microsecond*1), float64(10*time.Second), 50)
)
      30             : 
// Cache is a persistent cache backed by a local filesystem. It is intended
// to cache data that is in slower shared storage (e.g. S3), hence the
// package name 'sharedcache'.
//
// A Cache is created with Open and released with Close.
type Cache struct {
	// shards holds the independent cache shards; getShard picks the shard
	// responsible for a given (file, offset). writeWorkers performs the
	// write-backs that ReadAt queues after reading from the object.
	shards       []shard
	writeWorkers writeWorkers

	// bm does the arithmetic for cache blocks of the block size passed to Open.
	// shardingBlockSize is the size of the contiguous unit that maps to a
	// single shard.
	bm                blockMath
	shardingBlockSize int64

	// logger is the logger passed to Open. metrics holds the counters and
	// histograms that Metrics reports.
	logger  base.Logger
	metrics internalMetrics
}
      44             : 
// Metrics is a struct containing metrics exported by the secondary cache. It
// is the value returned by Cache.Metrics: the counters are copies taken at the
// time of the call, while the histograms are the ones the cache itself records
// into.
//
// TODO(josh): Reconsider the set of metrics exported by the secondary cache
// before we release the secondary cache to users. We choose to export many metrics
// right now, so we learn a lot from the benchmarking we are doing over the 23.2
// cycle.
type Metrics struct {
	// The number of sstable bytes stored in the cache. It is computed as the
	// count of cache blocks multiplied by the cache block size.
	Size int64
	// The count of cache blocks in the cache (not sstable blocks).
	Count int64

	// The number of calls to ReadAt.
	TotalReads int64
	// The number of calls to ReadAt that require reading data from 2+ shards.
	MultiShardReads int64
	// The number of calls to ReadAt that require reading data from 2+ cache blocks.
	MultiBlockReads int64
	// The number of calls to ReadAt where all data returned was read from the cache.
	ReadsWithFullHit int64
	// The number of calls to ReadAt where some data returned was read from the cache.
	ReadsWithPartialHit int64
	// The number of calls to ReadAt where no data returned was read from the cache.
	ReadsWithNoHit int64

	// The number of times a cache block was evicted from the cache.
	Evictions int64
	// The number of times writing a cache block to the cache failed.
	WriteBackFailures int64

	// The latency of calls to get some data from the cache. Observations are
	// time.Duration values converted to float64, i.e. nanoseconds.
	GetLatency prometheus.Histogram
	// The latency of reads of a single cache block from disk.
	DiskReadLatency prometheus.Histogram
	// The latency of writing data to write back to the cache to a channel.
	// Generally should be low, but if the channel is full, could be high.
	// Observations are time.Duration values converted to float64, i.e.
	// nanoseconds.
	QueuePutLatency prometheus.Histogram
	// The latency of calls to put some data read from block storage into the cache.
	PutLatency prometheus.Histogram
	// The latency of writes of a single cache block to disk.
	DiskWriteLatency prometheus.Histogram
}
      86             : 
// internalMetrics is the live, concurrently updated counterpart of Metrics:
// each counter is an atomic and each histogram is created in Open. See the
// docs at Metrics for the meaning of the individual fields.
type internalMetrics struct {
	// count is the number of cache blocks in the cache; Metrics derives both
	// Count and Size from it.
	count atomic.Int64

	totalReads          atomic.Int64
	multiShardReads     atomic.Int64
	multiBlockReads     atomic.Int64
	readsWithFullHit    atomic.Int64
	readsWithPartialHit atomic.Int64
	readsWithNoHit      atomic.Int64

	evictions         atomic.Int64
	writeBackFailures atomic.Int64

	getLatency       prometheus.Histogram
	diskReadLatency  prometheus.Histogram
	queuePutLatency  prometheus.Histogram
	putLatency       prometheus.Histogram
	diskWriteLatency prometheus.Histogram
}
     107             : 
const (
	// writeWorkersPerShard is used to establish the number of worker goroutines
	// that perform writes to the cache. Open starts this many workers for every
	// shard.
	writeWorkersPerShard = 4
	// writeTasksPerWorker is used to establish how many tasks can be queued up
	// until we have to block.
	writeTasksPerWorker = 4
)
     116             : 
     117             : // Open opens a cache. If there is no existing cache at fsDir, a new one
     118             : // is created.
     119             : func Open(
     120             :         fs vfs.FS,
     121             :         logger base.Logger,
     122             :         fsDir string,
     123             :         blockSize int,
     124             :         // shardingBlockSize is the size of a shard block. The cache is split into contiguous
     125             :         // shardingBlockSize units. The units are distributed across multiple independent shards
     126             :         // of the cache, via a hash(offset) modulo num shards operation. The cache replacement
     127             :         // policies operate at the level of shard, not whole cache. This is done to reduce lock
     128             :         // contention.
     129             :         shardingBlockSize int64,
     130             :         sizeBytes int64,
     131             :         numShards int,
     132           2 : ) (*Cache, error) {
     133           2 :         if minSize := shardingBlockSize * int64(numShards); sizeBytes < minSize {
     134           0 :                 // Up the size so that we have one block per shard. In practice, this should
     135           0 :                 // only happen in tests.
     136           0 :                 sizeBytes = minSize
     137           0 :         }
     138             : 
     139           2 :         c := &Cache{
     140           2 :                 logger:            logger,
     141           2 :                 bm:                makeBlockMath(blockSize),
     142           2 :                 shardingBlockSize: shardingBlockSize,
     143           2 :         }
     144           2 :         c.shards = make([]shard, numShards)
     145           2 :         blocksPerShard := sizeBytes / int64(numShards) / int64(blockSize)
     146           2 :         for i := range c.shards {
     147           2 :                 if err := c.shards[i].init(c, fs, fsDir, i, blocksPerShard, blockSize, shardingBlockSize); err != nil {
     148           0 :                         return nil, err
     149           0 :                 }
     150             :         }
     151             : 
     152           2 :         c.writeWorkers.Start(c, numShards*writeWorkersPerShard)
     153           2 : 
     154           2 :         c.metrics.getLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: IOBuckets})
     155           2 :         c.metrics.diskReadLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: IOBuckets})
     156           2 :         c.metrics.putLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: IOBuckets})
     157           2 :         c.metrics.diskWriteLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: IOBuckets})
     158           2 : 
     159           2 :         // Measures a channel write, so lower min.
     160           2 :         c.metrics.queuePutLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: ChannelWriteBuckets})
     161           2 : 
     162           2 :         return c, nil
     163             : }
     164             : 
     165             : // Close closes the cache. Methods such as ReadAt should not be called after Close is
     166             : // called.
     167           2 : func (c *Cache) Close() error {
     168           2 :         c.writeWorkers.Stop()
     169           2 : 
     170           2 :         var retErr error
     171           2 :         for i := range c.shards {
     172           2 :                 if err := c.shards[i].close(); err != nil && retErr == nil {
     173           0 :                         retErr = err
     174           0 :                 }
     175             :         }
     176           2 :         c.shards = nil
     177           2 :         return retErr
     178             : }
     179             : 
     180             : // Metrics return metrics for the cache. Callers should not mutate
     181             : // the returned histograms, which are pointer types.
     182           2 : func (c *Cache) Metrics() Metrics {
     183           2 :         return Metrics{
     184           2 :                 Count:               c.metrics.count.Load(),
     185           2 :                 Size:                c.metrics.count.Load() * int64(c.bm.BlockSize()),
     186           2 :                 TotalReads:          c.metrics.totalReads.Load(),
     187           2 :                 MultiShardReads:     c.metrics.multiShardReads.Load(),
     188           2 :                 MultiBlockReads:     c.metrics.multiBlockReads.Load(),
     189           2 :                 ReadsWithFullHit:    c.metrics.readsWithFullHit.Load(),
     190           2 :                 ReadsWithPartialHit: c.metrics.readsWithPartialHit.Load(),
     191           2 :                 ReadsWithNoHit:      c.metrics.readsWithNoHit.Load(),
     192           2 :                 Evictions:           c.metrics.evictions.Load(),
     193           2 :                 WriteBackFailures:   c.metrics.writeBackFailures.Load(),
     194           2 :                 GetLatency:          c.metrics.getLatency,
     195           2 :                 DiskReadLatency:     c.metrics.diskReadLatency,
     196           2 :                 QueuePutLatency:     c.metrics.queuePutLatency,
     197           2 :                 PutLatency:          c.metrics.putLatency,
     198           2 :                 DiskWriteLatency:    c.metrics.diskWriteLatency,
     199           2 :         }
     200           2 : }
     201             : 
// ReadFlags contains options for Cache.ReadAt.
type ReadFlags struct {
	// ReadOnly instructs ReadAt to not write any new data into the cache; it is
	// used when the data is unlikely to be used again. Data that is already in
	// the cache is still served from it.
	ReadOnly bool
}
     208             : 
     209             : // ReadAt performs a read form an object, attempting to use cached data when
     210             : // possible.
     211             : func (c *Cache) ReadAt(
     212             :         ctx context.Context,
     213             :         fileNum base.DiskFileNum,
     214             :         p []byte,
     215             :         ofs int64,
     216             :         objReader remote.ObjectReader,
     217             :         objSize int64,
     218             :         flags ReadFlags,
     219           2 : ) error {
     220           2 :         c.metrics.totalReads.Add(1)
     221           2 :         if ofs >= objSize {
     222           0 :                 if invariants.Enabled {
     223           0 :                         panic(fmt.Sprintf("invalid ReadAt offset %v %v", ofs, objSize))
     224             :                 }
     225           0 :                 return io.EOF
     226             :         }
     227             :         // TODO(radu): for compaction reads, we may not want to read from the cache at
     228             :         // all.
     229           2 :         {
     230           2 :                 start := time.Now()
     231           2 :                 n, err := c.get(fileNum, p, ofs)
     232           2 :                 c.metrics.getLatency.Observe(float64(time.Since(start)))
     233           2 :                 if err != nil {
     234           1 :                         return err
     235           1 :                 }
     236           2 :                 if n == len(p) {
     237           2 :                         // Everything was in cache!
     238           2 :                         c.metrics.readsWithFullHit.Add(1)
     239           2 :                         return nil
     240           2 :                 }
     241           2 :                 if n == 0 {
     242           2 :                         c.metrics.readsWithNoHit.Add(1)
     243           2 :                 } else {
     244           1 :                         c.metrics.readsWithPartialHit.Add(1)
     245           1 :                 }
     246             : 
     247             :                 // Note this. The below code does not need the original ofs, as with the earlier
     248             :                 // reading from the cache done, the relevant offset is ofs + int64(n). Same with p.
     249           2 :                 ofs += int64(n)
     250           2 :                 p = p[n:]
     251           2 : 
     252           2 :                 if invariants.Enabled {
     253           2 :                         if n != 0 && c.bm.Remainder(ofs) != 0 {
     254           0 :                                 panic(fmt.Sprintf("after non-zero read from cache, ofs is not block-aligned: %v %v", ofs, n))
     255             :                         }
     256             :                 }
     257             :         }
     258             : 
     259           2 :         if flags.ReadOnly {
     260           2 :                 return objReader.ReadAt(ctx, p, ofs)
     261           2 :         }
     262             : 
     263             :         // We must do reads with offset & size that are multiples of the block size. Else
     264             :         // later cache hits may return incorrect zeroed results from the cache.
     265           2 :         firstBlockInd := c.bm.Block(ofs)
     266           2 :         adjustedOfs := c.bm.BlockOffset(firstBlockInd)
     267           2 : 
     268           2 :         // Take the length of what is left to read plus the length of the adjustment of
     269           2 :         // the offset plus the size of a block minus one and divide by the size of a block
     270           2 :         // to get the number of blocks to read from the object.
     271           2 :         sizeOfOffAdjustment := int(ofs - adjustedOfs)
     272           2 :         adjustedLen := int(c.bm.RoundUp(int64(len(p) + sizeOfOffAdjustment)))
     273           2 :         adjustedP := make([]byte, adjustedLen)
     274           2 : 
     275           2 :         // Read the rest from the object. We may need to cap the length to avoid past EOF reads.
     276           2 :         eofCap := int64(adjustedLen)
     277           2 :         if adjustedOfs+eofCap > objSize {
     278           2 :                 eofCap = objSize - adjustedOfs
     279           2 :         }
     280           2 :         if err := objReader.ReadAt(ctx, adjustedP[:eofCap], adjustedOfs); err != nil {
     281           0 :                 return err
     282           0 :         }
     283           2 :         copy(p, adjustedP[sizeOfOffAdjustment:])
     284           2 : 
     285           2 :         start := time.Now()
     286           2 :         c.writeWorkers.QueueWrite(fileNum, adjustedP, adjustedOfs)
     287           2 :         c.metrics.queuePutLatency.Observe(float64(time.Since(start)))
     288           2 : 
     289           2 :         return nil
     290             : }
     291             : 
     292             : // get attempts to read the requested data from the cache, if it is already
     293             : // there.
     294             : //
     295             : // If all data is available, returns n = len(p).
     296             : //
     297             : // If data is partially available, a prefix of the data is read; returns n < len(p)
     298             : // and no error. If no prefix is available, returns n = 0 and no error.
     299           2 : func (c *Cache) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
     300           2 :         // The data extent might cross shard boundaries, hence the loop. In the hot
     301           2 :         // path, max two iterations of this loop will be executed, since reads are sized
     302           2 :         // in units of sstable block size.
     303           2 :         var multiShard bool
     304           2 :         for {
     305           2 :                 shard := c.getShard(fileNum, ofs+int64(n))
     306           2 :                 cappedLen := len(p[n:])
     307           2 :                 if toBoundary := int(c.shardingBlockSize - ((ofs + int64(n)) % c.shardingBlockSize)); cappedLen > toBoundary {
     308           1 :                         cappedLen = toBoundary
     309           1 :                 }
     310           2 :                 numRead, err := shard.get(fileNum, p[n:n+cappedLen], ofs+int64(n))
     311           2 :                 if err != nil {
     312           1 :                         return n, err
     313           1 :                 }
     314           2 :                 n += numRead
     315           2 :                 if numRead < cappedLen {
     316           2 :                         // We only read a prefix from this shard.
     317           2 :                         return n, nil
     318           2 :                 }
     319           2 :                 if n == len(p) {
     320           2 :                         // We are done.
     321           2 :                         return n, nil
     322           2 :                 }
     323             :                 // Data extent crosses shard boundary, continue with next shard.
     324           1 :                 if !multiShard {
     325           1 :                         c.metrics.multiShardReads.Add(1)
     326           1 :                         multiShard = true
     327           1 :                 }
     328             :         }
     329             : }
     330             : 
     331             : // set attempts to write the requested data to the cache. Both ofs & len(p) must
     332             : // be multiples of the block size.
     333             : //
     334             : // If all of p is not written to the shard, set returns a non-nil error.
     335           2 : func (c *Cache) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
     336           2 :         if invariants.Enabled {
     337           2 :                 if c.bm.Remainder(ofs) != 0 || c.bm.Remainder(int64(len(p))) != 0 {
     338           0 :                         panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
     339             :                 }
     340             :         }
     341             : 
     342             :         // The data extent might cross shard boundaries, hence the loop. In the hot
     343             :         // path, max two iterations of this loop will be executed, since reads are sized
     344             :         // in units of sstable block size.
     345           2 :         n := 0
     346           2 :         for {
     347           2 :                 shard := c.getShard(fileNum, ofs+int64(n))
     348           2 :                 cappedLen := len(p[n:])
     349           2 :                 if toBoundary := int(c.shardingBlockSize - ((ofs + int64(n)) % c.shardingBlockSize)); cappedLen > toBoundary {
     350           1 :                         cappedLen = toBoundary
     351           1 :                 }
     352           2 :                 err := shard.set(fileNum, p[n:n+cappedLen], ofs+int64(n))
     353           2 :                 if err != nil {
     354           0 :                         return err
     355           0 :                 }
     356             :                 // set returns an error if cappedLen bytes aren't written to the shard.
     357           2 :                 n += cappedLen
     358           2 :                 if n == len(p) {
     359           2 :                         // We are done.
     360           2 :                         return nil
     361           2 :                 }
     362             :                 // Data extent crosses shard boundary, continue with next shard.
     363             :         }
     364             : }
     365             : 
     366           2 : func (c *Cache) getShard(fileNum base.DiskFileNum, ofs int64) *shard {
     367           2 :         const prime64 = 1099511628211
     368           2 :         hash := uint64(fileNum)*prime64 + uint64(ofs/c.shardingBlockSize)
     369           2 :         // TODO(josh): Instance change ops are often run in production. Such an operation
     370           2 :         // updates len(c.shards); see openSharedCache. As a result, the behavior of this
     371           2 :         // function changes, and the cache empties out at restart time. We may want a better
     372           2 :         // story here eventually.
     373           2 :         return &c.shards[hash%uint64(len(c.shards))]
     374           2 : }
     375             : 
// shard is one independent slice of the cache, backed by its own file.
//
// cache points back to the owning Cache. file is the backing file, which is
// preallocated to hold sizeInBlocks blocks of the cache block size. bm and
// shardingBlockSize mirror the values of the owning Cache.
//
// mu bundles a mutex with the block bookkeeping: the map from logical blocks
// to cache blocks, the per-block state, and the heads of the LRU and free
// lists that are threaded through the per-block state.
// NOTE(review): the fields inside mu are presumably meant to be accessed only
// with the mutex held; the list helpers themselves do not lock -- confirm
// against the callers.
type shard struct {
	cache             *Cache
	file              vfs.File
	sizeInBlocks      int64
	bm                blockMath
	shardingBlockSize int64
	mu                struct {
		sync.Mutex
		// TODO(josh): None of these datastructures are space-efficient.
		// Focusing on correctness to start.
		where  whereMap
		blocks []cacheBlockState
		// Head of LRU list (doubly-linked circular).
		lruHead cacheBlockIndex
		// Head of free list (singly-linked chain).
		freeHead cacheBlockIndex
	}
}
     394             : 
// cacheBlockState is the in-memory bookkeeping kept for every cache block of
// a shard. Each block is a node of either the LRU list or the free list of the
// shard, linked through next and prev.
type cacheBlockState struct {
	// lock is the reader/writer state of the block (see lockState). logical is
	// presumably the logical block whose data the cache block holds -- confirm
	// against the code that populates it.
	lock    lockState
	logical logicalBlockID

	// next is the next block in the LRU or free list (or invalidBlockIndex if it
	// is the last block in the free list).
	next cacheBlockIndex

	// prev is the previous block in the LRU list. It is not used when the block
	// is in the free list.
	prev cacheBlockIndex
}
     407             : 
// whereMap maps a logical block in an SST to the index of the cache block that
// holds the file contents (the "cache block index").
type whereMap map[logicalBlockID]cacheBlockIndex
     411             : 
// logicalBlockID is the key type of whereMap: it names a block of a file.
//
// NOTE(review): cacheBlockIdx presumably is the position of the block within
// the file, counted in units of the cache block size -- confirm against the
// code that builds these IDs.
type logicalBlockID struct {
	filenum       base.DiskFileNum
	cacheBlockIdx cacheBlockIndex
}
     416             : 
// lockState is the reader/writer lock state of a single cache block: zero
// means unlocked, a positive value counts the active readers, and -1 means
// the block is being written.
type lockState int64

const (
	// unlocked indicates that the cache block has neither readers nor a writer.
	unlocked lockState = 0
	// >0 lockState tracks the number of distinct readers of some cache block / logical block
	// which is in the secondary cache. It is used to ensure that a cache block is not evicted
	// and overwritten, while there are active readers. readLockTakenInc is the amount
	// that one reader contributes to the state.
	readLockTakenInc = 1
	// -1 lockState indicates that some cache block is currently being populated with data from
	// blob storage. It is used to ensure that a cache block is not read or evicted again, while
	// it is being populated.
	writeLockTaken = -1
)
     430             : 
     431             : func (s *shard) init(
     432             :         cache *Cache,
     433             :         fs vfs.FS,
     434             :         fsDir string,
     435             :         shardIdx int,
     436             :         sizeInBlocks int64,
     437             :         blockSize int,
     438             :         shardingBlockSize int64,
     439           2 : ) error {
     440           2 :         *s = shard{
     441           2 :                 cache:        cache,
     442           2 :                 sizeInBlocks: sizeInBlocks,
     443           2 :         }
     444           2 :         if blockSize < 1024 || shardingBlockSize%int64(blockSize) != 0 {
     445           0 :                 return errors.Newf("invalid block size %d (must divide %d)", blockSize, shardingBlockSize)
     446           0 :         }
     447           2 :         s.bm = makeBlockMath(blockSize)
     448           2 :         s.shardingBlockSize = shardingBlockSize
     449           2 :         file, err := fs.OpenReadWrite(fs.PathJoin(fsDir, fmt.Sprintf("SHARED-CACHE-%03d", shardIdx)), vfs.WriteCategoryUnspecified)
     450           2 :         if err != nil {
     451           0 :                 return err
     452           0 :         }
     453             :         // TODO(radu): truncate file if necessary (especially important if we restart
     454             :         // with more shards).
     455           2 :         if err := file.Preallocate(0, int64(blockSize)*sizeInBlocks); err != nil {
     456           0 :                 return err
     457           0 :         }
     458           2 :         s.file = file
     459           2 : 
     460           2 :         // TODO(josh): Right now, the secondary cache is not persistent. All existing
     461           2 :         // cache contents will be over-written, since all metadata is only stored in
     462           2 :         // memory.
     463           2 :         s.mu.where = make(whereMap)
     464           2 :         s.mu.blocks = make([]cacheBlockState, sizeInBlocks)
     465           2 :         s.mu.lruHead = invalidBlockIndex
     466           2 :         s.mu.freeHead = invalidBlockIndex
     467           2 :         for i := range s.mu.blocks {
     468           2 :                 s.freePush(cacheBlockIndex(i))
     469           2 :         }
     470             : 
     471           2 :         return nil
     472             : }
     473             : 
     474           2 : func (s *shard) close() error {
     475           2 :         defer func() {
     476           2 :                 s.file = nil
     477           2 :         }()
     478           2 :         return s.file.Close()
     479             : }
     480             : 
     481             : // freePush pushes a block to the front of the free list.
     482           2 : func (s *shard) freePush(index cacheBlockIndex) {
     483           2 :         s.mu.blocks[index].next = s.mu.freeHead
     484           2 :         s.mu.freeHead = index
     485           2 : }
     486             : 
     487             : // freePop removes the block from the front of the free list. Must not be called
     488             : // if the list is empty (i.e. freeHead = invalidBlockIndex).
     489           2 : func (s *shard) freePop() cacheBlockIndex {
     490           2 :         index := s.mu.freeHead
     491           2 :         s.mu.freeHead = s.mu.blocks[index].next
     492           2 :         return index
     493           2 : }
     494             : 
     495             : // lruInsertFront inserts a block at the front of the LRU list.
     496           2 : func (s *shard) lruInsertFront(index cacheBlockIndex) {
     497           2 :         b := &s.mu.blocks[index]
     498           2 :         if s.mu.lruHead == invalidBlockIndex {
     499           2 :                 b.next = index
     500           2 :                 b.prev = index
     501           2 :         } else {
     502           2 :                 b.next = s.mu.lruHead
     503           2 :                 h := &s.mu.blocks[s.mu.lruHead]
     504           2 :                 b.prev = h.prev
     505           2 :                 s.mu.blocks[h.prev].next = index
     506           2 :                 h.prev = index
     507           2 :         }
     508           2 :         s.mu.lruHead = index
     509             : }
     510             : 
     511           2 : func (s *shard) lruNext(index cacheBlockIndex) cacheBlockIndex {
     512           2 :         return s.mu.blocks[index].next
     513           2 : }
     514             : 
     515           2 : func (s *shard) lruPrev(index cacheBlockIndex) cacheBlockIndex {
     516           2 :         return s.mu.blocks[index].prev
     517           2 : }
     518             : 
     519             : // lruUnlink removes a block from the LRU list.
     520           2 : func (s *shard) lruUnlink(index cacheBlockIndex) {
     521           2 :         b := &s.mu.blocks[index]
     522           2 :         if b.next == index {
     523           2 :                 s.mu.lruHead = invalidBlockIndex
     524           2 :         } else {
     525           2 :                 s.mu.blocks[b.prev].next = b.next
     526           2 :                 s.mu.blocks[b.next].prev = b.prev
     527           2 :                 if s.mu.lruHead == index {
     528           2 :                         s.mu.lruHead = b.next
     529           2 :                 }
     530             :         }
     531           2 :         b.next, b.prev = invalidBlockIndex, invalidBlockIndex
     532             : }
     533             : 
     534             : // get attempts to read the requested data from the shard. The data must not
     535             : // cross a shard boundary.
     536             : //
     537             : // If all data is available, returns n = len(p).
     538             : //
     539             : // If data is partially available, a prefix of the data is read; returns n < len(p)
     540             : // and no error. If no prefix is available, returns n = 0 and no error.
     541             : //
     542             : // TODO(josh): Today, if there are two cache blocks needed to satisfy a read, and the
     543             : // first block is not in the cache and the second one is, we will read both from
     544             : // blob storage. We should fix this. This is not an unlikely scenario if we are doing
     545             : // a reverse scan, since those iterate over sstable blocks in reverse order and due to
     546             : // cache block aligned reads will have read the suffix of the sstable block that will
     547             : // be needed next.
     548           2 : func (s *shard) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
     549           2 :         if invariants.Enabled {
     550           2 :                 if ofs/s.shardingBlockSize != (ofs+int64(len(p))-1)/s.shardingBlockSize {
     551           0 :                         panic(fmt.Sprintf("get crosses shard boundary: %v %v", ofs, len(p)))
     552             :                 }
     553           2 :                 s.assertShardStateIsConsistent()
     554             :         }
     555             : 
     556             :         // The data extent might cross cache block boundaries, hence the loop. In the hot
     557             :         // path, max two iterations of this loop will be executed, since reads are sized
     558             :         // in units of sstable block size.
     559           2 :         var multiBlock bool
     560           2 :         for {
     561           2 :                 k := logicalBlockID{
     562           2 :                         filenum:       fileNum,
     563           2 :                         cacheBlockIdx: s.bm.Block(ofs + int64(n)),
     564           2 :                 }
     565           2 :                 s.mu.Lock()
     566           2 :                 cacheBlockIdx, ok := s.mu.where[k]
     567           2 :                 // TODO(josh): Multiple reads within the same few milliseconds (anything that is smaller
     568           2 :                 // than blob storage read latency) that miss on the same logical block ID will not necessarily
     569           2 :                 // be rare. We may want to do only one read, with the later readers blocking on the first read
     570           2 :                 // completing. This could be implemented either here or in the primary block cache. See
     571           2 :                 // https://github.com/cockroachdb/pebble/pull/2586 for additional discussion.
     572           2 :                 if !ok {
     573           2 :                         s.mu.Unlock()
     574           2 :                         return n, nil
     575           2 :                 }
     576           2 :                 if s.mu.blocks[cacheBlockIdx].lock == writeLockTaken {
     577           2 :                         // In practice, if we have two reads of the same SST block in close succession, we
     578           2 :                         // would expect the second to hit in the in-memory block cache. So it's not worth
     579           2 :                         // optimizing this case here.
     580           2 :                         s.mu.Unlock()
     581           2 :                         return n, nil
     582           2 :                 }
     583           2 :                 s.mu.blocks[cacheBlockIdx].lock += readLockTakenInc
     584           2 :                 // Move to front of the LRU list.
     585           2 :                 s.lruUnlink(cacheBlockIdx)
     586           2 :                 s.lruInsertFront(cacheBlockIdx)
     587           2 :                 s.mu.Unlock()
     588           2 : 
     589           2 :                 readAt := s.bm.BlockOffset(cacheBlockIdx)
     590           2 :                 readSize := s.bm.BlockSize()
     591           2 :                 if n == 0 { // if first read
     592           2 :                         rem := s.bm.Remainder(ofs)
     593           2 :                         readAt += rem
     594           2 :                         readSize -= int(rem)
     595           2 :                 }
     596             : 
     597           2 :                 if len(p[n:]) <= readSize {
     598           2 :                         start := time.Now()
     599           2 :                         numRead, err := s.file.ReadAt(p[n:], readAt)
     600           2 :                         s.cache.metrics.diskReadLatency.Observe(float64(time.Since(start)))
     601           2 :                         s.dropReadLock(cacheBlockIdx)
     602           2 :                         return n + numRead, err
     603           2 :                 }
     604           1 :                 start := time.Now()
     605           1 :                 numRead, err := s.file.ReadAt(p[n:n+readSize], readAt)
     606           1 :                 s.cache.metrics.diskReadLatency.Observe(float64(time.Since(start)))
     607           1 :                 s.dropReadLock(cacheBlockIdx)
     608           1 :                 if err != nil {
     609           0 :                         return 0, err
     610           0 :                 }
     611             : 
     612             :                 // Note that numRead == readSize, since we checked for an error above.
     613           1 :                 n += numRead
     614           1 : 
     615           1 :                 if !multiBlock {
     616           1 :                         s.cache.metrics.multiBlockReads.Add(1)
     617           1 :                         multiBlock = true
     618           1 :                 }
     619             :         }
     620             : }
     621             : 
     622             : // set attempts to write the requested data to the shard. The data must not
     623             : // cross a shard boundary, and both ofs & len(p) must be multiples of the
     624             : // block size.
     625             : //
     626             : // If all of p is not written to the shard, set returns a non-nil error.
     627           2 : func (s *shard) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
     628           2 :         if invariants.Enabled {
     629           2 :                 if ofs/s.shardingBlockSize != (ofs+int64(len(p))-1)/s.shardingBlockSize {
     630           0 :                         panic(fmt.Sprintf("set crosses shard boundary: %v %v", ofs, len(p)))
     631             :                 }
     632           2 :                 if s.bm.Remainder(ofs) != 0 || s.bm.Remainder(int64(len(p))) != 0 {
     633           0 :                         panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
     634             :                 }
     635           2 :                 s.assertShardStateIsConsistent()
     636             :         }
     637             : 
     638             :         // The data extent might cross cache block boundaries, hence the loop. In the hot
     639             :         // path, max two iterations of this loop will be executed, since reads are sized
     640             :         // in units of sstable block size.
     641           2 :         n := 0
     642           2 :         for {
     643           2 :                 if n == len(p) {
     644           2 :                         return nil
     645           2 :                 }
     646           2 :                 if invariants.Enabled {
     647           2 :                         if n > len(p) {
     648           0 :                                 panic(fmt.Sprintf("set with n greater than len(p): %v %v", n, len(p)))
     649             :                         }
     650             :                 }
     651             : 
     652             :                 // If the logical block is already in the cache, we should skip doing a set.
     653           2 :                 k := logicalBlockID{
     654           2 :                         filenum:       fileNum,
     655           2 :                         cacheBlockIdx: s.bm.Block(ofs + int64(n)),
     656           2 :                 }
     657           2 :                 s.mu.Lock()
     658           2 :                 if _, ok := s.mu.where[k]; ok {
     659           2 :                         s.mu.Unlock()
     660           2 :                         n += s.bm.BlockSize()
     661           2 :                         continue
     662             :                 }
     663             : 
     664           2 :                 var cacheBlockIdx cacheBlockIndex
     665           2 :                 if s.mu.freeHead == invalidBlockIndex {
     666           2 :                         if invariants.Enabled && s.mu.lruHead == invalidBlockIndex {
     667           0 :                                 panic("both LRU and free lists empty")
     668             :                         }
     669             : 
     670             :                         // Find the last element in the LRU list which is not locked.
     671           2 :                         for idx := s.lruPrev(s.mu.lruHead); ; idx = s.lruPrev(idx) {
     672           2 :                                 if lock := s.mu.blocks[idx].lock; lock == unlocked {
     673           2 :                                         cacheBlockIdx = idx
     674           2 :                                         break
     675             :                                 }
     676           0 :                                 if idx == s.mu.lruHead {
     677           0 :                                         // No unlocked block to evict.
     678           0 :                                         //
     679           0 :                                         // TODO(josh): We may want to block until a block frees up, instead of returning
     680           0 :                                         // an error here. But I think we can do that later on, e.g. after running some production
     681           0 :                                         // experiments.
     682           0 :                                         s.mu.Unlock()
     683           0 :                                         return errors.New("no block to evict so skipping write to cache")
     684           0 :                                 }
     685             :                         }
     686           2 :                         s.cache.metrics.evictions.Add(1)
     687           2 :                         s.lruUnlink(cacheBlockIdx)
     688           2 :                         delete(s.mu.where, s.mu.blocks[cacheBlockIdx].logical)
     689           2 :                 } else {
     690           2 :                         s.cache.metrics.count.Add(1)
     691           2 :                         cacheBlockIdx = s.freePop()
     692           2 :                 }
     693             : 
     694           2 :                 s.lruInsertFront(cacheBlockIdx)
     695           2 :                 s.mu.where[k] = cacheBlockIdx
     696           2 :                 s.mu.blocks[cacheBlockIdx].logical = k
     697           2 :                 s.mu.blocks[cacheBlockIdx].lock = writeLockTaken
     698           2 :                 s.mu.Unlock()
     699           2 : 
     700           2 :                 writeAt := s.bm.BlockOffset(cacheBlockIdx)
     701           2 : 
     702           2 :                 writeSize := s.bm.BlockSize()
     703           2 :                 if len(p[n:]) <= writeSize {
     704           2 :                         writeSize = len(p[n:])
     705           2 :                 }
     706             : 
     707           2 :                 start := time.Now()
     708           2 :                 _, err := s.file.WriteAt(p[n:n+writeSize], writeAt)
     709           2 :                 s.cache.metrics.diskWriteLatency.Observe(float64(time.Since(start)))
     710           2 :                 if err != nil {
     711           0 :                         // Free the block.
     712           0 :                         s.mu.Lock()
     713           0 :                         defer s.mu.Unlock()
     714           0 : 
     715           0 :                         delete(s.mu.where, k)
     716           0 :                         s.lruUnlink(cacheBlockIdx)
     717           0 :                         s.freePush(cacheBlockIdx)
     718           0 :                         return err
     719           0 :                 }
     720           2 :                 s.dropWriteLock(cacheBlockIdx)
     721           2 :                 n += writeSize
     722             :         }
     723             : }
     724             : 
     725             : // Doesn't inline currently. This might be okay, but something to keep in mind.
     726           2 : func (s *shard) dropReadLock(cacheBlockInd cacheBlockIndex) {
     727           2 :         s.mu.Lock()
     728           2 :         s.mu.blocks[cacheBlockInd].lock -= readLockTakenInc
     729           2 :         if invariants.Enabled && s.mu.blocks[cacheBlockInd].lock < 0 {
     730           0 :                 panic(fmt.Sprintf("unexpected lock state %v in dropReadLock", s.mu.blocks[cacheBlockInd].lock))
     731             :         }
     732           2 :         s.mu.Unlock()
     733             : }
     734             : 
     735             : // Doesn't inline currently. This might be okay, but something to keep in mind.
     736           2 : func (s *shard) dropWriteLock(cacheBlockInd cacheBlockIndex) {
     737           2 :         s.mu.Lock()
     738           2 :         if invariants.Enabled && s.mu.blocks[cacheBlockInd].lock != writeLockTaken {
     739           0 :                 panic(fmt.Sprintf("unexpected lock state %v in dropWriteLock", s.mu.blocks[cacheBlockInd].lock))
     740             :         }
     741           2 :         s.mu.blocks[cacheBlockInd].lock = unlocked
     742           2 :         s.mu.Unlock()
     743             : }
     744             : 
     745           2 : func (s *shard) assertShardStateIsConsistent() {
     746           2 :         s.mu.Lock()
     747           2 :         defer s.mu.Unlock()
     748           2 : 
     749           2 :         lruLen := 0
     750           2 :         if s.mu.lruHead != invalidBlockIndex {
     751           2 :                 for b := s.mu.lruHead; ; {
     752           2 :                         lruLen++
     753           2 :                         if idx, ok := s.mu.where[s.mu.blocks[b].logical]; !ok || idx != b {
     754           0 :                                 panic("block in LRU list with no entry in where map")
     755             :                         }
     756           2 :                         b = s.lruNext(b)
     757           2 :                         if b == s.mu.lruHead {
     758           2 :                                 break
     759             :                         }
     760             :                 }
     761             :         }
     762           2 :         if lruLen != len(s.mu.where) {
     763           0 :                 panic(fmt.Sprintf("lru list len is %d but where map has %d entries", lruLen, len(s.mu.where)))
     764             :         }
     765           2 :         freeLen := 0
     766           2 :         for n := s.mu.freeHead; n != invalidBlockIndex; n = s.mu.blocks[n].next {
     767           2 :                 freeLen++
     768           2 :         }
     769             : 
     770           2 :         if lruLen+freeLen != int(s.sizeInBlocks) {
     771           0 :                 panic(fmt.Sprintf("%d lru blocks and %d free blocks don't add up to %d", lruLen, freeLen, s.sizeInBlocks))
     772             :         }
     773           2 :         for i := range s.mu.blocks {
     774           2 :                 if state := s.mu.blocks[i].lock; state < writeLockTaken {
     775           0 :                         panic(fmt.Sprintf("lock state %v is not allowed", state))
     776             :                 }
     777             :         }
     778             : }
     779             : 
     780             : // cacheBlockIndex is the index of a blockSize-aligned cache block.
     781             : type cacheBlockIndex int64
     782             : 
     783             : // invalidBlockIndex is used for the head of a list when the list is empty.
     784             : const invalidBlockIndex cacheBlockIndex = -1
     785             : 
     786             : // blockMath is a helper type for performing conversions between offsets and
     787             : // block indexes.
     788             : type blockMath struct {
     789             :         blockSizeBits int8
     790             : }
     791             : 
     792           2 : func makeBlockMath(blockSize int) blockMath {
     793           2 :         bm := blockMath{
     794           2 :                 blockSizeBits: int8(bits.Len64(uint64(blockSize)) - 1),
     795           2 :         }
     796           2 :         if blockSize != (1 << bm.blockSizeBits) {
     797           0 :                 panic(fmt.Sprintf("blockSize %d is not a power of 2", blockSize))
     798             :         }
     799           2 :         return bm
     800             : }
     801             : 
     802           2 : func (bm blockMath) mask() int64 {
     803           2 :         return (1 << bm.blockSizeBits) - 1
     804           2 : }
     805             : 
     806             : // BlockSize returns the block size.
     807           2 : func (bm blockMath) BlockSize() int {
     808           2 :         return 1 << bm.blockSizeBits
     809           2 : }
     810             : 
     811             : // Block returns the block index containing the given offset.
     812           2 : func (bm blockMath) Block(offset int64) cacheBlockIndex {
     813           2 :         return cacheBlockIndex(offset >> bm.blockSizeBits)
     814           2 : }
     815             : 
     816             : // Remainder returns the offset relative to the start of the cache block.
     817           2 : func (bm blockMath) Remainder(offset int64) int64 {
     818           2 :         return offset & bm.mask()
     819           2 : }
     820             : 
     821             : // BlockOffset returns the object offset where the given block starts.
     822           2 : func (bm blockMath) BlockOffset(block cacheBlockIndex) int64 {
     823           2 :         return int64(block) << bm.blockSizeBits
     824           2 : }
     825             : 
     826             : // RoundUp rounds up the given value to the closest multiple of block size.
     827           2 : func (bm blockMath) RoundUp(x int64) int64 {
     828           2 :         return (x + bm.mask()) & ^(bm.mask())
     829           2 : }
     830             : 
// writeWorkers is a pool of goroutines that process queued writeTasks by
// writing them to the cache. It is set up by Start and torn down by Stop.
type writeWorkers struct {
	// doneCh is closed by Stop to tell the worker goroutines to exit.
	doneCh chan struct{}
	// doneWaitGroup tracks the running worker goroutines; Stop waits on it.
	doneWaitGroup sync.WaitGroup

	// numWorkers is the number of worker goroutines launched by Start.
	numWorkers int
	// tasksCh is the buffered queue of pending writes, filled by QueueWrite
	// and drained by the workers.
	tasksCh chan writeTask
}
     838             : 
// writeTask describes a single queued cache write. A worker hands the three
// fields, in order, to Cache.set.
type writeTask struct {
	fileNum base.DiskFileNum
	p       []byte
	offset  int64
}
     844             : 
     845             : // Start starts the worker goroutines.
     846           2 : func (w *writeWorkers) Start(c *Cache, numWorkers int) {
     847           2 :         doneCh := make(chan struct{})
     848           2 :         tasksCh := make(chan writeTask, numWorkers*writeTasksPerWorker)
     849           2 : 
     850           2 :         w.numWorkers = numWorkers
     851           2 :         w.doneCh = doneCh
     852           2 :         w.tasksCh = tasksCh
     853           2 :         w.doneWaitGroup.Add(numWorkers)
     854           2 :         for i := 0; i < numWorkers; i++ {
     855           2 :                 go func() {
     856           2 :                         defer w.doneWaitGroup.Done()
     857           2 :                         for {
     858           2 :                                 select {
     859           2 :                                 case <-doneCh:
     860           2 :                                         return
     861           2 :                                 case task, ok := <-tasksCh:
     862           2 :                                         if !ok {
     863           1 :                                                 // The tasks channel was closed; this is used in testing code to
     864           1 :                                                 // ensure all writes are completed.
     865           1 :                                                 return
     866           1 :                                         }
     867             :                                         // TODO(radu): set() can perform multiple writes; perhaps each one
     868             :                                         // should be its own task.
     869           2 :                                         start := time.Now()
     870           2 :                                         err := c.set(task.fileNum, task.p, task.offset)
     871           2 :                                         c.metrics.putLatency.Observe(float64(time.Since(start)))
     872           2 :                                         if err != nil {
     873           0 :                                                 c.metrics.writeBackFailures.Add(1)
     874           0 :                                                 // TODO(radu): throttle logs.
     875           0 :                                                 c.logger.Errorf("writing back to cache after miss failed: %v", err)
     876           0 :                                         }
     877             :                                 }
     878             :                         }
     879             :                 }()
     880             :         }
     881             : }
     882             : 
     883             : // Stop waits for any in-progress writes to complete and stops the worker
     884             : // goroutines and waits for any in-pro. Any queued writes not yet started are
     885             : // discarded.
     886           2 : func (w *writeWorkers) Stop() {
     887           2 :         close(w.doneCh)
     888           2 :         w.doneCh = nil
     889           2 :         w.tasksCh = nil
     890           2 :         w.doneWaitGroup.Wait()
     891           2 : }
     892             : 
     893             : // QueueWrite adds a write task to the queue. Can block if the queue is full.
     894           2 : func (w *writeWorkers) QueueWrite(fileNum base.DiskFileNum, p []byte, offset int64) {
     895           2 :         w.tasksCh <- writeTask{
     896           2 :                 fileNum: fileNum,
     897           2 :                 p:       p,
     898           2 :                 offset:  offset,
     899           2 :         }
     900           2 : }

Generated by: LCOV version 1.14