// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package cache

import (
	"context"
	"sync"
	"time"

	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/swiss"
)

// readShard coordinates the read of a block that will be put in the cache. It
// ensures only one goroutine is reading a block, and other callers block
// until that goroutine is done (with success or failure). In the case of
// success, the other goroutines will use the value that was read, even if it
// is too large to be placed in the cache, or got evicted from the cache
// before they got scheduled. In the case of a failure (read error or context
// cancellation), one of the waiters will be given a turn to do the read.
//
// This turn-taking ensures that a large number of concurrent attempts to read
// the same block that is not in the cache does not result in the same number
// of reads from the filesystem (or remote storage). We have seen large spikes
// in memory usage (and CPU usage for memory allocation/deallocation) without
// this turn-taking.
//
// It introduces a small risk related to context cancellation -- if many
// readers assigned a turn exceed their deadline while doing the read and
// report an error, a reader with a longer deadline can unnecessarily wait. We
// accept this risk for now since the primary production use in CockroachDB is
// filesystem reads, where context cancellation is not respected. We do
// introduce an error duration metric emitted in traces that can be used to
// quantify such wasteful waiting. Note that this same risk extends to waiting
// on the Options.LoadBlockSema, so the error duration metric includes the
// case of an error when waiting on the semaphore (as a side effect of that
// waiting happening in the caller, sstable.Reader).
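//
// For example (an illustrative interleaving, not drawn from a trace): if
// goroutines g1, g2, and g3 concurrently miss on the same block, one of
// them, say g1, is granted the turn while g2 and g3 wait. If g1's read
// fails (e.g. its context deadline expires), exactly one of g2 and g3 is
// woken to take the next turn; if g1 succeeds, g2 and g3 both wake and
// share the value g1 read, so only one filesystem read occurs.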
//
// Design choices and motivation:
//
//   - readShard is tightly integrated with a cache shard: At its core,
//     readShard is a map with synchronization. For the same reason the cache
//     is sharded (higher concurrency by sharding the mutex), it is beneficial
//     to shard synchronization on readShard. By making readShard a member of
//     shard, this sharding is trivially accomplished. Additionally, the code
//     is cleaner when there is no race in which a cache miss is followed by
//     creation of a readEntry that is no longer needed because someone else
//     has done the read since the miss and inserted the value into the cache.
//     By making the readShard use shard.mu, such a race is avoided. A side
//     benefit is that the cache interaction can be hidden behind
//     readEntry.SetReadValue. One disadvantage of this tightly integrated
//     design is that it does not encompass readers that will put the read
//     value into a block.BufferPool -- we don't worry about those since
//     block.BufferPool is only used for compactions and there is at most one
//     compaction reader of a block. There is the possibility that the
//     compaction reader and a user-facing iterator reader will do duplicate
//     reads, but we accept that deficiency.
//
//   - readMap is separate from the shard.blocks map: One could have a design
//     that extends the cache entry and unifies the two maps. However, we
//     never want to evict a readEntry while there are readers waiting for the
//     block read (including the case where the corresponding file is being
//     removed from shard.files). Also, the number of stable cache entries is
//     huge and therefore they are manually allocated, while the number of
//     readEntries is small (so manual allocation isn't necessary). For these
//     reasons we maintain a separate map. This separation also results in
//     more modular code, instead of piling more stuff into shard.
type readShard struct {
	// shard is only used for locking, and for calling shard.Set.
	shard *shard
	// Protected by shard.mu.
	//
	// shard.mu is never held when acquiring readEntry.mu. shard.mu is a shared
	// resource and must be released quickly.
	shardMu struct {
		readMap swiss.Map[key, *readEntry]
	}
}

func (rs *readShard) Init(shard *shard) *readShard {
	*rs = readShard{
		shard: shard,
	}
	// The initial capacity of 16 is arbitrary.
	rs.shardMu.readMap.Init(16)
	return rs
}

// getReadEntryLocked gets a *readEntry for (id, fileNum, offset). shard.mu is
// already write locked.
func (rs *readShard) getReadEntryLocked(id ID, fileNum base.DiskFileNum, offset uint64) *readEntry {
	k := key{fileKey{id, fileNum}, offset}
	e, ok := rs.shardMu.readMap.Get(k)
	if !ok {
		e = newReadEntry(rs, id, fileNum, offset)
		rs.shardMu.readMap.Put(k, e)
	} else {
		e.refCount.acquireAllowZero()
	}
	return e
}

func (rs *readShard) lenForTesting() int {
	rs.shard.mu.Lock()
	defer rs.shard.mu.Unlock()
	return rs.shardMu.readMap.Len()
}

// readEntry is used to coordinate between concurrent attempted readers of the
// same block.
type readEntry struct {
	readShard *readShard
	id        ID
	fileNum   base.DiskFileNum
	offset    uint64
	mu        struct {
		sync.RWMutex
		// v, when non-nil, has a ref from readEntry, which is unreffed when
		// readEntry is deleted from the readMap.
		v *Value
		// isReading and ch together capture the state of whether someone has
		// been granted a turn to read, and of readers waiting for that read
		// to finish. ch is lazily allocated since most readEntries will not
		// see concurrent readers. This lazy allocation results in one
		// transition of ch from nil to non-nil, so waiters can read this
		// non-nil ch and block on reading from it without holding mu.
		//
		// ch is written to in order to signal one waiter to start doing the
		// read. ch is closed when the value is successfully read and has been
		// stored in v, so that all waiters wake up and read v. ch is a
		// buffered channel with a capacity of 1.
		//
		// State transitions when trying to wait for turn:
		// Case !isReading:
		//   set isReading=true; drain ch if non-nil and non-empty; proceed
		//   with turn to do the read.
		// Case isReading:
		//   allocate ch if nil; wait on ch.
		// Finished reading successfully:
		//   set isReading=false; if ch is non-nil, close ch.
		// Finished reading with failure:
		//   set isReading=false; if ch is non-nil, write to ch.
		//
		// INVARIANT:
		// isReading => ch is nil or ch is empty.
		//
		// A standalone sketch of the "finished reading" transitions follows
		// this struct definition.
		isReading bool
		ch        chan struct{}
		// Total duration of reads and semaphore waiting that resulted in error.
		errorDuration time.Duration
		readStart     time.Time
	}
	// Count of ReadHandles that refer to this readEntry. Increments always
	// happen while holding shard.mu. So if this count is found to be 0 while
	// holding shard.mu, it is safe to delete readEntry from
	// readShard.shardMu.readMap.
	refCount refcnt
}
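
// finishTurnSketch is an illustrative, self-contained restatement of the
// "finished reading" transitions documented on readEntry.mu above. It is not
// called anywhere in this package, and its parameters are hypothetical
// stand-ins for the corresponding readEntry.mu fields. A close of ch wakes
// all waiters (the value is available), while a send hands the turn to
// exactly one waiter.
func finishTurnSketch(mu *sync.Mutex, isReading *bool, ch chan struct{}, success bool) {
	mu.Lock()
	defer mu.Unlock()
	*isReading = false
	if ch == nil {
		// No reader ever waited concurrently, so there is no one to signal.
		return
	}
	if success {
		// Wake all waiters; they will find the value set and share it.
		close(ch)
	} else {
		// Grant exactly one waiter a turn to retry the read. The send cannot
		// block: ch has capacity 1 and, by the invariant above, is empty
		// while isReading is true.
		ch <- struct{}{}
	}
}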

var readEntryPool = sync.Pool{
	New: func() interface{} {
		return &readEntry{}
	},
}

func newReadEntry(rs *readShard, id ID, fileNum base.DiskFileNum, offset uint64) *readEntry {
	e := readEntryPool.Get().(*readEntry)
	*e = readEntry{
		readShard: rs,
		id:        id,
		fileNum:   fileNum,
		offset:    offset,
	}
	e.refCount.init(1)
	return e
}

// waitForReadPermissionOrHandle returns either an already read value (in
// Handle), an error (if the context was cancelled), or neither, which is a
// directive to the caller to do the read. In this last case the caller must
// call either setReadValue or setReadError.
//
// In all cases, errorDuration is populated with the total duration that
// readers that observed an error (setReadError) spent doing the read. This
// duration can be greater than the time spent in
// waitForReadPermissionOrHandle, since some of these errors could have
// occurred prior to this call. But it serves as a rough indicator of whether
// turn taking could have caused higher latency due to context cancellation.
func (e *readEntry) waitForReadPermissionOrHandle(
	ctx context.Context,
) (h Handle, errorDuration time.Duration, err error) {
	constructHandleLocked := func() Handle {
		if e.mu.v == nil {
			panic("value is nil")
		}
		e.mu.v.acquire()
		return Handle{value: e.mu.v}
	}
	becomeReaderLocked := func() {
		if e.mu.v != nil {
			panic("value is non-nil")
		}
		if e.mu.isReading {
			panic("isReading is already true")
		}
		e.mu.isReading = true
		if e.mu.ch != nil {
			// Drain the channel, so that no one else mistakenly believes they
			// should read.
			select {
			case <-e.mu.ch:
			default:
			}
		}
		e.mu.readStart = time.Now()
	}
	unlockAndUnrefAndTryRemoveFromMap := func(readLock bool) (errorDuration time.Duration) {
		removeState := e.makeTryRemoveStateLocked()
		errorDuration = e.mu.errorDuration
		if readLock {
			e.mu.RUnlock()
		} else {
			e.mu.Unlock()
		}
		unrefAndTryRemoveFromMap(removeState)
		return errorDuration
	}

	for {
		e.mu.Lock()
		if e.mu.v != nil {
			// Value has already been read.
			h := constructHandleLocked()
			errorDuration = unlockAndUnrefAndTryRemoveFromMap(false)
			return h, errorDuration, nil
		}
		// Not already read. Wait for turn to do the read or for someone else
		// to do the read.
		if !e.mu.isReading {
			// Have permission to do the read.
			becomeReaderLocked()
			errorDuration = e.mu.errorDuration
			e.mu.Unlock()
			return Handle{}, errorDuration, nil
		}
		if e.mu.ch == nil {
			// Rare case when multiple readers are concurrently trying to read.
			// If this turns out to be common enough we could use a sync.Pool.
			e.mu.ch = make(chan struct{}, 1)
		}
		ch := e.mu.ch
		e.mu.Unlock()
		select {
		case <-ctx.Done():
			e.mu.Lock()
			errorDuration = unlockAndUnrefAndTryRemoveFromMap(false)
			return Handle{}, errorDuration, ctx.Err()
		case _, ok := <-ch:
			if !ok {
				// Channel closed, so value was read.
				e.mu.RLock()
				if e.mu.v == nil {
					panic("value is nil")
				}
				h := constructHandleLocked()
				errorDuration = unlockAndUnrefAndTryRemoveFromMap(true)
				return h, errorDuration, nil
			}
			// Else, probably granted permission to do the read. NB: since
			// isReading is false, someone else can slip through before this
			// thread acquires e.mu, and take the turn. So try to actually get
			// the turn by trying again in the loop.
		}
	}
}
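
// waitSketch illustrates how a caller of waitForReadPermissionOrHandle is
// expected to consume its three outcomes. It is an illustrative sketch, not
// used by this package; doRead is a hypothetical stand-in for the caller's
// actual block read.
func waitSketch(
	ctx context.Context, e *readEntry, doRead func() (*Value, error),
) (Handle, error) {
	h, _, err := e.waitForReadPermissionOrHandle(ctx)
	if err != nil {
		// The context was cancelled while waiting for a turn.
		return Handle{}, err
	}
	if h != (Handle{}) {
		// Someone else completed the read; use the value they read.
		return h, nil
	}
	// Neither a handle nor an error: this caller has the turn, and must
	// report the outcome via setReadValue or setReadError.
	v, err := doRead()
	if err != nil {
		e.setReadError(err)
		return Handle{}, err
	}
	return e.setReadValue(v), nil
}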

// tryRemoveState captures the state needed by unrefAndTryRemoveFromMap. The
// caller constructs it before calling unrefAndTryRemoveFromMap since the
// caller typically holds readEntry.mu, which avoids acquiring it again in
// unrefAndTryRemoveFromMap.
type tryRemoveState struct {
	rs *readShard
	k  key
	e  *readEntry
}

// makeTryRemoveStateLocked initializes tryRemoveState.
func (e *readEntry) makeTryRemoveStateLocked() tryRemoveState {
	return tryRemoveState{
		rs: e.readShard,
		k:  key{fileKey{e.id, e.fileNum}, e.offset},
		e:  e,
	}
}

// unrefAndTryRemoveFromMap tries to remove s.k => s.e from the map in s.rs.
// It is possible that after the unref, s.e has already been removed, and is
// now back in the sync.Pool, or being reused (for the same or a different
// key). This is because after the unref that dropped s.e.refCount to zero,
// but before this goroutine acquires shard.mu, the count could have been
// concurrently incremented and decremented back to zero, and some other
// goroutine could have observed that decrement to zero, raced ahead, and
// deleted s.e from the readMap.
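//
// For example (an illustrative interleaving): goroutine g1 releases the last
// ref (refCount 1 -> 0) and pauses before acquiring shard.mu. Goroutine g2
// then finds s.e in the readMap (incrementing 0 -> 1, via acquireAllowZero),
// finishes, releases (1 -> 0), acquires shard.mu first, and deletes s.e from
// the map. When g1 finally acquires shard.mu, its lookup either misses or
// returns a different (reused) entry, so g1 returns without touching s.e.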
func unrefAndTryRemoveFromMap(s tryRemoveState) {
	if !s.e.refCount.release() {
		return
	}
	s.rs.shard.mu.Lock()
	e2, ok := s.rs.shardMu.readMap.Get(s.k)
	if !ok || e2 != s.e {
		// Already removed.
		s.rs.shard.mu.Unlock()
		return
	}
	if s.e.refCount.value() != 0 {
		s.rs.shard.mu.Unlock()
		return
	}
	// k => e and e.refCount == 0. And it cannot be incremented since
	// shard.mu.Lock() is held. So remove from map.
	s.rs.shardMu.readMap.Delete(s.k)
	s.rs.shard.mu.Unlock()

	// Free s.e.
	s.e.mu.Lock()
	if s.e.mu.v != nil {
		s.e.mu.v.release()
		s.e.mu.v = nil
	}
	s.e.mu.Unlock()
	*s.e = readEntry{}
	readEntryPool.Put(s.e)
}

func (e *readEntry) setReadValue(v *Value) Handle {
	// Add to the cache before taking another ref for readEntry, since the
	// cache expects ref=1 when it is called.
	//
	// TODO(sumeer): if e.refCount > 1, we should consider overriding to
	// ensure that it is added as etHot. The common case will be
	// e.refCount = 1, and we don't want to acquire e.mu twice, so one way to
	// do this would be to relax the invariant in shard.Set that requires
	// Value.refs() == 1. Then we can do the work under e.mu before calling
	// shard.Set.
	h := e.readShard.shard.Set(e.id, e.fileNum, e.offset, v)
	e.mu.Lock()
	// Acquire a ref for readEntry, since we are going to remember it in e.mu.v.
	v.acquire()
	e.mu.v = v
	if !e.mu.isReading {
		panic("isReading is false")
	}
	e.mu.isReading = false
	if e.mu.ch != nil {
		// Inform all waiters so they can use e.mu.v. Not all readers have
		// called readEntry.waitForReadPermissionOrHandle yet; those will also
		// use e.mu.v.
		close(e.mu.ch)
	}
	removeState := e.makeTryRemoveStateLocked()
	e.mu.Unlock()
	unrefAndTryRemoveFromMap(removeState)
	return h
}

func (e *readEntry) setReadError(err error) {
	e.mu.Lock()
	if !e.mu.isReading {
		panic("isReading is false")
	}
	e.mu.isReading = false
	if e.mu.ch != nil {
		select {
		case e.mu.ch <- struct{}{}:
		default:
			panic("channel is not empty")
		}
	}
	e.mu.errorDuration += time.Since(e.mu.readStart)
	removeState := e.makeTryRemoveStateLocked()
	e.mu.Unlock()
	unrefAndTryRemoveFromMap(removeState)
}

// ReadHandle represents a contract with a caller that had a miss when doing a
// cache lookup, and wants to do a read and insert the read block into the
// cache. The contract applies when ReadHandle.Valid returns true, in which
// case the caller has been assigned the turn to do the read (and others are
// potentially waiting for it).
//
// Contract:
//
// The caller must immediately start doing a read, or can first wait on a
// shared resource that would also block a different reader if it was assigned
// the turn instead (specifically, this refers to Options.LoadBlockSema).
// After the read, it must call either SetReadValue or SetReadError depending
// on whether the read succeeded or failed.
type ReadHandle struct {
	entry *readEntry
}

// Valid returns true for a valid ReadHandle.
func (rh ReadHandle) Valid() bool {
	return rh.entry != nil
}

// SetReadValue provides the Value that the caller has read. The caller is
// responsible for releasing the returned Handle when it is no longer needed.
func (rh ReadHandle) SetReadValue(v *Value) Handle {
	return rh.entry.setReadValue(v)
}

// SetReadError specifies that the caller has encountered a read error.
func (rh ReadHandle) SetReadError(err error) {
	rh.entry.setReadError(err)
}
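
// readHandleUsageSketch illustrates the contract above. It is an illustrative
// sketch, not used by this package; readBlock is a hypothetical stand-in for
// the caller's actual read from the filesystem or remote storage.
func readHandleUsageSketch(rh ReadHandle, readBlock func() (*Value, error)) (Handle, error) {
	if !rh.Valid() {
		// No turn was assigned, so there is nothing to report; the caller
		// presumably obtained the value through some other path.
		return Handle{}, nil
	}
	// This caller has the turn: do the read immediately (or first wait on a
	// shared resource such as Options.LoadBlockSema), then report the outcome.
	v, err := readBlock()
	if err != nil {
		rh.SetReadError(err)
		return Handle{}, err
	}
	// SetReadValue stores v for the waiters and returns a Handle for this
	// caller, which must eventually be released.
	return rh.SetReadValue(v), nil
}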
