Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package cache
6 :
7 : import (
8 : "context"
9 : "sync"
10 : "time"
11 :
12 : "github.com/cockroachdb/pebble/internal/base"
13 : "github.com/cockroachdb/swiss"
14 : )
15 :
16 : // readShard coordinates the read of a block that will be put in the cache. It
17 : // ensures only one goroutine is reading a block, and other callers block
18 : // until that goroutine is done (with success or failure). In the case of
19 : // success, the other goroutines will use the value that was read, even if it
20 : // is too large to be placed in the cache, or got evicted from the cache
21 : // before they got scheduled. In the case of a failure (read error or context
22 : // cancellation), one of the waiters will be given a turn to do the read.
23 : //
24 : // This turn-taking ensures that a large number of concurrent attempts to read
25 : // the same block that is not in the cache does not result in the same number
26 : // of reads from the filesystem (or remote storage). We have seen large spikes
27 : // in memory usage (and CPU usage for memory allocation/deallocation) without
28 : // this turn-taking.
29 : //
30 : // It introduces a small risk related to context cancellation -- if many
31 : // readers assigned a turn exceed their deadline while doing the read and
32 : // report an error, a reader with a longer deadline can unnecessarily wait. We
33 : // accept this risk for now since the primary production use in CockroachDB is
34 : // filesystem reads, where context cancellation is not respected. We do
35 : // introduce an error duration metric emitted in traces that can be used to
36 : // quantify such wasteful waiting. Note that this same risk extends to waiting
37 : // on the Options.LoadBlockSema, so the error duration metric includes the
38 : // case of an error when waiting on the semaphore (as a side effect of that
39 : // waiting happening in the caller, sstable.Reader).
40 : //
41 : // Design choices and motivation:
42 : //
43 : // - readShard is tightly integrated with a cache shard: At its core,
44 : // readShard is a map with synchronization. For the same reason the cache is
45 : // sharded (for higher concurrency by sharding the mutex), it is beneficial
46 : // to shard synchronization on readShard. By making readShard a member of
47 : // shard, this sharding is trivially accomplished. Additionally, the code is
48 : // cleaner without the race in which a cache miss is followed by creating a
49 : // readEntry that turns out to be unnecessary because someone else has done
50 : // the read since the miss and inserted the value into the cache. By making
51 : // the readShard use shard.mu, such a race is avoided. A side benefit is that
52 : // the cache interaction can be hidden behind readEntry.SetReadValue. One
53 : // disadvantage of this tightly integrated design is that it does not
54 : // encompass readers that will put the read value into a block.BufferPool --
55 : // we don't worry about those since block.BufferPool is only used for
56 : // compactions and there is at most one compaction reader of a block. There
57 : // is the possibility that the compaction reader and a user-facing iterator
58 : // reader will do duplicate reads, but we accept that deficiency.
59 : //
60 : // - readMap is separate from shard.blocks map: One could have a design which
61 : // extends the cache entry and unifies the two maps. However, we never want
62 : // to evict a readEntry while there are readers waiting for the block read
63 : // (including the case where the corresponding file is being removed from
64 : // shard.files). Also, the number of stable cache entries is huge, so those
65 : // entries are manually allocated, while the number of readEntries is small
66 : // (so manual allocation isn't necessary). For these reasons we maintain a
67 : // separate map. This separation also results in more modular code, instead
68 : // of piling more stuff into shard.
69 : type readShard struct {
70 : // shard is only used for locking, and calling shard.Set.
71 : shard *shard
72 : // Protected by shard.mu.
73 : //
74 : // shard.mu is never held when acquiring readEntry.mu. shard.mu is a shared
75 : // resource and must be released quickly.
76 : shardMu struct {
77 : readMap swiss.Map[key, *readEntry]
78 : }
79 : }
80 :
81 1 : func (rs *readShard) Init(shard *shard) *readShard {
82 1 : *rs = readShard{
83 1 : shard: shard,
84 1 : }
85 1 : // Choice of 16 is arbitrary.
86 1 : rs.shardMu.readMap.Init(16)
87 1 : return rs
88 1 : }
89 :
90 : // getReadEntryLocked gets a *readEntry for (id, fileNum, offset). shard.mu is
91 : // already write locked.
92 1 : func (rs *readShard) getReadEntryLocked(id ID, fileNum base.DiskFileNum, offset uint64) *readEntry {
93 1 : k := key{fileKey{id, fileNum}, offset}
94 1 : e, ok := rs.shardMu.readMap.Get(k)
95 1 : if !ok {
96 1 : e = newReadEntry(rs, id, fileNum, offset)
97 1 : rs.shardMu.readMap.Put(k, e)
98 1 : } else {
99 1 : e.refCount.acquireAllowZero()
100 1 : }
101 1 : return e
102 : }
103 :
104 0 : func (rs *readShard) lenForTesting() int {
105 0 : rs.shard.mu.Lock()
106 0 : defer rs.shard.mu.Unlock()
107 0 : return rs.shardMu.readMap.Len()
108 0 : }
109 :
110 : // readEntry is used to coordinate between concurrent attempted readers of the
111 : // same block.
112 : type readEntry struct {
113 : readShard *readShard
114 : id ID
115 : fileNum base.DiskFileNum
116 : offset uint64
117 : mu struct {
118 : sync.RWMutex
119 : // v, when non-nil, has a ref from readEntry, which is unreffed when
120 : // readEntry is deleted from the readMap.
121 : v *Value
122 : // isReading and ch together capture the state of whether someone has been
123 : // granted a turn to read, and of readers waiting for that read to finish.
124 : // ch is lazily allocated since most readEntries will not see concurrent
125 : // readers. This lazy allocation results in one transition of ch from nil
126 : // to non-nil, so waiters can read this non-nil ch and block on reading
127 : // from it without holding mu.
128 : //
129 : // ch is written to, to signal one waiter to start doing the read. ch is
130 : // closed when the value is successfully read and has been stored in v, so
131 : // that all waiters wake up and read v. ch is a buffered channel with a
132 : // capacity of 1.
133 : //
134 : // State transitions when trying to wait for turn:
135 : // Case !isReading:
136 : // set isReading=true; Drain the ch if non-nil and non-empty; proceed
137 : // with turn to do the read.
138 : // Case isReading:
139 : // allocate ch if nil; wait on ch
140 : // Finished reading successfully:
141 : // set isReading=false; if ch is non-nil, close ch.
142 : // Finished reading with failure:
143 : // set isReading=false; if ch is non-nil, write to ch.
144 : //
145 : // INVARIANT:
146 : // isReading => ch is nil or ch is empty.
147 : isReading bool
148 : ch chan struct{}
149 : // Total duration of reads and semaphore waiting that resulted in error.
150 : errorDuration time.Duration
151 : readStart time.Time
152 : }
153 : // Count of ReadHandles that refer to this readEntry. Increments always hold
154 : // shard.mu. So if this is found to be 0 while holding shard.mu, it is safe
155 : // to delete readEntry from readShard.shardMu.readMap.
156 : refCount refcnt
157 : }
158 :
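// To make the turn-taking protocol described above concrete, the following is
// a simplified, hypothetical sketch of it in isolation (illustration only; it
// is not part of this package, and it omits the cached value, refcounting and
// map removal handled by waitForReadPermissionOrHandle, setReadValue and
// setReadError below):
//
//	type turn struct {
//		mu        sync.Mutex
//		done      bool          // the value was read successfully
//		isReading bool          // some goroutine currently has the turn
//		ch        chan struct{} // lazily allocated, capacity 1
//	}
//
//	// wait returns true if the caller has been granted the turn to read.
//	func (t *turn) wait() bool {
//		for {
//			t.mu.Lock()
//			if t.done {
//				t.mu.Unlock()
//				return false
//			}
//			if !t.isReading {
//				t.isReading = true
//				if t.ch != nil {
//					select { // drain a stale failure signal
//					case <-t.ch:
//					default:
//					}
//				}
//				t.mu.Unlock()
//				return true
//			}
//			if t.ch == nil {
//				t.ch = make(chan struct{}, 1)
//			}
//			ch := t.ch
//			t.mu.Unlock()
//			if _, ok := <-ch; !ok {
//				return false // closed: someone else read the value
//			}
//			// Signaled after a failed read; loop to try to take the turn.
//		}
//	}
//
//	// finish is called by the goroutine that had the turn.
//	func (t *turn) finish(ok bool) {
//		t.mu.Lock()
//		defer t.mu.Unlock()
//		t.isReading = false
//		t.done = ok
//		if t.ch != nil {
//			if ok {
//				close(t.ch) // wake all waiters; they use the value
//			} else {
//				t.ch <- struct{}{} // hand the turn to exactly one waiter
//			}
//		}
//	}
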
159 : var readEntryPool = sync.Pool{
160 1 : New: func() interface{} {
161 1 : return &readEntry{}
162 1 : },
163 : }
164 :
165 1 : func newReadEntry(rs *readShard, id ID, fileNum base.DiskFileNum, offset uint64) *readEntry {
166 1 : e := readEntryPool.Get().(*readEntry)
167 1 : *e = readEntry{
168 1 : readShard: rs,
169 1 : id: id,
170 1 : fileNum: fileNum,
171 1 : offset: offset,
172 1 : }
173 1 : e.refCount.init(1)
174 1 : return e
175 1 : }
176 :
177 : // waitForReadPermissionOrHandle returns either an already read value (in
178 : // Handle), an error (if the context was cancelled), or neither, which is a
179 : // directive to the caller to do the read. In this last case the caller must
180 : // call either setReadValue or setReadError.
181 : //
182 : // In all cases, errorDuration is populated with the total duration that
183 : // readers that observed an error (setReadError) spent in doing the read. This
184 : // duration can be greater than the time spent in waitForReadPermissionOrHandle,
185 : // since some of these errors could have occurred prior to this call. But it
186 : // serves as a rough indicator of whether turn taking could have caused higher
187 : // latency due to context cancellation.
188 : func (e *readEntry) waitForReadPermissionOrHandle(
189 : ctx context.Context,
190 1 : ) (h Handle, errorDuration time.Duration, err error) {
191 1 : constructHandleLocked := func() Handle {
192 1 : if e.mu.v == nil {
193 0 : panic("value is nil")
194 : }
195 1 : e.mu.v.acquire()
196 1 : return Handle{value: e.mu.v}
197 : }
198 1 : becomeReaderLocked := func() {
199 1 : if e.mu.v != nil {
200 0 : panic("value is non-nil")
201 : }
202 1 : if e.mu.isReading {
203 0 : panic("isReading is already true")
204 : }
205 1 : e.mu.isReading = true
206 1 : if e.mu.ch != nil {
207 0 : // Drain the channel, so that no one else mistakenly believes they
208 0 : // should read.
209 0 : select {
210 0 : case <-e.mu.ch:
211 0 : default:
212 : }
213 : }
214 1 : e.mu.readStart = time.Now()
215 : }
216 1 : unlockAndUnrefAndTryRemoveFromMap := func(readLock bool) (errorDuration time.Duration) {
217 1 : removeState := e.makeTryRemoveStateLocked()
218 1 : errorDuration = e.mu.errorDuration
219 1 : if readLock {
220 1 : e.mu.RUnlock()
221 1 : } else {
222 1 : e.mu.Unlock()
223 1 : }
224 1 : unrefAndTryRemoveFromMap(removeState)
225 1 : return errorDuration
226 : }
227 :
228 1 : for {
229 1 : e.mu.Lock()
230 1 : if e.mu.v != nil {
231 1 : // Value has already been read.
232 1 : h := constructHandleLocked()
233 1 : errorDuration = unlockAndUnrefAndTryRemoveFromMap(false)
234 1 : return h, errorDuration, nil
235 1 : }
236 : // Not already read. Wait for turn to do the read or for someone else to do
237 : // the read.
238 1 : if !e.mu.isReading {
239 1 : // Have permission to do the read.
240 1 : becomeReaderLocked()
241 1 : errorDuration = e.mu.errorDuration
242 1 : e.mu.Unlock()
243 1 : return Handle{}, errorDuration, nil
244 1 : }
245 1 : if e.mu.ch == nil {
246 1 : // Rare case when multiple readers are concurrently trying to read. If
247 1 : // this turns out to be common enough we could use a sync.Pool.
248 1 : e.mu.ch = make(chan struct{}, 1)
249 1 : }
250 1 : ch := e.mu.ch
251 1 : e.mu.Unlock()
252 1 : select {
253 0 : case <-ctx.Done():
254 0 : e.mu.Lock()
255 0 : errorDuration = unlockAndUnrefAndTryRemoveFromMap(false)
256 0 : return Handle{}, errorDuration, ctx.Err()
257 1 : case _, ok := <-ch:
258 1 : if !ok {
259 1 : // Channel closed, so value was read.
260 1 : e.mu.RLock()
261 1 : if e.mu.v == nil {
262 0 : panic("value is nil")
263 : }
264 1 : h := constructHandleLocked()
265 1 : errorDuration = unlockAndUnrefAndTryRemoveFromMap(true)
266 1 : return h, errorDuration, nil
267 : }
268 : // Else, probably granted permission to do the read. NB: since isReading
269 : // is false, someone else can slip through before this thread acquires
270 : // e.mu, and take the turn. So try to actually get the turn by trying
271 : // again in the loop.
272 : }
273 : }
274 : }
275 :
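// For illustration only, a hypothetical caller (the real callers live outside
// this file) would distinguish the three outcomes of
// waitForReadPermissionOrHandle roughly as follows:
//
//	h, errDur, err := e.waitForReadPermissionOrHandle(ctx)
//	switch {
//	case err != nil:
//		// The context was cancelled while waiting; give up.
//	case h.value != nil:
//		// Someone else read the block; use h and release it when done.
//	default:
//		// This goroutine has the turn: do the read, then call
//		// e.setReadValue(v) or e.setReadError(readErr).
//	}
//	_ = errDur // The error duration can be recorded in traces.
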
276 : // tryRemoveState captures the state needed by unrefAndTryRemoveFromMap. The
277 : // caller constructs it before calling unrefAndTryRemoveFromMap because the
278 : // caller typically holds readEntry.mu at that point, which avoids having to
279 : // acquire it again in unrefAndTryRemoveFromMap.
280 : type tryRemoveState struct {
281 : rs *readShard
282 : k key
283 : e *readEntry
284 : }
285 :
286 : // makeTryRemoveStateLocked initializes tryRemoveState.
287 1 : func (e *readEntry) makeTryRemoveStateLocked() tryRemoveState {
288 1 : return tryRemoveState{
289 1 : rs: e.readShard,
290 1 : k: key{fileKey{e.id, e.fileNum}, e.offset},
291 1 : e: e,
292 1 : }
293 1 : }
294 :
295 : // unrefAndTryRemoveFromMap tries to remove s.k => s.e from the map in s.rs.
296 : // It is possible that, after unreffing, s.e has already been removed from the
297 : // map and is now back in the sync.Pool, or is being reused (for the same or a
298 : // different key). This can happen because, between the unref that dropped
299 : // s.e.refCount to zero and the acquisition of shard.mu, the count could have
300 : // been incremented and decremented concurrently, and some other goroutine
301 : // could have observed a different decrement to 0, raced ahead, and deleted
302 : // s.e from the readMap.
303 1 : func unrefAndTryRemoveFromMap(s tryRemoveState) {
304 1 : if !s.e.refCount.release() {
305 1 : return
306 1 : }
307 1 : s.rs.shard.mu.Lock()
308 1 : e2, ok := s.rs.shardMu.readMap.Get(s.k)
309 1 : if !ok || e2 != s.e {
310 1 : // Already removed.
311 1 : s.rs.shard.mu.Unlock()
312 1 : return
313 1 : }
314 1 : if s.e.refCount.value() != 0 {
315 0 : s.rs.shard.mu.Unlock()
316 0 : return
317 0 : }
318 : // k => e and e.refCount == 0. And it cannot be incremented since
319 : // shard.mu.Lock() is held. So remove from map.
320 1 : s.rs.shardMu.readMap.Delete(s.k)
321 1 : s.rs.shard.mu.Unlock()
322 1 :
323 1 : // Free s.e.
324 1 : s.e.mu.Lock()
325 1 : if s.e.mu.v != nil {
326 1 : s.e.mu.v.release()
327 1 : s.e.mu.v = nil
328 1 : }
329 1 : s.e.mu.Unlock()
330 1 : *s.e = readEntry{}
331 1 : readEntryPool.Put(s.e)
332 : }
333 :
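// For illustration, one hypothetical interleaving that produces the situation
// described above, with goroutines G1 and G2 operating on the same key k and
// entry e:
//
//	G1: e.refCount.release() drops the count to 0 and returns true.
//	G2: getReadEntryLocked(k) finds k => e still in the map and calls
//	    acquireAllowZero(), bringing the count back to 1.
//	G2: later calls e.refCount.release(), dropping the count to 0 again.
//	G2: acquires shard.mu, sees k => e with count 0, deletes it from readMap,
//	    and returns e to readEntryPool (from where it may be reused).
//	G1: acquires shard.mu and either no longer finds k in the map, finds a
//	    different entry under k, or (if e was reused for the same key) finds
//	    e with a non-zero refCount; in all cases it returns without freeing e.
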
334 1 : func (e *readEntry) setReadValue(v *Value) Handle {
335 1 : // Add to the cache before taking another ref for readEntry, since the cache
336 1 : // expects ref=1 when it is called.
337 1 : //
338 1 : // TODO(sumeer): if e.refCount > 1, we should consider overriding to ensure
339 1 : // that it is added as etHot. The common case will be e.refCount = 1, and we
340 1 : // don't want to acquire e.mu twice, so one way to do this would be to relax
341 1 : // the invariant in shard.Set that requires Value.refs() == 1. Then we can
342 1 : // do the work under e.mu before calling shard.Set.
343 1 : h := e.readShard.shard.Set(e.id, e.fileNum, e.offset, v)
344 1 : e.mu.Lock()
345 1 : // Acquire a ref for readEntry, since we are going to remember it in e.mu.v.
346 1 : v.acquire()
347 1 : e.mu.v = v
348 1 : if !e.mu.isReading {
349 0 : panic("isReading is false")
350 : }
351 1 : e.mu.isReading = false
352 1 : if e.mu.ch != nil {
353 1 : // Inform all waiters so they can use e.mu.v. Not all readers have called
354 1 : // readEntry.waitForReadPermissionOrHandle, and those will also use
355 1 : // e.mu.v.
356 1 : close(e.mu.ch)
357 1 : }
358 1 : removeState := e.makeTryRemoveStateLocked()
359 1 : e.mu.Unlock()
360 1 : unrefAndTryRemoveFromMap(removeState)
361 1 : return h
362 : }
363 :
364 1 : func (e *readEntry) setReadError(err error) {
365 1 : e.mu.Lock()
366 1 : if !e.mu.isReading {
367 0 : panic("isReading is false")
368 : }
369 1 : e.mu.isReading = false
370 1 : if e.mu.ch != nil {
371 0 : select {
372 0 : case e.mu.ch <- struct{}{}:
373 0 : default:
374 0 : panic("channel is not empty")
375 : }
376 : }
377 1 : e.mu.errorDuration += time.Since(e.mu.readStart)
378 1 : removeState := e.makeTryRemoveStateLocked()
379 1 : e.mu.Unlock()
380 1 : unrefAndTryRemoveFromMap(removeState)
381 : }
382 :
383 : // ReadHandle represents a contract with a caller that had a miss when doing a
384 : // cache lookup, and wants to do a read and insert the read block into the
385 : // cache. The contract applies when ReadHandle.Valid returns true, in which
386 : // case the caller has been assigned the turn to do the read (and others are
387 : // potentially waiting for it).
388 : //
389 : // Contract:
390 : //
391 : // The caller must immediately start doing a read, or can first wait on a
392 : // shared resource that would also block a different reader if it was assigned
393 : // the turn instead (specifically, this refers to Options.LoadBlockSema).
394 : // After the read, it must either call SetReadValue or SetReadError depending
395 : // on whether the read succeeded or failed.
396 : type ReadHandle struct {
397 : entry *readEntry
398 : }
399 :
400 : // Valid returns true for a valid ReadHandle.
401 1 : func (rh ReadHandle) Valid() bool {
402 1 : return rh.entry != nil
403 1 : }
404 :
405 : // SetReadValue provides the Value that the caller has read. The caller is
406 : // responsible for releasing the returned Handle when it is no longer needed.
407 1 : func (rh ReadHandle) SetReadValue(v *Value) Handle {
408 1 : return rh.entry.setReadValue(v)
409 1 : }
410 :
411 : // SetReadError specifies that the caller has encountered a read error.
412 1 : func (rh ReadHandle) SetReadError(err error) {
413 1 : rh.entry.setReadError(err)
414 1 : }
|