LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider/sharedcache - shared_cache.go (source / functions)
Test: 2023-09-11 08:17Z 1efa535d - tests only.lcov
Date: 2023-09-11 08:18:20
Coverage: Lines: 397 / 486 (81.7 %) | Functions: 0 / 0 (-)

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sharedcache
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "io"
      11             :         "math/bits"
      12             :         "sync"
      13             :         "sync/atomic"
      14             :         "time"
      15             : 
      16             :         "github.com/cockroachdb/errors"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/internal/invariants"
      19             :         "github.com/cockroachdb/pebble/objstorage/remote"
      20             :         "github.com/cockroachdb/pebble/vfs"
      21             :         "github.com/prometheus/client_golang/prometheus"
      22             : )
      23             : 
      24             : // Cache is a persistent cache backed by a local filesystem. It is intended
      25             : // to cache data that is in slower shared storage (e.g. S3), hence the
      26             : // package name 'sharedcache'.
      27             : type Cache struct {
      28             :         shards       []shard
      29             :         writeWorkers writeWorkers
      30             : 
      31             :         bm                blockMath
      32             :         shardingBlockSize int64
      33             : 
      34             :         logger  base.Logger
      35             :         metrics internalMetrics
      36             : }
      37             : 
      38             : // Metrics is a struct containing metrics exported by the secondary cache.
      39             : // TODO(josh): Reconsider the set of metrics exported by the secondary cache
      40             : // before we release the secondary cache to users. We choose to export many metrics
      41             : // right now, so we learn a lot from the benchmarking we are doing over the 23.2
      42             : // cycle.
      43             : type Metrics struct {
      44             :         // The number of sstable bytes stored in the cache.
      45             :         Size int64
      46             :         // The count of cache blocks in the cache (not sstable blocks).
      47             :         Count int64
      48             : 
      49             :         // The number of calls to ReadAt.
      50             :         TotalReads int64
      51             :         // The number of calls to ReadAt that require reading data from 2+ shards.
      52             :         MultiShardReads int64
      53             :         // The number of calls to ReadAt that require reading data from 2+ cache blocks.
      54             :         MultiBlockReads int64
      55             :         // The number of calls to ReadAt where all data returned was read from the cache.
      56             :         ReadsWithFullHit int64
      57             :         // The number of calls to ReadAt where some data returned was read from the cache.
      58             :         ReadsWithPartialHit int64
      59             :         // The number of calls to ReadAt where no data returned was read from the cache.
      60             :         ReadsWithNoHit int64
      61             : 
      62             :         // The number of times a cache block was evicted from the cache.
      63             :         Evictions int64
      64             :         // The number of times writing a cache block to the cache failed.
      65             :         WriteBackFailures int64
      66             : 
      67             :         // The latency of calls to get some data from the cache.
      68             :         GetLatency prometheus.Histogram
      69             :         // The latency of reads of a single cache block from disk.
      70             :         DiskReadLatency prometheus.Histogram
       71             :         // The latency of queueing data to be written back to the cache on a channel.
       72             :         // Generally should be low, but if the channel is full, it could be high.
      73             :         QueuePutLatency prometheus.Histogram
      74             :         // The latency of calls to put some data read from block storage into the cache.
      75             :         PutLatency prometheus.Histogram
      76             :         // The latency of writes of a single cache block to disk.
      77             :         DiskWriteLatency prometheus.Histogram
      78             : }
      79             : 
      80             : // See docs at Metrics.
      81             : type internalMetrics struct {
      82             :         count atomic.Int64
      83             : 
      84             :         totalReads          atomic.Int64
      85             :         multiShardReads     atomic.Int64
      86             :         multiBlockReads     atomic.Int64
      87             :         readsWithFullHit    atomic.Int64
      88             :         readsWithPartialHit atomic.Int64
      89             :         readsWithNoHit      atomic.Int64
      90             : 
      91             :         evictions         atomic.Int64
      92             :         writeBackFailures atomic.Int64
      93             : 
      94             :         getLatency       prometheus.Histogram
      95             :         diskReadLatency  prometheus.Histogram
      96             :         queuePutLatency  prometheus.Histogram
      97             :         putLatency       prometheus.Histogram
      98             :         diskWriteLatency prometheus.Histogram
      99             : }
     100             : 
     101             : const (
     102             :         // writeWorkersPerShard is used to establish the number of worker goroutines
     103             :         // that perform writes to the cache.
     104             :         writeWorkersPerShard = 4
      105             :         // writeTasksPerWorker is used to establish how many tasks can be queued up
     106             :         // until we have to block.
     107             :         writeTasksPerWorker = 4
     108             : )
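// Editorial note (illustrative arithmetic, not part of the source): with, say,
// 8 shards, Start launches 8 * writeWorkersPerShard = 32 worker goroutines, and
// the task channel buffers 32 * writeTasksPerWorker = 128 queued writes before
// QueueWrite blocks (see writeWorkers.Start near the end of this file).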
     109             : 
     110             : // Open opens a cache. If there is no existing cache at fsDir, a new one
     111             : // is created.
     112             : func Open(
     113             :         fs vfs.FS,
     114             :         logger base.Logger,
     115             :         fsDir string,
     116             :         blockSize int,
     117             :         // shardingBlockSize is the size of a shard block. The cache is split into contiguous
     118             :         // shardingBlockSize units. The units are distributed across multiple independent shards
     119             :         // of the cache, via a hash(offset) modulo num shards operation. The cache replacement
      120             :         // policies operate at the level of a shard, not the whole cache. This is done to reduce lock
     121             :         // contention.
     122             :         shardingBlockSize int64,
     123             :         sizeBytes int64,
     124             :         numShards int,
     125           1 : ) (*Cache, error) {
     126           1 :         min := shardingBlockSize * int64(numShards)
     127           1 :         if sizeBytes < min {
     128           0 :                 return nil, errors.Errorf("cache size %d lower than min %d", sizeBytes, min)
     129           0 :         }
     130             : 
     131           1 :         c := &Cache{
     132           1 :                 logger:            logger,
     133           1 :                 bm:                makeBlockMath(blockSize),
     134           1 :                 shardingBlockSize: shardingBlockSize,
     135           1 :         }
     136           1 :         c.shards = make([]shard, numShards)
     137           1 :         blocksPerShard := sizeBytes / int64(numShards) / int64(blockSize)
     138           1 :         for i := range c.shards {
     139           1 :                 if err := c.shards[i].init(c, fs, fsDir, i, blocksPerShard, blockSize, shardingBlockSize); err != nil {
     140           0 :                         return nil, err
     141           0 :                 }
     142             :         }
     143             : 
     144           1 :         c.writeWorkers.Start(c, numShards*writeWorkersPerShard)
     145           1 : 
     146           1 :         buckets := prometheus.ExponentialBucketsRange(float64(time.Millisecond*1), float64(10*time.Second), 50)
     147           1 :         c.metrics.getLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: buckets})
     148           1 :         c.metrics.diskReadLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: buckets})
     149           1 :         c.metrics.putLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: buckets})
     150           1 :         c.metrics.diskWriteLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: buckets})
     151           1 : 
      152           1 :         // Measures a channel write, so use a lower min for the buckets.
     153           1 :         buckets = prometheus.ExponentialBucketsRange(float64(time.Microsecond*1), float64(10*time.Second), 50)
     154           1 :         c.metrics.queuePutLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: buckets})
     155           1 : 
     156           1 :         return c, nil
     157             : }
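// openForExample is an editorial sketch (not part of the source): a call to
// Open with hypothetical directory and sizes that satisfy the checks above.
func openForExample(fs vfs.FS, logger base.Logger) (*Cache, error) {
	return Open(
		fs, logger, "/tmp/shared-cache",
		32<<10, // blockSize: 32 KiB; must be a power of 2 (see makeBlockMath) and >= 1024
		1<<20,  // shardingBlockSize: 1 MiB; must be a multiple of blockSize
		64<<20, // sizeBytes: 64 MiB; must be >= shardingBlockSize*numShards = 8 MiB
		8,      // numShards
	)
}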
     158             : 
     159             : // Close closes the cache. Methods such as ReadAt should not be called after Close is
     160             : // called.
     161           1 : func (c *Cache) Close() error {
     162           1 :         c.writeWorkers.Stop()
     163           1 : 
     164           1 :         var retErr error
     165           1 :         for i := range c.shards {
     166           1 :                 if err := c.shards[i].close(); err != nil && retErr == nil {
     167           0 :                         retErr = err
     168           0 :                 }
     169             :         }
     170           1 :         c.shards = nil
     171           1 :         return retErr
     172             : }
     173             : 
      174             : // Metrics returns metrics for the cache. Callers should not mutate
     175             : // the returned histograms, which are pointer types.
     176           1 : func (c *Cache) Metrics() Metrics {
     177           1 :         return Metrics{
     178           1 :                 Count:               c.metrics.count.Load(),
     179           1 :                 Size:                c.metrics.count.Load() * int64(c.bm.BlockSize()),
     180           1 :                 TotalReads:          c.metrics.totalReads.Load(),
     181           1 :                 MultiShardReads:     c.metrics.multiShardReads.Load(),
     182           1 :                 MultiBlockReads:     c.metrics.multiBlockReads.Load(),
     183           1 :                 ReadsWithFullHit:    c.metrics.readsWithFullHit.Load(),
     184           1 :                 ReadsWithPartialHit: c.metrics.readsWithPartialHit.Load(),
     185           1 :                 ReadsWithNoHit:      c.metrics.readsWithNoHit.Load(),
     186           1 :                 Evictions:           c.metrics.evictions.Load(),
     187           1 :                 WriteBackFailures:   c.metrics.writeBackFailures.Load(),
     188           1 :                 GetLatency:          c.metrics.getLatency,
     189           1 :                 DiskReadLatency:     c.metrics.diskReadLatency,
     190           1 :                 QueuePutLatency:     c.metrics.queuePutLatency,
     191           1 :                 PutLatency:          c.metrics.putLatency,
     192           1 :                 DiskWriteLatency:    c.metrics.diskWriteLatency,
     193           1 :         }
     194           1 : }
     195             : 
     196             : // ReadFlags contains options for Cache.ReadAt.
     197             : type ReadFlags struct {
     198             :         // ReadOnly instructs ReadAt to not write any new data into the cache; it is
     199             :         // used when the data is unlikely to be used again.
     200             :         ReadOnly bool
     201             : }
     202             : 
      203             : // ReadAt performs a read from an object, attempting to use cached data when
     204             : // possible.
     205             : func (c *Cache) ReadAt(
     206             :         ctx context.Context,
     207             :         fileNum base.DiskFileNum,
     208             :         p []byte,
     209             :         ofs int64,
     210             :         objReader remote.ObjectReader,
     211             :         objSize int64,
     212             :         flags ReadFlags,
     213           1 : ) error {
     214           1 :         c.metrics.totalReads.Add(1)
     215           1 :         if ofs >= objSize {
     216           0 :                 if invariants.Enabled {
     217           0 :                         panic(fmt.Sprintf("invalid ReadAt offset %v %v", ofs, objSize))
     218             :                 }
     219           0 :                 return io.EOF
     220             :         }
     221             :         // TODO(radu): for compaction reads, we may not want to read from the cache at
     222             :         // all.
     223           1 :         {
     224           1 :                 start := time.Now()
     225           1 :                 n, err := c.get(fileNum, p, ofs)
     226           1 :                 c.metrics.getLatency.Observe(float64(time.Since(start)))
     227           1 :                 if err != nil {
     228           0 :                         return err
     229           0 :                 }
     230           1 :                 if n == len(p) {
     231           1 :                         // Everything was in cache!
     232           1 :                         c.metrics.readsWithFullHit.Add(1)
     233           1 :                         return nil
     234           1 :                 }
     235           1 :                 if n == 0 {
     236           1 :                         c.metrics.readsWithNoHit.Add(1)
     237           1 :                 } else {
     238           1 :                         c.metrics.readsWithPartialHit.Add(1)
     239           1 :                 }
     240             : 
      241             :                 // Note: the code below does not need the original ofs; with the cache
      242             :                 // read done, the relevant offset is ofs + int64(n). The same goes for p.
     243           1 :                 ofs += int64(n)
     244           1 :                 p = p[n:]
     245           1 : 
     246           1 :                 if invariants.Enabled {
     247           0 :                         if n != 0 && c.bm.Remainder(ofs) != 0 {
     248           0 :                                 panic(fmt.Sprintf("after non-zero read from cache, ofs is not block-aligned: %v %v", ofs, n))
     249             :                         }
     250             :                 }
     251             :         }
     252             : 
     253           1 :         if flags.ReadOnly {
     254           1 :                 return objReader.ReadAt(ctx, p, ofs)
     255           1 :         }
     256             : 
      257             :         // We must do reads with offset & size that are multiples of the block size.
      258             :         // Otherwise, later cache hits may return incorrectly zeroed results from the cache.
     259           1 :         firstBlockInd := c.bm.Block(ofs)
     260           1 :         adjustedOfs := c.bm.BlockOffset(firstBlockInd)
     261           1 : 
      262           1 :         // Round the length of what is left to read, plus the adjustment of the
      263           1 :         // offset, up to a multiple of the block size; this is the aligned number
      264           1 :         // of bytes to read from the object.
     265           1 :         sizeOfOffAdjustment := int(ofs - adjustedOfs)
     266           1 :         adjustedLen := int(c.bm.RoundUp(int64(len(p) + sizeOfOffAdjustment)))
     267           1 :         adjustedP := make([]byte, adjustedLen)
     268           1 : 
     269           1 :         // Read the rest from the object. We may need to cap the length to avoid past EOF reads.
     270           1 :         eofCap := int64(adjustedLen)
     271           1 :         if adjustedOfs+eofCap > objSize {
     272           1 :                 eofCap = objSize - adjustedOfs
     273           1 :         }
     274           1 :         if err := objReader.ReadAt(ctx, adjustedP[:eofCap], adjustedOfs); err != nil {
     275           0 :                 return err
     276           0 :         }
     277           1 :         copy(p, adjustedP[sizeOfOffAdjustment:])
     278           1 : 
     279           1 :         start := time.Now()
     280           1 :         c.writeWorkers.QueueWrite(fileNum, adjustedP, adjustedOfs)
     281           1 :         c.metrics.queuePutLatency.Observe(float64(time.Since(start)))
     282           1 : 
     283           1 :         return nil
     284             : }
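// Worked example (editorial, with a hypothetical 32 KiB cache block size):
// suppose the cache returned n = 0 for a read of len(p) = 10000 at ofs = 70000
// and ReadOnly is unset. Then:
//
//	firstBlockInd       = 70000 >> 15           = 2
//	adjustedOfs         = 2 << 15               = 65536
//	sizeOfOffAdjustment = 70000 - 65536         = 4464
//	adjustedLen         = RoundUp(10000 + 4464) = 32768
//
// A single aligned 32 KiB read is issued at object offset 65536, bytes
// [4464, 14464) of it are copied into p, and the whole aligned buffer is
// queued for write-back. If objSize were 80000, the object read would be
// capped at eofCap = 80000 - 65536 = 14464 bytes.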
     285             : 
     286             : // get attempts to read the requested data from the cache, if it is already
     287             : // there.
     288             : //
     289             : // If all data is available, returns n = len(p).
     290             : //
     291             : // If data is partially available, a prefix of the data is read; returns n < len(p)
     292             : // and no error. If no prefix is available, returns n = 0 and no error.
     293           1 : func (c *Cache) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
     294           1 :         // The data extent might cross shard boundaries, hence the loop. In the hot
     295           1 :         // path, max two iterations of this loop will be executed, since reads are sized
     296           1 :         // in units of sstable block size.
     297           1 :         var multiShard bool
     298           1 :         for {
     299           1 :                 shard := c.getShard(fileNum, ofs+int64(n))
     300           1 :                 cappedLen := len(p[n:])
     301           1 :                 if toBoundary := int(c.shardingBlockSize - ((ofs + int64(n)) % c.shardingBlockSize)); cappedLen > toBoundary {
     302           1 :                         cappedLen = toBoundary
     303           1 :                 }
     304           1 :                 numRead, err := shard.get(fileNum, p[n:n+cappedLen], ofs+int64(n))
     305           1 :                 if err != nil {
     306           0 :                         return n, err
     307           0 :                 }
     308           1 :                 n += numRead
     309           1 :                 if numRead < cappedLen {
     310           1 :                         // We only read a prefix from this shard.
     311           1 :                         return n, nil
     312           1 :                 }
     313           1 :                 if n == len(p) {
     314           1 :                         // We are done.
     315           1 :                         return n, nil
     316           1 :                 }
     317             :                 // Data extent crosses shard boundary, continue with next shard.
     318           1 :                 if !multiShard {
     319           1 :                         c.metrics.multiShardReads.Add(1)
     320           1 :                         multiShard = true
     321           1 :                 }
     322             :         }
     323             : }
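// Worked example (editorial): with a hypothetical shardingBlockSize of 1 MiB,
// a 16 KiB read at ofs = 1 MiB - 8 KiB has toBoundary = 8192, so the first
// shard serves at most the first 8 KiB; if it returns all of it, the loop
// continues into the next shard for the remaining 8 KiB and multiShardReads
// is incremented once.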
     324             : 
     325             : // set attempts to write the requested data to the cache. Both ofs & len(p) must
     326             : // be multiples of the block size.
     327             : //
     328             : // If all of p is not written to the shard, set returns a non-nil error.
     329           1 : func (c *Cache) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
     330           1 :         if invariants.Enabled {
     331           0 :                 if c.bm.Remainder(ofs) != 0 || c.bm.Remainder(int64(len(p))) != 0 {
     332           0 :                         panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
     333             :                 }
     334             :         }
     335             : 
     336             :         // The data extent might cross shard boundaries, hence the loop. In the hot
     337             :         // path, max two iterations of this loop will be executed, since reads are sized
     338             :         // in units of sstable block size.
     339           1 :         n := 0
     340           1 :         for {
     341           1 :                 shard := c.getShard(fileNum, ofs+int64(n))
     342           1 :                 cappedLen := len(p[n:])
     343           1 :                 if toBoundary := int(c.shardingBlockSize - ((ofs + int64(n)) % c.shardingBlockSize)); cappedLen > toBoundary {
     344           1 :                         cappedLen = toBoundary
     345           1 :                 }
     346           1 :                 err := shard.set(fileNum, p[n:n+cappedLen], ofs+int64(n))
     347           1 :                 if err != nil {
     348           0 :                         return err
     349           0 :                 }
     350             :                 // set returns an error if cappedLen bytes aren't written to the shard.
     351           1 :                 n += cappedLen
     352           1 :                 if n == len(p) {
     353           1 :                         // We are done.
     354           1 :                         return nil
     355           1 :                 }
     356             :                 // Data extent crosses shard boundary, continue with next shard.
     357             :         }
     358             : }
     359             : 
     360           1 : func (c *Cache) getShard(fileNum base.DiskFileNum, ofs int64) *shard {
     361           1 :         const prime64 = 1099511628211
     362           1 :         hash := uint64(fileNum.FileNum())*prime64 + uint64(ofs/c.shardingBlockSize)
     363           1 :         // TODO(josh): Instance change ops are often run in production. Such an operation
     364           1 :         // updates len(c.shards); see openSharedCache. As a result, the behavior of this
     365           1 :         // function changes, and the cache empties out at restart time. We may want a better
     366           1 :         // story here eventually.
     367           1 :         return &c.shards[hash%uint64(len(c.shards))]
     368           1 : }
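// Worked example (editorial): with a hypothetical shardingBlockSize of 1 MiB
// and 8 shards, fileNum = 7 and ofs = 3 MiB give hash = 7*1099511628211 + 3,
// and the shard is hash % 8. All offsets within the same 1 MiB-aligned region
// of a file therefore map to the same shard.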
     369             : 
     370             : type shard struct {
     371             :         cache             *Cache
     372             :         file              vfs.File
     373             :         sizeInBlocks      int64
     374             :         bm                blockMath
     375             :         shardingBlockSize int64
     376             :         mu                struct {
     377             :                 sync.Mutex
     378             :                 // TODO(josh): None of these datastructures are space-efficient.
     379             :                 // Focusing on correctness to start.
     380             :                 where  whereMap
     381             :                 blocks []cacheBlockState
     382             :                 // Head of LRU list (doubly-linked circular).
     383             :                 lruHead cacheBlockIndex
     384             :                 // Head of free list (singly-linked chain).
     385             :                 freeHead cacheBlockIndex
     386             :         }
     387             : }
     388             : 
     389             : type cacheBlockState struct {
     390             :         lock    lockState
     391             :         logical logicalBlockID
     392             : 
     393             :         // next is the next block in the LRU or free list (or invalidBlockIndex if it
     394             :         // is the last block in the free list).
     395             :         next cacheBlockIndex
     396             : 
     397             :         // prev is the previous block in the LRU list. It is not used when the block
     398             :         // is in the free list.
     399             :         prev cacheBlockIndex
     400             : }
     401             : 
      402             : // whereMap maps a logical block in an SST to the index of the cache block
      403             : // holding the file contents (the "cache block index").
     404             : type whereMap map[logicalBlockID]cacheBlockIndex
     405             : 
     406             : type logicalBlockID struct {
     407             :         filenum       base.DiskFileNum
     408             :         cacheBlockIdx cacheBlockIndex
     409             : }
     410             : 
     411             : type lockState int64
     412             : 
     413             : const (
     414             :         unlocked lockState = 0
     415             :         // >0 lockState tracks the number of distinct readers of some cache block / logical block
     416             :         // which is in the secondary cache. It is used to ensure that a cache block is not evicted
     417             :         // and overwritten, while there are active readers.
     418             :         readLockTakenInc = 1
     419             :         // -1 lockState indicates that some cache block is currently being populated with data from
      420             :         // blob storage. It is used to ensure that a cache block is not read or evicted while
      421             :         // it is being populated.
     422             :         writeLockTaken = -1
     423             : )
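// Editorial trace (not part of the source) of a block's lock field over its
// lifetime, using the constants above:
//
//	var lock lockState       // unlocked (0): evictable, readable
//	lock = writeLockTaken    // -1: being populated; get() and eviction skip it
//	lock = unlocked          // population done (see dropWriteLock)
//	lock += readLockTakenInc // 1: one active reader pins the block
//	lock += readLockTakenInc // 2: a second concurrent reader
//	lock -= readLockTakenInc // readers drain via dropReadLock, back toward 0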
     424             : 
     425             : func (s *shard) init(
     426             :         cache *Cache,
     427             :         fs vfs.FS,
     428             :         fsDir string,
     429             :         shardIdx int,
     430             :         sizeInBlocks int64,
     431             :         blockSize int,
     432             :         shardingBlockSize int64,
     433           1 : ) error {
     434           1 :         *s = shard{
     435           1 :                 cache:        cache,
     436           1 :                 sizeInBlocks: sizeInBlocks,
     437           1 :         }
     438           1 :         if blockSize < 1024 || shardingBlockSize%int64(blockSize) != 0 {
     439           0 :                 return errors.Newf("invalid block size %d (must divide %d)", blockSize, shardingBlockSize)
     440           0 :         }
     441           1 :         s.bm = makeBlockMath(blockSize)
     442           1 :         s.shardingBlockSize = shardingBlockSize
     443           1 :         file, err := fs.OpenReadWrite(fs.PathJoin(fsDir, fmt.Sprintf("SHARED-CACHE-%03d", shardIdx)))
     444           1 :         if err != nil {
     445           0 :                 return err
     446           0 :         }
     447             :         // TODO(radu): truncate file if necessary (especially important if we restart
     448             :         // with more shards).
     449           1 :         if err := file.Preallocate(0, int64(blockSize)*sizeInBlocks); err != nil {
     450           0 :                 return err
     451           0 :         }
     452           1 :         s.file = file
     453           1 : 
     454           1 :         // TODO(josh): Right now, the secondary cache is not persistent. All existing
     455           1 :         // cache contents will be over-written, since all metadata is only stored in
     456           1 :         // memory.
     457           1 :         s.mu.where = make(whereMap)
     458           1 :         s.mu.blocks = make([]cacheBlockState, sizeInBlocks)
     459           1 :         s.mu.lruHead = invalidBlockIndex
     460           1 :         s.mu.freeHead = invalidBlockIndex
     461           1 :         for i := range s.mu.blocks {
     462           1 :                 s.freePush(cacheBlockIndex(i))
     463           1 :         }
     464             : 
     465           1 :         return nil
     466             : }
     467             : 
     468           1 : func (s *shard) close() error {
     469           1 :         defer func() {
     470           1 :                 s.file = nil
     471           1 :         }()
     472           1 :         return s.file.Close()
     473             : }
     474             : 
     475             : // freePush pushes a block to the front of the free list.
     476           1 : func (s *shard) freePush(index cacheBlockIndex) {
     477           1 :         s.mu.blocks[index].next = s.mu.freeHead
     478           1 :         s.mu.freeHead = index
     479           1 : }
     480             : 
     481             : // freePop removes the block from the front of the free list. Must not be called
     482             : // if the list is empty (i.e. freeHead = invalidBlockIndex).
     483           1 : func (s *shard) freePop() cacheBlockIndex {
     484           1 :         index := s.mu.freeHead
     485           1 :         s.mu.freeHead = s.mu.blocks[index].next
     486           1 :         return index
     487           1 : }
     488             : 
     489             : // lruInsertFront inserts a block at the front of the LRU list.
     490           1 : func (s *shard) lruInsertFront(index cacheBlockIndex) {
     491           1 :         b := &s.mu.blocks[index]
     492           1 :         if s.mu.lruHead == invalidBlockIndex {
     493           1 :                 b.next = index
     494           1 :                 b.prev = index
     495           1 :         } else {
     496           1 :                 b.next = s.mu.lruHead
     497           1 :                 h := &s.mu.blocks[s.mu.lruHead]
     498           1 :                 b.prev = h.prev
     499           1 :                 s.mu.blocks[h.prev].next = index
     500           1 :                 h.prev = index
     501           1 :         }
     502           1 :         s.mu.lruHead = index
     503             : }
     504             : 
     505           1 : func (s *shard) lruNext(index cacheBlockIndex) cacheBlockIndex {
     506           1 :         return s.mu.blocks[index].next
     507           1 : }
     508             : 
     509           1 : func (s *shard) lruPrev(index cacheBlockIndex) cacheBlockIndex {
     510           1 :         return s.mu.blocks[index].prev
     511           1 : }
     512             : 
     513             : // lruUnlink removes a block from the LRU list.
     514           1 : func (s *shard) lruUnlink(index cacheBlockIndex) {
     515           1 :         b := &s.mu.blocks[index]
     516           1 :         if b.next == index {
     517           1 :                 s.mu.lruHead = invalidBlockIndex
     518           1 :         } else {
     519           1 :                 s.mu.blocks[b.prev].next = b.next
     520           1 :                 s.mu.blocks[b.next].prev = b.prev
     521           1 :                 if s.mu.lruHead == index {
     522           1 :                         s.mu.lruHead = b.next
     523           1 :                 }
     524             :         }
     525           1 :         b.next, b.prev = invalidBlockIndex, invalidBlockIndex
     526             : }
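// Editorial trace (not part of the source): starting from an empty list
// (lruHead = invalidBlockIndex), lruInsertFront(0), lruInsertFront(1),
// lruInsertFront(2) yields lruHead = 2 and circular order 2 -> 1 -> 0 -> 2.
// lruPrev(lruHead) = 0 is then the least-recently-used block, which is where
// set() starts its eviction scan. lruUnlink(1) leaves 2 -> 0 -> 2.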
     527             : 
     528             : // get attempts to read the requested data from the shard. The data must not
     529             : // cross a shard boundary.
     530             : //
     531             : // If all data is available, returns n = len(p).
     532             : //
     533             : // If data is partially available, a prefix of the data is read; returns n < len(p)
     534             : // and no error. If no prefix is available, returns n = 0 and no error.
     535             : //
     536             : // TODO(josh): Today, if there are two cache blocks needed to satisfy a read, and the
     537             : // first block is not in the cache and the second one is, we will read both from
      538             : // blob storage. We should fix this. This scenario is not unlikely for a reverse
      539             : // scan, since it iterates over sstable blocks in reverse order and, due to
      540             : // cache-block-aligned reads, will already have read the suffix of the sstable
      541             : // block that is needed next.
     542           1 : func (s *shard) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
     543           1 :         if invariants.Enabled {
     544           0 :                 if ofs/s.shardingBlockSize != (ofs+int64(len(p))-1)/s.shardingBlockSize {
     545           0 :                         panic(fmt.Sprintf("get crosses shard boundary: %v %v", ofs, len(p)))
     546             :                 }
     547           0 :                 s.assertShardStateIsConsistent()
     548             :         }
     549             : 
     550             :         // The data extent might cross cache block boundaries, hence the loop. In the hot
     551             :         // path, max two iterations of this loop will be executed, since reads are sized
     552             :         // in units of sstable block size.
     553           1 :         var multiBlock bool
     554           1 :         for {
     555           1 :                 k := logicalBlockID{
     556           1 :                         filenum:       fileNum,
     557           1 :                         cacheBlockIdx: s.bm.Block(ofs + int64(n)),
     558           1 :                 }
     559           1 :                 s.mu.Lock()
     560           1 :                 cacheBlockIdx, ok := s.mu.where[k]
     561           1 :                 // TODO(josh): Multiple reads within the same few milliseconds (anything that is smaller
     562           1 :                 // than blob storage read latency) that miss on the same logical block ID will not necessarily
     563           1 :                 // be rare. We may want to do only one read, with the later readers blocking on the first read
     564           1 :                 // completing. This could be implemented either here or in the primary block cache. See
     565           1 :                 // https://github.com/cockroachdb/pebble/pull/2586 for additional discussion.
     566           1 :                 if !ok {
     567           1 :                         s.mu.Unlock()
     568           1 :                         return n, nil
     569           1 :                 }
     570           1 :                 if s.mu.blocks[cacheBlockIdx].lock == writeLockTaken {
     571           1 :                         // In practice, if we have two reads of the same SST block in close succession, we
     572           1 :                         // would expect the second to hit in the in-memory block cache. So it's not worth
     573           1 :                         // optimizing this case here.
     574           1 :                         s.mu.Unlock()
     575           1 :                         return n, nil
     576           1 :                 }
     577           1 :                 s.mu.blocks[cacheBlockIdx].lock += readLockTakenInc
     578           1 :                 // Move to front of the LRU list.
     579           1 :                 s.lruUnlink(cacheBlockIdx)
     580           1 :                 s.lruInsertFront(cacheBlockIdx)
     581           1 :                 s.mu.Unlock()
     582           1 : 
     583           1 :                 readAt := s.bm.BlockOffset(cacheBlockIdx)
     584           1 :                 readSize := s.bm.BlockSize()
     585           1 :                 if n == 0 { // if first read
     586           1 :                         rem := s.bm.Remainder(ofs)
     587           1 :                         readAt += rem
     588           1 :                         readSize -= int(rem)
     589           1 :                 }
     590             : 
     591           1 :                 if len(p[n:]) <= readSize {
     592           1 :                         start := time.Now()
     593           1 :                         numRead, err := s.file.ReadAt(p[n:], readAt)
     594           1 :                         s.cache.metrics.diskReadLatency.Observe(float64(time.Since(start)))
     595           1 :                         s.dropReadLock(cacheBlockIdx)
     596           1 :                         return n + numRead, err
     597           1 :                 }
     598           1 :                 start := time.Now()
     599           1 :                 numRead, err := s.file.ReadAt(p[n:n+readSize], readAt)
     600           1 :                 s.cache.metrics.diskReadLatency.Observe(float64(time.Since(start)))
     601           1 :                 s.dropReadLock(cacheBlockIdx)
     602           1 :                 if err != nil {
     603           0 :                         return 0, err
     604           0 :                 }
     605             : 
     606             :                 // Note that numRead == readSize, since we checked for an error above.
     607           1 :                 n += numRead
     608           1 : 
     609           1 :                 if !multiBlock {
     610           1 :                         s.cache.metrics.multiBlockReads.Add(1)
     611           1 :                         multiBlock = true
     612           1 :                 }
     613             :         }
     614             : }
     615             : 
     616             : // set attempts to write the requested data to the shard. The data must not
     617             : // cross a shard boundary, and both ofs & len(p) must be multiples of the
     618             : // block size.
     619             : //
     620             : // If all of p is not written to the shard, set returns a non-nil error.
     621           1 : func (s *shard) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
     622           1 :         if invariants.Enabled {
     623           0 :                 if ofs/s.shardingBlockSize != (ofs+int64(len(p))-1)/s.shardingBlockSize {
     624           0 :                         panic(fmt.Sprintf("set crosses shard boundary: %v %v", ofs, len(p)))
     625             :                 }
     626           0 :                 if s.bm.Remainder(ofs) != 0 || s.bm.Remainder(int64(len(p))) != 0 {
     627           0 :                         panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
     628             :                 }
     629           0 :                 s.assertShardStateIsConsistent()
     630             :         }
     631             : 
     632             :         // The data extent might cross cache block boundaries, hence the loop. In the hot
     633             :         // path, max two iterations of this loop will be executed, since reads are sized
     634             :         // in units of sstable block size.
     635           1 :         n := 0
     636           1 :         for {
     637           1 :                 if n == len(p) {
     638           1 :                         return nil
     639           1 :                 }
     640           1 :                 if invariants.Enabled {
     641           0 :                         if n > len(p) {
     642           0 :                                 panic(fmt.Sprintf("set with n greater than len(p): %v %v", n, len(p)))
     643             :                         }
     644             :                 }
     645             : 
     646             :                 // If the logical block is already in the cache, we should skip doing a set.
     647           1 :                 k := logicalBlockID{
     648           1 :                         filenum:       fileNum,
     649           1 :                         cacheBlockIdx: s.bm.Block(ofs + int64(n)),
     650           1 :                 }
     651           1 :                 s.mu.Lock()
     652           1 :                 if _, ok := s.mu.where[k]; ok {
     653           1 :                         s.mu.Unlock()
     654           1 :                         n += s.bm.BlockSize()
     655           1 :                         continue
     656             :                 }
     657             : 
     658           1 :                 var cacheBlockIdx cacheBlockIndex
     659           1 :                 if s.mu.freeHead == invalidBlockIndex {
     660           1 :                         if invariants.Enabled && s.mu.lruHead == invalidBlockIndex {
     661           0 :                                 panic("both LRU and free lists empty")
     662             :                         }
     663             : 
     664             :                         // Find the last element in the LRU list which is not locked.
     665           1 :                         for idx := s.lruPrev(s.mu.lruHead); ; idx = s.lruPrev(idx) {
     666           1 :                                 if lock := s.mu.blocks[idx].lock; lock == unlocked {
     667           1 :                                         cacheBlockIdx = idx
     668           1 :                                         break
     669             :                                 }
     670           0 :                                 if idx == s.mu.lruHead {
     671           0 :                                         // No unlocked block to evict.
     672           0 :                                         //
     673           0 :                                         // TODO(josh): We may want to block until a block frees up, instead of returning
     674           0 :                                         // an error here. But I think we can do that later on, e.g. after running some production
     675           0 :                                         // experiments.
     676           0 :                                         s.mu.Unlock()
     677           0 :                                         return errors.New("no block to evict so skipping write to cache")
     678           0 :                                 }
     679             :                         }
     680           1 :                         s.cache.metrics.evictions.Add(1)
     681           1 :                         s.lruUnlink(cacheBlockIdx)
     682           1 :                         delete(s.mu.where, s.mu.blocks[cacheBlockIdx].logical)
     683           1 :                 } else {
     684           1 :                         s.cache.metrics.count.Add(1)
     685           1 :                         cacheBlockIdx = s.freePop()
     686           1 :                 }
     687             : 
     688           1 :                 s.lruInsertFront(cacheBlockIdx)
     689           1 :                 s.mu.where[k] = cacheBlockIdx
     690           1 :                 s.mu.blocks[cacheBlockIdx].logical = k
     691           1 :                 s.mu.blocks[cacheBlockIdx].lock = writeLockTaken
     692           1 :                 s.mu.Unlock()
     693           1 : 
     694           1 :                 writeAt := s.bm.BlockOffset(cacheBlockIdx)
     695           1 : 
     696           1 :                 writeSize := s.bm.BlockSize()
     697           1 :                 if len(p[n:]) <= writeSize {
     698           1 :                         writeSize = len(p[n:])
     699           1 :                 }
     700             : 
     701           1 :                 start := time.Now()
     702           1 :                 _, err := s.file.WriteAt(p[n:n+writeSize], writeAt)
     703           1 :                 s.cache.metrics.diskWriteLatency.Observe(float64(time.Since(start)))
     704           1 :                 if err != nil {
     705           0 :                         // Free the block.
     706           0 :                         s.mu.Lock()
     707           0 :                         defer s.mu.Unlock()
     708           0 : 
     709           0 :                         delete(s.mu.where, k)
     710           0 :                         s.lruUnlink(cacheBlockIdx)
     711           0 :                         s.freePush(cacheBlockIdx)
     712           0 :                         return err
     713           0 :                 }
     714           1 :                 s.dropWriteLock(cacheBlockIdx)
     715           1 :                 n += writeSize
     716             :         }
     717             : }
     718             : 
     719             : // Doesn't inline currently. This might be okay, but something to keep in mind.
     720           1 : func (s *shard) dropReadLock(cacheBlockInd cacheBlockIndex) {
     721           1 :         s.mu.Lock()
     722           1 :         s.mu.blocks[cacheBlockInd].lock -= readLockTakenInc
     723           1 :         if invariants.Enabled && s.mu.blocks[cacheBlockInd].lock < 0 {
     724           0 :                 panic(fmt.Sprintf("unexpected lock state %v in dropReadLock", s.mu.blocks[cacheBlockInd].lock))
     725             :         }
     726           1 :         s.mu.Unlock()
     727             : }
     728             : 
     729             : // Doesn't inline currently. This might be okay, but something to keep in mind.
     730           1 : func (s *shard) dropWriteLock(cacheBlockInd cacheBlockIndex) {
     731           1 :         s.mu.Lock()
     732           1 :         if invariants.Enabled && s.mu.blocks[cacheBlockInd].lock != writeLockTaken {
     733           0 :                 panic(fmt.Sprintf("unexpected lock state %v in dropWriteLock", s.mu.blocks[cacheBlockInd].lock))
     734             :         }
     735           1 :         s.mu.blocks[cacheBlockInd].lock = unlocked
     736           1 :         s.mu.Unlock()
     737             : }
     738             : 
     739           0 : func (s *shard) assertShardStateIsConsistent() {
     740           0 :         s.mu.Lock()
     741           0 :         defer s.mu.Unlock()
     742           0 : 
     743           0 :         lruLen := 0
     744           0 :         if s.mu.lruHead != invalidBlockIndex {
     745           0 :                 for b := s.mu.lruHead; ; {
     746           0 :                         lruLen++
     747           0 :                         if idx, ok := s.mu.where[s.mu.blocks[b].logical]; !ok || idx != b {
     748           0 :                                 panic("block in LRU list with no entry in where map")
     749             :                         }
     750           0 :                         b = s.lruNext(b)
     751           0 :                         if b == s.mu.lruHead {
     752           0 :                                 break
     753             :                         }
     754             :                 }
     755             :         }
     756           0 :         if lruLen != len(s.mu.where) {
     757           0 :                 panic(fmt.Sprintf("lru list len is %d but where map has %d entries", lruLen, len(s.mu.where)))
     758             :         }
     759           0 :         freeLen := 0
     760           0 :         for n := s.mu.freeHead; n != invalidBlockIndex; n = s.mu.blocks[n].next {
     761           0 :                 freeLen++
     762           0 :         }
     763             : 
     764           0 :         if lruLen+freeLen != int(s.sizeInBlocks) {
     765           0 :                 panic(fmt.Sprintf("%d lru blocks and %d free blocks don't add up to %d", lruLen, freeLen, s.sizeInBlocks))
     766             :         }
     767           0 :         for i := range s.mu.blocks {
     768           0 :                 if state := s.mu.blocks[i].lock; state < writeLockTaken {
     769           0 :                         panic(fmt.Sprintf("lock state %v is not allowed", state))
     770             :                 }
     771             :         }
     772             : }
     773             : 
     774             : // cacheBlockIndex is the index of a blockSize-aligned cache block.
     775             : type cacheBlockIndex int64
     776             : 
     777             : // invalidBlockIndex is used for the head of a list when the list is empty.
     778             : const invalidBlockIndex cacheBlockIndex = -1
     779             : 
     780             : // blockMath is a helper type for performing conversions between offsets and
     781             : // block indexes.
     782             : type blockMath struct {
     783             :         blockSizeBits int8
     784             : }
     785             : 
     786           1 : func makeBlockMath(blockSize int) blockMath {
     787           1 :         bm := blockMath{
     788           1 :                 blockSizeBits: int8(bits.Len64(uint64(blockSize)) - 1),
     789           1 :         }
     790           1 :         if blockSize != (1 << bm.blockSizeBits) {
     791           0 :                 panic(fmt.Sprintf("blockSize %d is not a power of 2", blockSize))
     792             :         }
     793           1 :         return bm
     794             : }
     795             : 
     796           1 : func (bm blockMath) mask() int64 {
     797           1 :         return (1 << bm.blockSizeBits) - 1
     798           1 : }
     799             : 
     800             : // BlockSize returns the block size.
     801           1 : func (bm blockMath) BlockSize() int {
     802           1 :         return 1 << bm.blockSizeBits
     803           1 : }
     804             : 
     805             : // Block returns the block index containing the given offset.
     806           1 : func (bm blockMath) Block(offset int64) cacheBlockIndex {
     807           1 :         return cacheBlockIndex(offset >> bm.blockSizeBits)
     808           1 : }
     809             : 
     810             : // Remainder returns the offset relative to the start of the cache block.
     811           1 : func (bm blockMath) Remainder(offset int64) int64 {
     812           1 :         return offset & bm.mask()
     813           1 : }
     814             : 
     815             : // BlockOffset returns the object offset where the given block starts.
     816           1 : func (bm blockMath) BlockOffset(block cacheBlockIndex) int64 {
     817           1 :         return int64(block) << bm.blockSizeBits
     818           1 : }
     819             : 
     820             : // RoundUp rounds up the given value to the closest multiple of block size.
     821           1 : func (bm blockMath) RoundUp(x int64) int64 {
     822           1 :         return (x + bm.mask()) & ^(bm.mask())
     823           1 : }
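// blockMathExample is an editorial sketch (not part of the source) of the
// helpers above for a hypothetical 32 KiB block size (blockSizeBits = 15).
func blockMathExample() {
	bm := makeBlockMath(32 << 10)
	_ = bm.Block(70000)     // 2: 70000 >> 15
	_ = bm.Remainder(70000) // 4464: 70000 & 32767
	_ = bm.BlockOffset(2)   // 65536: 2 << 15
	_ = bm.RoundUp(70000)   // 98304: the next multiple of 32768
}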
     824             : 
     825             : type writeWorkers struct {
     826             :         doneCh        chan struct{}
     827             :         doneWaitGroup sync.WaitGroup
     828             : 
     829             :         numWorkers int
     830             :         tasksCh    chan writeTask
     831             : }
     832             : 
     833             : type writeTask struct {
     834             :         fileNum base.DiskFileNum
     835             :         p       []byte
     836             :         offset  int64
     837             : }
     838             : 
     839             : // Start starts the worker goroutines.
     840           1 : func (w *writeWorkers) Start(c *Cache, numWorkers int) {
     841           1 :         doneCh := make(chan struct{})
     842           1 :         tasksCh := make(chan writeTask, numWorkers*writeTasksPerWorker)
     843           1 : 
     844           1 :         w.numWorkers = numWorkers
     845           1 :         w.doneCh = doneCh
     846           1 :         w.tasksCh = tasksCh
     847           1 :         w.doneWaitGroup.Add(numWorkers)
     848           1 :         for i := 0; i < numWorkers; i++ {
     849           1 :                 go func() {
     850           1 :                         defer w.doneWaitGroup.Done()
     851           1 :                         for {
     852           1 :                                 select {
     853           1 :                                 case <-doneCh:
     854           1 :                                         return
     855           1 :                                 case task, ok := <-tasksCh:
     856           1 :                                         if !ok {
     857           1 :                                                 // The tasks channel was closed; this is used in testing code to
     858           1 :                                                 // ensure all writes are completed.
     859           1 :                                                 return
     860           1 :                                         }
     861             :                                         // TODO(radu): set() can perform multiple writes; perhaps each one
     862             :                                         // should be its own task.
     863           1 :                                         start := time.Now()
     864           1 :                                         err := c.set(task.fileNum, task.p, task.offset)
     865           1 :                                         c.metrics.putLatency.Observe(float64(time.Since(start)))
     866           1 :                                         if err != nil {
     867           0 :                                                 c.metrics.writeBackFailures.Add(1)
     868           0 :                                                 // TODO(radu): throttle logs.
     869           0 :                                                 c.logger.Infof("writing back to cache after miss failed: %v", err)
     870           0 :                                         }
     871             :                                 }
     872             :                         }
     873             :                 }()
     874             :         }
     875             : }
     876             : 
      877             : // Stop waits for any in-progress writes to complete and stops the worker
      878             : // goroutines. Any queued writes not yet started are discarded.
     880           1 : func (w *writeWorkers) Stop() {
     881           1 :         close(w.doneCh)
     882           1 :         w.doneCh = nil
     883           1 :         w.tasksCh = nil
     884           1 :         w.doneWaitGroup.Wait()
     885           1 : }
     886             : 
     887             : // QueueWrite adds a write task to the queue. Can block if the queue is full.
     888           1 : func (w *writeWorkers) QueueWrite(fileNum base.DiskFileNum, p []byte, offset int64) {
     889           1 :         w.tasksCh <- writeTask{
     890           1 :                 fileNum: fileNum,
     891           1 :                 p:       p,
     892           1 :                 offset:  offset,
     893           1 :         }
     894           1 : }
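// Editorial note: ReadAt above queues the aligned buffer it read from the
// object via QueueWrite(fileNum, adjustedP, adjustedOfs), so callers do not
// wait on the disk write. The channel holds numWorkers*writeTasksPerWorker
// tasks, so the send blocks only when all workers are busy and the queue is
// full; queuePutLatency records how long the send took.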

Generated by: LCOV version 1.14