// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package sharedcache

import (
	"context"
	"fmt"
	"io"
	"math/bits"
	"sync"
	"sync/atomic"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/invariants"
	"github.com/cockroachdb/pebble/objstorage/remote"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/prometheus/client_golang/prometheus"
)
// IOBuckets and ChannelWriteBuckets are exported so that package pebble can
// export metrics with these buckets to CRDB.
var (
	IOBuckets           = prometheus.ExponentialBucketsRange(float64(time.Millisecond*1), float64(10*time.Second), 50)
	ChannelWriteBuckets = prometheus.ExponentialBucketsRange(float64(time.Microsecond*1), float64(10*time.Second), 50)
)
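
// IOBuckets covers 1ms through 10s in 50 exponential buckets, while
// ChannelWriteBuckets covers 1µs through 10s, since a channel write is
// usually much faster than disk or network I/O.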

// Cache is a persistent cache backed by a local filesystem. It is intended
// to cache data that is in slower shared storage (e.g. S3), hence the
// package name 'sharedcache'.
type Cache struct {
	shards       []shard
	writeWorkers writeWorkers

	bm                blockMath
	shardingBlockSize int64

	logger  base.Logger
	metrics internalMetrics
}

// Metrics is a struct containing metrics exported by the secondary cache.
// TODO(josh): Reconsider the set of metrics exported by the secondary cache
// before we release the secondary cache to users. We choose to export many metrics
// right now, so we learn a lot from the benchmarking we are doing over the 23.2
// cycle.
type Metrics struct {
	// The number of sstable bytes stored in the cache.
	Size int64
	// The count of cache blocks in the cache (not sstable blocks).
	Count int64

	// The number of calls to ReadAt.
	TotalReads int64
	// The number of calls to ReadAt that require reading data from 2+ shards.
	MultiShardReads int64
	// The number of calls to ReadAt that require reading data from 2+ cache blocks.
	MultiBlockReads int64
	// The number of calls to ReadAt where all data returned was read from the cache.
	ReadsWithFullHit int64
	// The number of calls to ReadAt where some data returned was read from the cache.
	ReadsWithPartialHit int64
	// The number of calls to ReadAt where no data returned was read from the cache.
	ReadsWithNoHit int64

	// The number of times a cache block was evicted from the cache.
	Evictions int64
	// The number of times writing a cache block to the cache failed.
	WriteBackFailures int64

	// The latency of calls to get some data from the cache.
	GetLatency prometheus.Histogram
	// The latency of queueing data to be written back to the cache, i.e. of
	// writing it to a channel. Generally should be low, but if the channel is
	// full, could be high.
	QueuePutLatency prometheus.Histogram
	// The latency of reads of a single cache block from disk.
	DiskReadLatency prometheus.Histogram
	// The latency of calls to put some data read from block storage into the cache.
	PutLatency prometheus.Histogram
	// The latency of writes of a single cache block to disk.
	DiskWriteLatency prometheus.Histogram
}
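
// A minimal sketch of reading a histogram snapshot, for callers that
// re-export these metrics elsewhere; "dto" is an assumed import of
// "github.com/prometheus/client_model/go", and c is a hypothetical *Cache:
//
//	var m dto.Metric
//	if err := c.Metrics().GetLatency.Write(&m); err == nil {
//		_ = m.GetHistogram().GetSampleCount()
//	}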

// See docs at Metrics.
type internalMetrics struct {
	count atomic.Int64

	totalReads          atomic.Int64
	multiShardReads     atomic.Int64
	multiBlockReads     atomic.Int64
	readsWithFullHit    atomic.Int64
	readsWithPartialHit atomic.Int64
	readsWithNoHit      atomic.Int64

	evictions         atomic.Int64
	writeBackFailures atomic.Int64

	getLatency       prometheus.Histogram
	diskReadLatency  prometheus.Histogram
	queuePutLatency  prometheus.Histogram
	putLatency       prometheus.Histogram
	diskWriteLatency prometheus.Histogram
}

const (
	// writeWorkersPerShard is used to establish the number of worker goroutines
	// that perform writes to the cache.
	writeWorkersPerShard = 4
	// writeTasksPerWorker is used to establish how many tasks can be queued up
	// until we have to block.
	writeTasksPerWorker = 4
)
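
// With these defaults, a cache opened with e.g. 32 shards (an illustrative
// count) runs 32*writeWorkersPerShard = 128 write workers sharing one task
// channel of capacity 128*writeTasksPerWorker = 512; see writeWorkers.Start.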

// Open opens a cache. If there is no existing cache at fsDir, a new one
// is created.
func Open(
	fs vfs.FS,
	logger base.Logger,
	fsDir string,
	blockSize int,
	// shardingBlockSize is the size of a shard block. The cache is split into contiguous
	// shardingBlockSize units. The units are distributed across multiple independent shards
	// of the cache, via a hash(offset) modulo num shards operation. The cache replacement
	// policies operate at the level of shard, not whole cache. This is done to reduce lock
	// contention.
	shardingBlockSize int64,
	sizeBytes int64,
	numShards int,
) (*Cache, error) {
	if minSize := shardingBlockSize * int64(numShards); sizeBytes < minSize {
		// Up the size so that we have one block per shard. In practice, this should
		// only happen in tests.
		sizeBytes = minSize
	}

	c := &Cache{
		logger:            logger,
		bm:                makeBlockMath(blockSize),
		shardingBlockSize: shardingBlockSize,
	}
	c.shards = make([]shard, numShards)
	blocksPerShard := sizeBytes / int64(numShards) / int64(blockSize)
	for i := range c.shards {
		if err := c.shards[i].init(c, fs, fsDir, i, blocksPerShard, blockSize, shardingBlockSize); err != nil {
			return nil, err
		}
	}

	c.writeWorkers.Start(c, numShards*writeWorkersPerShard)

	c.metrics.getLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: IOBuckets})
	c.metrics.diskReadLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: IOBuckets})
	c.metrics.putLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: IOBuckets})
	c.metrics.diskWriteLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: IOBuckets})

	// Measures a channel write, so lower min.
	c.metrics.queuePutLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: ChannelWriteBuckets})

	return c, nil
}
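
// A minimal usage sketch, with illustrative (not recommended) sizes; the
// directory path is hypothetical:
//
//	c, err := Open(
//		vfs.Default, base.DefaultLogger, "/mnt/fast-disk/pebble-cache",
//		32<<10, // blockSize: 32 KiB, must be a power of 2 and >= 1024
//		1<<20,  // shardingBlockSize: 1 MiB, must be a multiple of blockSize
//		1<<30,  // sizeBytes: 1 GiB total cache capacity
//		8,      // numShards
//	)
//	if err != nil {
//		// handle the error
//	}
//	defer c.Close()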

// Close closes the cache. Methods such as ReadAt should not be called after Close is
// called.
func (c *Cache) Close() error {
	c.writeWorkers.Stop()

	var retErr error
	for i := range c.shards {
		if err := c.shards[i].close(); err != nil && retErr == nil {
			retErr = err
		}
	}
	c.shards = nil
	return retErr
}

// Metrics returns metrics for the cache. Callers should not mutate
// the returned histograms, which are pointer types.
func (c *Cache) Metrics() Metrics {
	return Metrics{
		Count:               c.metrics.count.Load(),
		Size:                c.metrics.count.Load() * int64(c.bm.BlockSize()),
		TotalReads:          c.metrics.totalReads.Load(),
		MultiShardReads:     c.metrics.multiShardReads.Load(),
		MultiBlockReads:     c.metrics.multiBlockReads.Load(),
		ReadsWithFullHit:    c.metrics.readsWithFullHit.Load(),
		ReadsWithPartialHit: c.metrics.readsWithPartialHit.Load(),
		ReadsWithNoHit:      c.metrics.readsWithNoHit.Load(),
		Evictions:           c.metrics.evictions.Load(),
		WriteBackFailures:   c.metrics.writeBackFailures.Load(),
		GetLatency:          c.metrics.getLatency,
		DiskReadLatency:     c.metrics.diskReadLatency,
		QueuePutLatency:     c.metrics.queuePutLatency,
		PutLatency:          c.metrics.putLatency,
		DiskWriteLatency:    c.metrics.diskWriteLatency,
	}
}

// ReadFlags contains options for Cache.ReadAt.
type ReadFlags struct {
	// ReadOnly instructs ReadAt to not write any new data into the cache; it is
	// used when the data is unlikely to be used again.
	ReadOnly bool
}

// ReadAt performs a read from an object, attempting to use cached data when
// possible.
func (c *Cache) ReadAt(
	ctx context.Context,
	fileNum base.DiskFileNum,
	p []byte,
	ofs int64,
	objReader remote.ObjectReader,
	objSize int64,
	flags ReadFlags,
) error {
	c.metrics.totalReads.Add(1)
	if ofs >= objSize {
		if invariants.Enabled {
			panic(fmt.Sprintf("invalid ReadAt offset %v %v", ofs, objSize))
		}
		return io.EOF
	}
	// TODO(radu): for compaction reads, we may not want to read from the cache at
	// all.
	{
		start := time.Now()
		n, err := c.get(fileNum, p, ofs)
		c.metrics.getLatency.Observe(float64(time.Since(start)))
		if err != nil {
			return err
		}
		if n == len(p) {
			// Everything was in cache!
			c.metrics.readsWithFullHit.Add(1)
			return nil
		}
		if n == 0 {
			c.metrics.readsWithNoHit.Add(1)
		} else {
			c.metrics.readsWithPartialHit.Add(1)
		}

		// Note that the code below does not need the original ofs: now that the
		// read from the cache is done, the relevant offset is ofs + int64(n).
		// The same applies to p.
		ofs += int64(n)
		p = p[n:]

		if invariants.Enabled {
			if n != 0 && c.bm.Remainder(ofs) != 0 {
				panic(fmt.Sprintf("after non-zero read from cache, ofs is not block-aligned: %v %v", ofs, n))
			}
		}
	}

	if flags.ReadOnly {
		return objReader.ReadAt(ctx, p, ofs)
	}

	// We must do reads with offset & size that are multiples of the block size.
	// Otherwise, later cache hits may return incorrect zeroed results from the
	// cache.
	firstBlockInd := c.bm.Block(ofs)
	adjustedOfs := c.bm.BlockOffset(firstBlockInd)

	// Round the size of what is left to read, plus the adjustment to the
	// offset, up to a multiple of the block size; that is how many bytes we
	// read from the object.
	sizeOfOffAdjustment := int(ofs - adjustedOfs)
	adjustedLen := int(c.bm.RoundUp(int64(len(p) + sizeOfOffAdjustment)))
	adjustedP := make([]byte, adjustedLen)
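
	// As a worked example (values illustrative): with blockSize = 32 KiB and,
	// after the cache read above, ofs = 70000 and len(p) = 10000, we get
	// firstBlockInd = 2, adjustedOfs = 65536, sizeOfOffAdjustment = 4464, and
	// adjustedLen = RoundUp(14464) = 32768, i.e. one full cache block.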

	// Read the rest from the object. We may need to cap the length to avoid past EOF reads.
	eofCap := int64(adjustedLen)
	if adjustedOfs+eofCap > objSize {
		eofCap = objSize - adjustedOfs
	}
	if err := objReader.ReadAt(ctx, adjustedP[:eofCap], adjustedOfs); err != nil {
		return err
	}
	copy(p, adjustedP[sizeOfOffAdjustment:])

	start := time.Now()
	c.writeWorkers.QueueWrite(fileNum, adjustedP, adjustedOfs)
	c.metrics.queuePutLatency.Observe(float64(time.Since(start)))

	return nil
}

// get attempts to read the requested data from the cache, if it is already
// there.
//
// If all data is available, returns n = len(p).
//
// If data is partially available, a prefix of the data is read; returns n < len(p)
// and no error. If no prefix is available, returns n = 0 and no error.
func (c *Cache) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
	// The data extent might cross shard boundaries, hence the loop. In the hot
	// path, max two iterations of this loop will be executed, since reads are sized
	// in units of sstable block size.
	var multiShard bool
	for {
		shard := c.getShard(fileNum, ofs+int64(n))
		cappedLen := len(p[n:])
		if toBoundary := int(c.shardingBlockSize - ((ofs + int64(n)) % c.shardingBlockSize)); cappedLen > toBoundary {
			cappedLen = toBoundary
		}
		numRead, err := shard.get(fileNum, p[n:n+cappedLen], ofs+int64(n))
		if err != nil {
			return n, err
		}
		n += numRead
		if numRead < cappedLen {
			// We only read a prefix from this shard.
			return n, nil
		}
		if n == len(p) {
			// We are done.
			return n, nil
		}
		// Data extent crosses shard boundary, continue with next shard.
		if !multiShard {
			c.metrics.multiShardReads.Add(1)
			multiShard = true
		}
	}
}

// set attempts to write the requested data to the cache. Both ofs & len(p) must
// be multiples of the block size.
//
// If all of p is not written to the shard, set returns a non-nil error.
func (c *Cache) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
	if invariants.Enabled {
		if c.bm.Remainder(ofs) != 0 || c.bm.Remainder(int64(len(p))) != 0 {
			panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
		}
	}

	// The data extent might cross shard boundaries, hence the loop. In the hot
	// path, max two iterations of this loop will be executed, since reads are sized
	// in units of sstable block size.
	n := 0
	for {
		shard := c.getShard(fileNum, ofs+int64(n))
		cappedLen := len(p[n:])
		if toBoundary := int(c.shardingBlockSize - ((ofs + int64(n)) % c.shardingBlockSize)); cappedLen > toBoundary {
			cappedLen = toBoundary
		}
		err := shard.set(fileNum, p[n:n+cappedLen], ofs+int64(n))
		if err != nil {
			return err
		}
		// set returns an error if cappedLen bytes aren't written to the shard.
		n += cappedLen
		if n == len(p) {
			// We are done.
			return nil
		}
		// Data extent crosses shard boundary, continue with next shard.
	}
}

func (c *Cache) getShard(fileNum base.DiskFileNum, ofs int64) *shard {
	const prime64 = 1099511628211
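	// prime64 is the 64-bit FNV prime; multiplying the file number by it
	// spreads consecutive file numbers across shards. Note that for a given
	// file, all offsets within one shardingBlockSize unit produce the same
	// hash and therefore map to the same shard.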
	hash := uint64(fileNum)*prime64 + uint64(ofs/c.shardingBlockSize)
	// TODO(josh): Instance change ops are often run in production. Such an operation
	// updates len(c.shards); see openSharedCache. As a result, the behavior of this
	// function changes, and the cache empties out at restart time. We may want a better
	// story here eventually.
	return &c.shards[hash%uint64(len(c.shards))]
}

type shard struct {
	cache             *Cache
	file              vfs.File
	sizeInBlocks      int64
	bm                blockMath
	shardingBlockSize int64
	mu                struct {
		sync.Mutex
		// TODO(josh): None of these datastructures are space-efficient.
		// Focusing on correctness to start.
		where  whereMap
		blocks []cacheBlockState
		// Head of LRU list (doubly-linked circular).
		lruHead cacheBlockIndex
		// Head of free list (singly-linked chain).
		freeHead cacheBlockIndex
	}
}

type cacheBlockState struct {
	lock    lockState
	logical logicalBlockID

	// next is the next block in the LRU or free list (or invalidBlockIndex if it
	// is the last block in the free list).
	next cacheBlockIndex

	// prev is the previous block in the LRU list. It is not used when the block
	// is in the free list.
	prev cacheBlockIndex
}

// whereMap maps a logical block in an SST to the index of the cache block
// that holds its contents (the "cache block index").
type whereMap map[logicalBlockID]cacheBlockIndex

type logicalBlockID struct {
	filenum       base.DiskFileNum
	cacheBlockIdx cacheBlockIndex
}

type lockState int64

const (
	unlocked lockState = 0
	// >0 lockState tracks the number of distinct readers of some cache block / logical block
	// which is in the secondary cache. It is used to ensure that a cache block is not evicted
	// and overwritten, while there are active readers.
	readLockTakenInc = 1
	// -1 lockState indicates that some cache block is currently being populated with data from
	// blob storage. It is used to ensure that a cache block is not read or evicted again, while
	// it is being populated.
	writeLockTaken = -1
)
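
// As an illustration of the lock lifecycle: shard.set marks a block
// writeLockTaken (-1) while filling it from blob storage, and dropWriteLock
// then moves it to unlocked (0). Each concurrent reader in shard.get adds
// readLockTakenInc, so two readers leave the state at 2, and each
// dropReadLock subtracts 1.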

func (s *shard) init(
	cache *Cache,
	fs vfs.FS,
	fsDir string,
	shardIdx int,
	sizeInBlocks int64,
	blockSize int,
	shardingBlockSize int64,
) error {
	*s = shard{
		cache:        cache,
		sizeInBlocks: sizeInBlocks,
	}
	if blockSize < 1024 || shardingBlockSize%int64(blockSize) != 0 {
		return errors.Newf("invalid block size %d (must divide %d)", blockSize, shardingBlockSize)
	}
	s.bm = makeBlockMath(blockSize)
	s.shardingBlockSize = shardingBlockSize
	file, err := fs.OpenReadWrite(fs.PathJoin(fsDir, fmt.Sprintf("SHARED-CACHE-%03d", shardIdx)))
	if err != nil {
		return err
	}
	// TODO(radu): truncate file if necessary (especially important if we restart
	// with more shards).
	if err := file.Preallocate(0, int64(blockSize)*sizeInBlocks); err != nil {
		return err
	}
	s.file = file

	// TODO(josh): Right now, the secondary cache is not persistent. All existing
	// cache contents will be over-written, since all metadata is only stored in
	// memory.
	s.mu.where = make(whereMap)
	s.mu.blocks = make([]cacheBlockState, sizeInBlocks)
	s.mu.lruHead = invalidBlockIndex
	s.mu.freeHead = invalidBlockIndex
	for i := range s.mu.blocks {
		s.freePush(cacheBlockIndex(i))
	}

	return nil
}

func (s *shard) close() error {
	defer func() {
		s.file = nil
	}()
	return s.file.Close()
}

// freePush pushes a block to the front of the free list.
func (s *shard) freePush(index cacheBlockIndex) {
	s.mu.blocks[index].next = s.mu.freeHead
	s.mu.freeHead = index
}

// freePop removes the block from the front of the free list. Must not be called
// if the list is empty (i.e. freeHead = invalidBlockIndex).
func (s *shard) freePop() cacheBlockIndex {
	index := s.mu.freeHead
	s.mu.freeHead = s.mu.blocks[index].next
	return index
}

// lruInsertFront inserts a block at the front of the LRU list.
func (s *shard) lruInsertFront(index cacheBlockIndex) {
	b := &s.mu.blocks[index]
	if s.mu.lruHead == invalidBlockIndex {
		b.next = index
		b.prev = index
	} else {
		b.next = s.mu.lruHead
		h := &s.mu.blocks[s.mu.lruHead]
		b.prev = h.prev
		s.mu.blocks[h.prev].next = index
		h.prev = index
	}
	s.mu.lruHead = index
}

func (s *shard) lruNext(index cacheBlockIndex) cacheBlockIndex {
	return s.mu.blocks[index].next
}

func (s *shard) lruPrev(index cacheBlockIndex) cacheBlockIndex {
	return s.mu.blocks[index].prev
}

// lruUnlink removes a block from the LRU list.
func (s *shard) lruUnlink(index cacheBlockIndex) {
	b := &s.mu.blocks[index]
	if b.next == index {
		s.mu.lruHead = invalidBlockIndex
	} else {
		s.mu.blocks[b.prev].next = b.next
		s.mu.blocks[b.next].prev = b.prev
		if s.mu.lruHead == index {
			s.mu.lruHead = b.next
		}
	}
	b.next, b.prev = invalidBlockIndex, invalidBlockIndex
}
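
// Because the LRU list is circular, a single-element list has next == prev ==
// index; that is the condition lruUnlink uses to detect that the list is
// becoming empty.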

// get attempts to read the requested data from the shard. The data must not
// cross a shard boundary.
//
// If all data is available, returns n = len(p).
//
// If data is partially available, a prefix of the data is read; returns n < len(p)
// and no error. If no prefix is available, returns n = 0 and no error.
//
// TODO(josh): Today, if there are two cache blocks needed to satisfy a read, and the
// first block is not in the cache and the second one is, we will read both from
// blob storage. We should fix this. This is not an unlikely scenario if we are doing
// a reverse scan, since those iterate over sstable blocks in reverse order and due to
// cache block aligned reads will have read the suffix of the sstable block that will
// be needed next.
func (s *shard) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
	if invariants.Enabled {
		if ofs/s.shardingBlockSize != (ofs+int64(len(p))-1)/s.shardingBlockSize {
			panic(fmt.Sprintf("get crosses shard boundary: %v %v", ofs, len(p)))
		}
		s.assertShardStateIsConsistent()
	}

	// The data extent might cross cache block boundaries, hence the loop. In the hot
	// path, max two iterations of this loop will be executed, since reads are sized
	// in units of sstable block size.
	var multiBlock bool
	for {
		k := logicalBlockID{
			filenum:       fileNum,
			cacheBlockIdx: s.bm.Block(ofs + int64(n)),
		}
		s.mu.Lock()
		cacheBlockIdx, ok := s.mu.where[k]
		// TODO(josh): Multiple reads within the same few milliseconds (anything that is smaller
		// than blob storage read latency) that miss on the same logical block ID will not necessarily
		// be rare. We may want to do only one read, with the later readers blocking on the first read
		// completing. This could be implemented either here or in the primary block cache. See
		// https://github.com/cockroachdb/pebble/pull/2586 for additional discussion.
		if !ok {
			s.mu.Unlock()
			return n, nil
		}
		if s.mu.blocks[cacheBlockIdx].lock == writeLockTaken {
			// In practice, if we have two reads of the same SST block in close succession, we
			// would expect the second to hit in the in-memory block cache. So it's not worth
			// optimizing this case here.
			s.mu.Unlock()
			return n, nil
		}
		s.mu.blocks[cacheBlockIdx].lock += readLockTakenInc
		// Move to front of the LRU list.
		s.lruUnlink(cacheBlockIdx)
		s.lruInsertFront(cacheBlockIdx)
		s.mu.Unlock()

		readAt := s.bm.BlockOffset(cacheBlockIdx)
		readSize := s.bm.BlockSize()
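		// On the first iteration the requested offset may fall in the middle
		// of a cache block; in that case, read only from ofs to the end of
		// that block, so that subsequent iterations are block-aligned.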
		if n == 0 { // if first read
			rem := s.bm.Remainder(ofs)
			readAt += rem
			readSize -= int(rem)
		}

		if len(p[n:]) <= readSize {
			start := time.Now()
			numRead, err := s.file.ReadAt(p[n:], readAt)
			s.cache.metrics.diskReadLatency.Observe(float64(time.Since(start)))
			s.dropReadLock(cacheBlockIdx)
			return n + numRead, err
		}
		start := time.Now()
		numRead, err := s.file.ReadAt(p[n:n+readSize], readAt)
		s.cache.metrics.diskReadLatency.Observe(float64(time.Since(start)))
		s.dropReadLock(cacheBlockIdx)
		if err != nil {
			return 0, err
		}

		// Note that numRead == readSize, since we checked for an error above.
		n += numRead

		if !multiBlock {
			s.cache.metrics.multiBlockReads.Add(1)
			multiBlock = true
		}
	}
}

// set attempts to write the requested data to the shard. The data must not
// cross a shard boundary, and both ofs & len(p) must be multiples of the
// block size.
//
// If all of p is not written to the shard, set returns a non-nil error.
func (s *shard) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
	if invariants.Enabled {
		if ofs/s.shardingBlockSize != (ofs+int64(len(p))-1)/s.shardingBlockSize {
			panic(fmt.Sprintf("set crosses shard boundary: %v %v", ofs, len(p)))
		}
		if s.bm.Remainder(ofs) != 0 || s.bm.Remainder(int64(len(p))) != 0 {
			panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
		}
		s.assertShardStateIsConsistent()
	}

	// The data extent might cross cache block boundaries, hence the loop. In the hot
	// path, max two iterations of this loop will be executed, since reads are sized
	// in units of sstable block size.
	n := 0
	for {
		if n == len(p) {
			return nil
		}
		if invariants.Enabled {
			if n > len(p) {
				panic(fmt.Sprintf("set with n greater than len(p): %v %v", n, len(p)))
			}
		}

		// If the logical block is already in the cache, we should skip doing a set.
		k := logicalBlockID{
			filenum:       fileNum,
			cacheBlockIdx: s.bm.Block(ofs + int64(n)),
		}
		s.mu.Lock()
		if _, ok := s.mu.where[k]; ok {
			s.mu.Unlock()
			n += s.bm.BlockSize()
			continue
		}

		var cacheBlockIdx cacheBlockIndex
		if s.mu.freeHead == invalidBlockIndex {
			if invariants.Enabled && s.mu.lruHead == invalidBlockIndex {
				panic("both LRU and free lists empty")
			}

			// Find the last element in the LRU list which is not locked.
			for idx := s.lruPrev(s.mu.lruHead); ; idx = s.lruPrev(idx) {
				if lock := s.mu.blocks[idx].lock; lock == unlocked {
					cacheBlockIdx = idx
					break
				}
				if idx == s.mu.lruHead {
					// No unlocked block to evict.
					//
					// TODO(josh): We may want to block until a block frees up, instead of returning
					// an error here. But I think we can do that later on, e.g. after running some production
					// experiments.
					s.mu.Unlock()
					return errors.New("no block to evict so skipping write to cache")
				}
			}
			s.cache.metrics.evictions.Add(1)
			s.lruUnlink(cacheBlockIdx)
			delete(s.mu.where, s.mu.blocks[cacheBlockIdx].logical)
		} else {
			s.cache.metrics.count.Add(1)
			cacheBlockIdx = s.freePop()
		}

		s.lruInsertFront(cacheBlockIdx)
		s.mu.where[k] = cacheBlockIdx
		s.mu.blocks[cacheBlockIdx].logical = k
		s.mu.blocks[cacheBlockIdx].lock = writeLockTaken
		s.mu.Unlock()

		writeAt := s.bm.BlockOffset(cacheBlockIdx)

		writeSize := s.bm.BlockSize()
		if len(p[n:]) <= writeSize {
			writeSize = len(p[n:])
		}

		start := time.Now()
		_, err := s.file.WriteAt(p[n:n+writeSize], writeAt)
		s.cache.metrics.diskWriteLatency.Observe(float64(time.Since(start)))
		if err != nil {
			// Free the block.
			s.mu.Lock()
			defer s.mu.Unlock()

			delete(s.mu.where, k)
			s.lruUnlink(cacheBlockIdx)
			s.freePush(cacheBlockIdx)
			return err
		}
		s.dropWriteLock(cacheBlockIdx)
		n += writeSize
	}
}

// Doesn't inline currently. This might be okay, but something to keep in mind.
func (s *shard) dropReadLock(cacheBlockInd cacheBlockIndex) {
	s.mu.Lock()
	s.mu.blocks[cacheBlockInd].lock -= readLockTakenInc
	if invariants.Enabled && s.mu.blocks[cacheBlockInd].lock < 0 {
		panic(fmt.Sprintf("unexpected lock state %v in dropReadLock", s.mu.blocks[cacheBlockInd].lock))
	}
	s.mu.Unlock()
}

// Doesn't inline currently. This might be okay, but something to keep in mind.
func (s *shard) dropWriteLock(cacheBlockInd cacheBlockIndex) {
	s.mu.Lock()
	if invariants.Enabled && s.mu.blocks[cacheBlockInd].lock != writeLockTaken {
		panic(fmt.Sprintf("unexpected lock state %v in dropWriteLock", s.mu.blocks[cacheBlockInd].lock))
	}
	s.mu.blocks[cacheBlockInd].lock = unlocked
	s.mu.Unlock()
}

func (s *shard) assertShardStateIsConsistent() {
	s.mu.Lock()
	defer s.mu.Unlock()

	lruLen := 0
	if s.mu.lruHead != invalidBlockIndex {
		for b := s.mu.lruHead; ; {
			lruLen++
			if idx, ok := s.mu.where[s.mu.blocks[b].logical]; !ok || idx != b {
				panic("block in LRU list with no entry in where map")
			}
			b = s.lruNext(b)
			if b == s.mu.lruHead {
				break
			}
		}
	}
	if lruLen != len(s.mu.where) {
		panic(fmt.Sprintf("lru list len is %d but where map has %d entries", lruLen, len(s.mu.where)))
	}
	freeLen := 0
	for n := s.mu.freeHead; n != invalidBlockIndex; n = s.mu.blocks[n].next {
		freeLen++
	}

	if lruLen+freeLen != int(s.sizeInBlocks) {
		panic(fmt.Sprintf("%d lru blocks and %d free blocks don't add up to %d", lruLen, freeLen, s.sizeInBlocks))
	}
	for i := range s.mu.blocks {
		if state := s.mu.blocks[i].lock; state < writeLockTaken {
			panic(fmt.Sprintf("lock state %v is not allowed", state))
		}
	}
}

// cacheBlockIndex is the index of a blockSize-aligned cache block.
type cacheBlockIndex int64

// invalidBlockIndex is used for the head of a list when the list is empty.
const invalidBlockIndex cacheBlockIndex = -1

// blockMath is a helper type for performing conversions between offsets and
// block indexes.
type blockMath struct {
	blockSizeBits int8
}

func makeBlockMath(blockSize int) blockMath {
	bm := blockMath{
		blockSizeBits: int8(bits.Len64(uint64(blockSize)) - 1),
	}
	if blockSize != (1 << bm.blockSizeBits) {
		panic(fmt.Sprintf("blockSize %d is not a power of 2", blockSize))
	}
	return bm
}

func (bm blockMath) mask() int64 {
	return (1 << bm.blockSizeBits) - 1
}

// BlockSize returns the block size.
func (bm blockMath) BlockSize() int {
	return 1 << bm.blockSizeBits
}

// Block returns the block index containing the given offset.
func (bm blockMath) Block(offset int64) cacheBlockIndex {
	return cacheBlockIndex(offset >> bm.blockSizeBits)
}

// Remainder returns the offset relative to the start of the cache block.
func (bm blockMath) Remainder(offset int64) int64 {
	return offset & bm.mask()
}

// BlockOffset returns the object offset where the given block starts.
func (bm blockMath) BlockOffset(block cacheBlockIndex) int64 {
	return int64(block) << bm.blockSizeBits
}

// RoundUp rounds up the given value to the closest multiple of block size.
func (bm blockMath) RoundUp(x int64) int64 {
	return (x + bm.mask()) & ^(bm.mask())
}
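
// As a worked example, with blockSize = 32768 (blockSizeBits = 15):
// Block(70000) = 2, Remainder(70000) = 4464, BlockOffset(2) = 65536, and
// RoundUp(14464) = 32768.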

type writeWorkers struct {
	doneCh        chan struct{}
	doneWaitGroup sync.WaitGroup

	numWorkers int
	tasksCh    chan writeTask
}

type writeTask struct {
	fileNum base.DiskFileNum
	p       []byte
	offset  int64
}

// Start starts the worker goroutines.
func (w *writeWorkers) Start(c *Cache, numWorkers int) {
	doneCh := make(chan struct{})
	tasksCh := make(chan writeTask, numWorkers*writeTasksPerWorker)

	w.numWorkers = numWorkers
	w.doneCh = doneCh
	w.tasksCh = tasksCh
	w.doneWaitGroup.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer w.doneWaitGroup.Done()
			for {
				select {
				case <-doneCh:
					return
				case task, ok := <-tasksCh:
					if !ok {
						// The tasks channel was closed; this is used in testing code to
						// ensure all writes are completed.
						return
					}
					// TODO(radu): set() can perform multiple writes; perhaps each one
					// should be its own task.
					start := time.Now()
					err := c.set(task.fileNum, task.p, task.offset)
					c.metrics.putLatency.Observe(float64(time.Since(start)))
					if err != nil {
						c.metrics.writeBackFailures.Add(1)
						// TODO(radu): throttle logs.
						c.logger.Errorf("writing back to cache after miss failed: %v", err)
					}
				}
			}
		}()
	}
}

// Stop waits for any in-progress writes to complete and stops the worker
// goroutines. Any queued writes not yet started are discarded.
func (w *writeWorkers) Stop() {
	close(w.doneCh)
	w.doneCh = nil
	w.tasksCh = nil
	w.doneWaitGroup.Wait()
}

// QueueWrite adds a write task to the queue. Can block if the queue is full.
func (w *writeWorkers) QueueWrite(fileNum base.DiskFileNum, p []byte, offset int64) {
	w.tasksCh <- writeTask{
		fileNum: fileNum,
		p:       p,
		offset:  offset,
	}
}
