Line data Source code
1 : // Copyright 2018. All rights reserved. Use of this source code is governed by
2 : // an MIT-style license that can be found in the LICENSE file.
3 :
4 : // Package cache implements the CLOCK-Pro caching algorithm.
5 : //
6 : // CLOCK-Pro is a patent-free alternative to the Adaptive Replacement Cache,
7 : // https://en.wikipedia.org/wiki/Adaptive_replacement_cache.
8 : // It is an approximation of LIRS ( https://en.wikipedia.org/wiki/LIRS_caching_algorithm ),
9 : // much like the CLOCK page replacement algorithm is an approximation of LRU.
10 : //
11 : // This implementation is based on the python code from https://bitbucket.org/SamiLehtinen/pyclockpro .
12 : //
13 : // Slides describing the algorithm: http://fr.slideshare.net/huliang64/clockpro
14 : //
15 : // The original paper: http://static.usenix.org/event/usenix05/tech/general/full_papers/jiang/jiang_html/html.html
16 : //
17 : // It is MIT licensed, like the original.
18 : package cache // import "github.com/cockroachdb/pebble/internal/cache"
19 :
20 : import (
21 : "context"
22 : "fmt"
23 : "os"
24 : "runtime"
25 : "runtime/debug"
26 : "strings"
27 : "sync"
28 : "sync/atomic"
29 : "time"
30 :
31 : "github.com/cockroachdb/pebble/internal/base"
32 : "github.com/cockroachdb/pebble/internal/invariants"
33 : )
34 :
35 : type fileKey struct {
36 : // id is the namespace for fileNums.
37 : id ID
38 : fileNum base.DiskFileNum
39 : }
40 :
41 : type key struct {
42 : fileKey
43 : offset uint64
44 : }
45 :
46 : // file returns the "file key" for the receiver. This is the key used for the
47 : // shard.files map.
48 2 : func (k key) file() key {
49 2 : k.offset = 0
50 2 : return k
51 2 : }
52 :
53 0 : func (k key) String() string {
54 0 : return fmt.Sprintf("%d/%d/%d", k.id, k.fileNum, k.offset)
55 0 : }
56 :
57 : // Handle provides a strong reference to a value in the cache. The reference
58 : // does not pin the value in the cache, but it does prevent the underlying byte
59 : // slice from being reused.
60 : type Handle struct {
61 : value *Value
62 : }
63 :
64 : // Valid returns true if the handle holds a value.
65 2 : func (h Handle) Valid() bool {
66 2 : return h.value != nil
67 2 : }
68 :
69 : // RawBuffer returns the value buffer. Note that this buffer holds both the
70 : // block metadata and the block data; it should be accessed through a block.BufferHandle.
71 : //
72 : // RawBuffer can only be called if the handle is Valid().
73 2 : func (h Handle) RawBuffer() []byte {
74 2 : // NB: We don't increment shard.hits in this code path because we only want
75 2 : // to record a hit when the handle is retrieved from the cache.
76 2 : return h.value.buf
77 2 : }
78 :
79 : // Release releases the reference to the cache entry.
80 2 : func (h Handle) Release() {
81 2 : h.value.release()
82 2 : }
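
// A minimal caller-side sketch of the Handle lifecycle (hypothetical caller,
// not part of this file; c is a *Cache and id, fileNum, offset are assumed to
// be in scope):
//
//	h := c.Get(id, fileNum, offset)
//	if h.Valid() {
//		buf := h.RawBuffer() // buf remains usable only until Release
//		_ = buf
//		h.Release()
//	}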
83 :
84 : type shard struct {
85 : hits atomic.Int64
86 : misses atomic.Int64
87 :
88 : mu sync.RWMutex
89 :
90 : reservedSize int64
91 : maxSize int64
92 : coldTarget int64
93 : blocks blockMap // fileNum+offset -> block
94 : files blockMap // fileNum -> list of blocks
95 :
96 : // The blocks and files maps store values in manually managed memory that is
97 : // invisible to the Go GC. This is fine when Value and entry objects are
98 : // themselves stored in manually managed memory, but when the "invariants" build
99 : // tag is set, all Value and entry objects are Go allocated, and the entries map
100 : // holds a Go-visible reference to every entry.
101 : entries map[*entry]struct{}
102 :
103 : handHot *entry
104 : handCold *entry
105 : handTest *entry
106 :
107 : sizeHot int64
108 : sizeCold int64
109 : sizeTest int64
110 :
111 : // The count fields are used exclusively for asserting expectations.
112 : // We've seen infinite looping (cockroachdb/cockroach#70154) that
113 : // could be explained by a corrupted sizeCold. Through asserting on
114 : // these fields, we hope to gain more insight from any future
115 : // reproductions.
116 : countHot int64
117 : countCold int64
118 : countTest int64
119 :
120 : // Some fields in readShard are protected by mu. See comments in declaration
121 : // of readShard.
122 : readShard readShard
123 : }
124 :
125 2 : func (c *shard) init(maxSize int64) {
126 2 : *c = shard{
127 2 : maxSize: maxSize,
128 2 : coldTarget: maxSize,
129 2 : }
130 2 : if entriesGoAllocated {
131 2 : c.entries = make(map[*entry]struct{})
132 2 : }
133 2 : c.blocks.Init(16)
134 2 : c.files.Init(16)
135 2 : c.readShard.Init(c)
136 : }
137 :
138 : // getWithMaybeReadEntry is the internal helper for implementing
139 : // Cache.{Get,GetWithReadHandle}. When desireReadEntry is true, and the block
140 : // is not in the cache (!Handle.Valid()), a non-nil readEntry is returned.
141 : func (c *shard) getWithMaybeReadEntry(
142 : id ID, fileNum base.DiskFileNum, offset uint64, desireReadEntry bool,
143 2 : ) (Handle, *readEntry) {
144 2 : c.mu.RLock()
145 2 : var value *Value
146 2 : if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil {
147 2 : value = e.acquireValue()
148 2 : if value != nil {
149 2 : e.referenced.Store(true)
150 2 : }
151 : }
152 2 : c.mu.RUnlock()
153 2 : var re *readEntry
154 2 : if value == nil && desireReadEntry {
155 2 : c.mu.Lock()
156 2 : // After the c.mu.RUnlock(), someone could have inserted the value in the
157 2 : // cache. We could tolerate the race and do a file read, or do another map
158 2 : // lookup. We choose to do the latter, since the cost of a map lookup is
159 2 : // insignificant compared to the cost of reading a block from a file.
160 2 : if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil {
161 2 : value = e.acquireValue()
162 2 : if value != nil {
163 2 : e.referenced.Store(true)
164 2 : }
165 : }
166 2 : if value == nil {
167 2 : re = c.readShard.getReadEntryLocked(id, fileNum, offset)
168 2 : }
169 2 : c.mu.Unlock()
170 : }
171 2 : if value == nil {
172 2 : c.misses.Add(1)
173 2 : } else {
174 2 : c.hits.Add(1)
175 2 : }
176 2 : return Handle{value: value}, re
177 : }
178 :
179 2 : func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value) Handle {
180 2 : if n := value.refs(); n != 1 {
181 0 : panic(fmt.Sprintf("pebble: Value has already been added to the cache: refs=%d", n))
182 : }
183 :
184 2 : c.mu.Lock()
185 2 : defer c.mu.Unlock()
186 2 :
187 2 : k := key{fileKey{id, fileNum}, offset}
188 2 : e, _ := c.blocks.Get(k)
189 2 :
190 2 : switch {
191 2 : case e == nil:
192 2 : // no cache entry? add it
193 2 : e = newEntry(k, int64(len(value.buf)))
194 2 : e.setValue(value)
195 2 : if c.metaAdd(k, e) {
196 2 : value.ref.trace("add-cold")
197 2 : c.sizeCold += e.size
198 2 : c.countCold++
199 2 : } else {
200 2 : value.ref.trace("skip-cold")
201 2 : e.free()
202 2 : e = nil
203 2 : }
204 :
205 1 : case e.val != nil:
206 1 : // cache entry was a hot or cold page
207 1 : e.setValue(value)
208 1 : e.referenced.Store(true)
209 1 : delta := int64(len(value.buf)) - e.size
210 1 : e.size = int64(len(value.buf))
211 1 : if e.ptype == etHot {
212 1 : value.ref.trace("add-hot")
213 1 : c.sizeHot += delta
214 1 : } else {
215 1 : // TODO(sumeer): unclear why we don't set e.ptype to etHot on this path.
216 1 : // In the default case below, where the state is etTest, we set it to
217 1 : // etHot. But etTest is "colder" than etCold (the only transition into
218 1 : // etTest is etCold => etTest), so if etTest is promoted to etHot here,
219 1 : // etCold should be as well.
220 1 : value.ref.trace("add-cold")
221 1 : c.sizeCold += delta
222 1 : }
223 1 : c.evict()
224 :
225 2 : default:
226 2 : // cache entry was a test page
227 2 : c.sizeTest -= e.size
228 2 : c.countTest--
229 2 : v := c.metaDel(e)
230 2 : if invariants.Enabled && v != nil {
231 0 : panic("value should be nil")
232 : }
233 2 : c.metaCheck(e)
234 2 :
235 2 : e.size = int64(len(value.buf))
236 2 : c.coldTarget += e.size
237 2 : if c.coldTarget > c.targetSize() {
238 1 : c.coldTarget = c.targetSize()
239 1 : }
240 :
241 2 : e.referenced.Store(false)
242 2 : e.setValue(value)
243 2 : e.ptype = etHot
244 2 : if c.metaAdd(k, e) {
245 2 : value.ref.trace("add-hot")
246 2 : c.sizeHot += e.size
247 2 : c.countHot++
248 2 : } else {
249 0 : value.ref.trace("skip-hot")
250 0 : e.free()
251 0 : e = nil
252 0 : }
253 : }
254 :
255 2 : c.checkConsistency()
256 2 :
257 2 : // Values are initialized with a reference count of 1. That reference count
258 2 : // is being transferred to the returned Handle.
259 2 : return Handle{value: value}
260 : }
261 :
262 2 : func (c *shard) checkConsistency() {
263 2 : // See the comment above the count{Hot,Cold,Test} fields.
264 2 : switch {
265 0 : case c.sizeHot < 0 || c.sizeCold < 0 || c.sizeTest < 0 || c.countHot < 0 || c.countCold < 0 || c.countTest < 0:
266 0 : panic(fmt.Sprintf("pebble: unexpected negative: %d (%d bytes) hot, %d (%d bytes) cold, %d (%d bytes) test",
267 0 : c.countHot, c.sizeHot, c.countCold, c.sizeCold, c.countTest, c.sizeTest))
268 0 : case c.sizeHot > 0 && c.countHot == 0:
269 0 : panic(fmt.Sprintf("pebble: mismatch %d hot size, %d hot count", c.sizeHot, c.countHot))
270 0 : case c.sizeCold > 0 && c.countCold == 0:
271 0 : panic(fmt.Sprintf("pebble: mismatch %d cold size, %d cold count", c.sizeCold, c.countCold))
272 0 : case c.sizeTest > 0 && c.countTest == 0:
273 0 : panic(fmt.Sprintf("pebble: mismatch %d test size, %d test count", c.sizeTest, c.countTest))
274 : }
275 : }
276 :
277 : // Delete deletes the cached value for the specified file and offset.
278 2 : func (c *shard) Delete(id ID, fileNum base.DiskFileNum, offset uint64) {
279 2 : // The common case is that there is nothing to delete, so do a quick check
280 2 : // with a shared lock.
281 2 : k := key{fileKey{id, fileNum}, offset}
282 2 : c.mu.RLock()
283 2 : _, exists := c.blocks.Get(k)
284 2 : c.mu.RUnlock()
285 2 : if !exists {
286 2 : return
287 2 : }
288 :
289 1 : var deletedValue *Value
290 1 : func() {
291 1 : c.mu.Lock()
292 1 : defer c.mu.Unlock()
293 1 :
294 1 : e, _ := c.blocks.Get(k)
295 1 : if e == nil {
296 0 : return
297 0 : }
298 1 : deletedValue = c.metaEvict(e)
299 1 : c.checkConsistency()
300 : }()
301 : // Now that the mutex has been dropped, release the reference which will
302 : // potentially free the memory associated with the previous cached value.
303 1 : deletedValue.release()
304 : }
305 :
306 : // EvictFile evicts all of the cache values for the specified file.
307 2 : func (c *shard) EvictFile(id ID, fileNum base.DiskFileNum) {
308 2 : fkey := key{fileKey{id, fileNum}, 0}
309 2 : for c.evictFileRun(fkey) {
310 2 : // Sched switch to give another goroutine an opportunity to acquire the
311 2 : // shard mutex.
312 2 : runtime.Gosched()
313 2 : }
314 : }
315 :
316 2 : func (c *shard) evictFileRun(fkey key) (moreRemaining bool) {
317 2 : // If most of the file's blocks are held in the block cache, evicting all
318 2 : // the blocks may take a while. We don't want to block the entire cache
319 2 : // shard, forcing concurrent readers to wait until we're finished. We drop
320 2 : // the mutex every [blocksPerMutexAcquisition] blocks to give other
321 2 : // goroutines an opportunity to make progress.
322 2 : const blocksPerMutexAcquisition = 5
323 2 : c.mu.Lock()
324 2 :
325 2 : // Releasing a value may result in freeing it back to the memory allocator.
326 2 : // This can have a nontrivial cost that we'd prefer to not pay while holding
327 2 : // the shard mutex, so we collect the evicted values in a local slice and
328 2 : // only release them in a defer after dropping the cache mutex.
329 2 : var obsoleteValuesAlloc [blocksPerMutexAcquisition]*Value
330 2 : obsoleteValues := obsoleteValuesAlloc[:0]
331 2 : defer func() {
332 2 : c.mu.Unlock()
333 2 : for _, v := range obsoleteValues {
334 2 : v.release()
335 2 : }
336 : }()
337 :
338 2 : blocks, _ := c.files.Get(fkey)
339 2 : if blocks == nil {
340 2 : // No blocks for this file.
341 2 : return false
342 2 : }
343 :
344 : // b is the current head of the doubly linked list, and n is the entry after b.
345 2 : for b, n := blocks, (*entry)(nil); len(obsoleteValues) < cap(obsoleteValues); b = n {
346 2 : n = b.fileLink.next
347 2 : obsoleteValues = append(obsoleteValues, c.metaEvict(b))
348 2 : if b == n {
349 2 : // b == n represents the case where b was the last entry remaining
350 2 : // in the doubly linked list, which is why it pointed at itself. So
351 2 : // no more entries left.
352 2 : c.checkConsistency()
353 2 : return false
354 2 : }
355 : }
356 : // Exhausted blocksPerMutexAcquisition.
357 2 : return true
358 : }
359 :
360 2 : func (c *shard) Free() {
361 2 : c.mu.Lock()
362 2 : defer c.mu.Unlock()
363 2 :
364 2 : // NB: we use metaDel rather than metaEvict in order to avoid the expensive
365 2 : // metaCheck call when the "invariants" build tag is specified.
366 2 : for c.handHot != nil {
367 2 : e := c.handHot
368 2 : c.metaDel(c.handHot).release()
369 2 : e.free()
370 2 : }
371 :
372 2 : c.blocks.Close()
373 2 : c.files.Close()
374 : }
375 :
376 2 : func (c *shard) Reserve(n int) {
377 2 : c.mu.Lock()
378 2 : defer c.mu.Unlock()
379 2 : c.reservedSize += int64(n)
380 2 :
381 2 : // Changing c.reservedSize will either increase or decrease
382 2 : // the targetSize. But we want coldTarget to be in the range
383 2 : // [0, targetSize]. So, if c.targetSize decreases, make sure
384 2 : // that the coldTarget fits within the limits.
385 2 : targetSize := c.targetSize()
386 2 : if c.coldTarget > targetSize {
387 2 : c.coldTarget = targetSize
388 2 : }
389 :
390 2 : c.evict()
391 2 : c.checkConsistency()
392 : }
393 :
394 : // Size returns the current space used by the cache.
395 1 : func (c *shard) Size() int64 {
396 1 : c.mu.RLock()
397 1 : size := c.sizeHot + c.sizeCold
398 1 : c.mu.RUnlock()
399 1 : return size
400 1 : }
401 :
402 2 : func (c *shard) targetSize() int64 {
403 2 : target := c.maxSize - c.reservedSize
404 2 : // Always return a positive integer for targetSize. This is so that we don't
405 2 : // end up in an infinite loop in evict(), in cases where reservedSize is
406 2 : // greater than or equal to maxSize.
407 2 : if target < 1 {
408 2 : return 1
409 2 : }
410 2 : return target
411 : }
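
// Worked example of the clamping above (hypothetical numbers):
//
//	maxSize = 8 MiB, reservedSize = 2 MiB  => targetSize() = 6 MiB
//	maxSize = 8 MiB, reservedSize = 10 MiB => targetSize() = 1 (clamped)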
412 :
413 : // Add the entry to the cache, returning true if the entry was added and false
414 : // if it would not fit in the cache.
415 2 : func (c *shard) metaAdd(key key, e *entry) bool {
416 2 : c.evict()
417 2 : if e.size > c.targetSize() {
418 2 : // The entry is larger than the target cache size.
419 2 : return false
420 2 : }
421 :
422 2 : c.blocks.Put(key, e)
423 2 : if entriesGoAllocated {
424 2 : // Go allocated entries need to be referenced from Go memory. The entries
425 2 : // map provides that reference.
426 2 : c.entries[e] = struct{}{}
427 2 : }
428 :
429 2 : if c.handHot == nil {
430 2 : // first element
431 2 : c.handHot = e
432 2 : c.handCold = e
433 2 : c.handTest = e
434 2 : } else {
435 2 : c.handHot.link(e)
436 2 : }
437 :
438 2 : if c.handCold == c.handHot {
439 2 : c.handCold = c.handCold.prev()
440 2 : }
441 :
442 2 : fkey := key.file()
443 2 : if fileBlocks, _ := c.files.Get(fkey); fileBlocks == nil {
444 2 : c.files.Put(fkey, e)
445 2 : } else {
446 2 : fileBlocks.linkFile(e)
447 2 : }
448 2 : return true
449 : }
450 :
451 : // Remove the entry from the cache. This removes the entry from the blocks map,
452 : // the files map, and ensures that hand{Hot,Cold,Test} are not pointing at the
453 : // entry. Returns the deleted value that must be released, if any.
454 2 : func (c *shard) metaDel(e *entry) (deletedValue *Value) {
455 2 : if value := e.val; value != nil {
456 2 : value.ref.trace("metaDel")
457 2 : }
458 : // Remove the pointer to the value.
459 2 : deletedValue = e.val
460 2 : e.val = nil
461 2 :
462 2 : c.blocks.Delete(e.key)
463 2 : if entriesGoAllocated {
464 2 : // Go allocated entries need to be referenced from Go memory. The entries
465 2 : // map provides that reference.
466 2 : delete(c.entries, e)
467 2 : }
468 :
469 2 : if e == c.handHot {
470 2 : c.handHot = c.handHot.prev()
471 2 : }
472 2 : if e == c.handCold {
473 2 : c.handCold = c.handCold.prev()
474 2 : }
475 2 : if e == c.handTest {
476 2 : c.handTest = c.handTest.prev()
477 2 : }
478 :
479 2 : if e.unlink() == e {
480 2 : // This was the last entry in the cache.
481 2 : c.handHot = nil
482 2 : c.handCold = nil
483 2 : c.handTest = nil
484 2 : }
485 :
486 2 : fkey := e.key.file()
487 2 : if next := e.unlinkFile(); e == next {
488 2 : c.files.Delete(fkey)
489 2 : } else {
490 2 : c.files.Put(fkey, next)
491 2 : }
492 2 : return deletedValue
493 : }
494 :
495 : // Check that the specified entry is not referenced by the cache.
496 2 : func (c *shard) metaCheck(e *entry) {
497 2 : if invariants.Enabled {
498 2 : if _, ok := c.entries[e]; ok {
499 0 : fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in entries map\n%s",
500 0 : e, e.key, debug.Stack())
501 0 : os.Exit(1)
502 0 : }
503 2 : if c.blocks.findByValue(e) {
504 0 : fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in blocks map\n%#v\n%s",
505 0 : e, e.key, &c.blocks, debug.Stack())
506 0 : os.Exit(1)
507 0 : }
508 2 : if c.files.findByValue(e) {
509 0 : fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in files map\n%#v\n%s",
510 0 : e, e.key, &c.files, debug.Stack())
511 0 : os.Exit(1)
512 0 : }
513 : // NB: c.hand{Hot,Cold,Test} are pointers into the same circular linked list,
514 : // so a single traversal from any of them visits every entry.
515 2 : var countHot, countCold, countTest int64
516 2 : var sizeHot, sizeCold, sizeTest int64
517 2 : for t := c.handHot.next(); t != nil; t = t.next() {
518 2 : // Recompute count{Hot,Cold,Test} and size{Hot,Cold,Test}.
519 2 : switch t.ptype {
520 2 : case etHot:
521 2 : countHot++
522 2 : sizeHot += t.size
523 2 : case etCold:
524 2 : countCold++
525 2 : sizeCold += t.size
526 2 : case etTest:
527 2 : countTest++
528 2 : sizeTest += t.size
529 : }
530 2 : if e == t {
531 0 : fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in blocks list\n%s",
532 0 : e, e.key, debug.Stack())
533 0 : os.Exit(1)
534 0 : }
535 2 : if t == c.handHot {
536 2 : break
537 : }
538 : }
539 2 : if countHot != c.countHot || countCold != c.countCold || countTest != c.countTest ||
540 2 : sizeHot != c.sizeHot || sizeCold != c.sizeCold || sizeTest != c.sizeTest {
541 0 : fmt.Fprintf(os.Stderr, `divergence of Hot,Cold,Test statistics
542 0 : cache's statistics: hot %d, %d, cold %d, %d, test %d, %d
543 0 : recalculated statistics: hot %d, %d, cold %d, %d, test %d, %d\n%s`,
544 0 : c.countHot, c.sizeHot, c.countCold, c.sizeCold, c.countTest, c.sizeTest,
545 0 : countHot, sizeHot, countCold, sizeCold, countTest, sizeTest,
546 0 : debug.Stack())
547 0 : os.Exit(1)
548 0 : }
549 : }
550 : }
551 :
552 2 : func (c *shard) metaEvict(e *entry) (evictedValue *Value) {
553 2 : switch e.ptype {
554 1 : case etHot:
555 1 : c.sizeHot -= e.size
556 1 : c.countHot--
557 2 : case etCold:
558 2 : c.sizeCold -= e.size
559 2 : c.countCold--
560 1 : case etTest:
561 1 : c.sizeTest -= e.size
562 1 : c.countTest--
563 : }
564 2 : evictedValue = c.metaDel(e)
565 2 : c.metaCheck(e)
566 2 : e.free()
567 2 : return evictedValue
568 : }
569 :
570 2 : func (c *shard) evict() {
571 2 : for c.targetSize() <= c.sizeHot+c.sizeCold && c.handCold != nil {
572 2 : c.runHandCold(c.countCold, c.sizeCold)
573 2 : }
574 : }
575 :
576 2 : func (c *shard) runHandCold(countColdDebug, sizeColdDebug int64) {
577 2 : // countColdDebug and sizeColdDebug should equal c.countCold and
578 2 : // c.sizeCold. They're parameters only to aid in debugging of
579 2 : // cockroachdb/cockroach#70154. Since they're parameters, their
580 2 : // arguments will appear within stack traces should we encounter
581 2 : // a reproduction.
582 2 : if c.countCold != countColdDebug || c.sizeCold != sizeColdDebug {
583 0 : panic(fmt.Sprintf("runHandCold: cold count and size are %d, %d, arguments are %d and %d",
584 0 : c.countCold, c.sizeCold, countColdDebug, sizeColdDebug))
585 : }
586 :
587 2 : e := c.handCold
588 2 : if e.ptype == etCold {
589 2 : if e.referenced.Load() {
590 2 : e.referenced.Store(false)
591 2 : e.ptype = etHot
592 2 : c.sizeCold -= e.size
593 2 : c.countCold--
594 2 : c.sizeHot += e.size
595 2 : c.countHot++
596 2 : } else {
597 2 : e.setValue(nil)
598 2 : e.ptype = etTest
599 2 : c.sizeCold -= e.size
600 2 : c.countCold--
601 2 : c.sizeTest += e.size
602 2 : c.countTest++
603 2 : for c.targetSize() < c.sizeTest && c.handTest != nil {
604 2 : c.runHandTest()
605 2 : }
606 : }
607 : }
608 :
609 2 : c.handCold = c.handCold.next()
610 2 :
611 2 : for c.targetSize()-c.coldTarget <= c.sizeHot && c.handHot != nil {
612 2 : c.runHandHot()
613 2 : }
614 : }
615 :
616 2 : func (c *shard) runHandHot() {
617 2 : if c.handHot == c.handTest && c.handTest != nil {
618 2 : c.runHandTest()
619 2 : if c.handHot == nil {
620 2 : return
621 2 : }
622 : }
623 :
624 2 : e := c.handHot
625 2 : if e.ptype == etHot {
626 2 : if e.referenced.Load() {
627 2 : e.referenced.Store(false)
628 2 : } else {
629 2 : e.ptype = etCold
630 2 : c.sizeHot -= e.size
631 2 : c.countHot--
632 2 : c.sizeCold += e.size
633 2 : c.countCold++
634 2 : }
635 : }
636 :
637 2 : c.handHot = c.handHot.next()
638 : }
639 :
640 2 : func (c *shard) runHandTest() {
641 2 : if c.sizeCold > 0 && c.handTest == c.handCold && c.handCold != nil {
642 2 : // sizeCold is > 0, so assert that countCold > 0. See the
643 2 : // comment above count{Hot,Cold,Test}.
644 2 : if c.countCold == 0 {
645 0 : panic(fmt.Sprintf("pebble: mismatch %d cold size, %d cold count", c.sizeCold, c.countCold))
646 : }
647 :
648 2 : c.runHandCold(c.countCold, c.sizeCold)
649 2 : if c.handTest == nil {
650 2 : return
651 2 : }
652 : }
653 :
654 2 : e := c.handTest
655 2 : if e.ptype == etTest {
656 2 : c.sizeTest -= e.size
657 2 : c.countTest--
658 2 : c.coldTarget -= e.size
659 2 : if c.coldTarget < 0 {
660 2 : c.coldTarget = 0
661 2 : }
662 2 : c.metaDel(e).release()
663 2 : c.metaCheck(e)
664 2 : e.free()
665 : }
666 :
667 2 : c.handTest = c.handTest.next()
668 : }
669 :
670 : // Metrics holds metrics for the cache.
671 : type Metrics struct {
672 : // The number of bytes in use by the cache.
673 : Size int64
674 : // The count of objects (blocks or tables) in the cache.
675 : Count int64
676 : // The number of cache hits.
677 : Hits int64
678 : // The number of cache misses.
679 : Misses int64
680 : }
681 :
682 : // Cache implements Pebble's sharded block cache. The CLOCK-Pro algorithm is
683 : // used for page replacement
684 : // (http://static.usenix.org/event/usenix05/tech/general/full_papers/jiang/jiang_html/html.html). In
685 : // order to provide better concurrency, 4 x NumCPUs shards are created, with
686 : // each shard being given 1/n-th of the target cache size (where n is the
687 : // number of shards). The CLOCK-Pro algorithm is run independently on each shard.
688 : //
689 : // Blocks are keyed by an (id, fileNum, offset) triple. The ID is a namespace
690 : // for file numbers and allows a single Cache to be shared between multiple
691 : // Pebble instances. The fileNum and offset refer to an sstable file number and
692 : // the offset of the block within the file. Because sstables are immutable and
693 : // file numbers are never reused, (fileNum,offset) are unique for the lifetime
694 : // of a Pebble instance.
695 : //
696 : // In addition to maintaining a map from (fileNum,offset) to data, each shard
697 : // maintains a map of the cached blocks for a particular fileNum. This allows
698 : // efficient eviction of all of the blocks for a file which is used when an
699 : // sstable is deleted from disk.
700 : //
701 : // # Memory Management
702 : //
703 : // A normal implementation of the block cache would result in GC having to read
704 : // through all the structures and keep track of the liveness of many objects.
705 : // This was found to cause significant overhead in CRDB when compared to the
706 : // earlier use of RocksDB.
707 : //
708 : // In order to reduce pressure on the Go GC, manual memory management is
709 : // performed for the data stored in the cache. Manual memory management is
710 : // performed by calling into C.{malloc,free} to allocate memory; this memory is
711 : // outside the purview of the GC. Cache.Values are reference counted and the
712 : // memory backing a manual value is freed when the reference count drops to 0.
713 : //
714 : // Manual memory management brings the possibility of memory leaks. It is
715 : // imperative that every Handle returned by Cache.{Get,Set} is eventually
716 : // released. The "invariants" build tag enables a leak detection facility that
717 : // places a GC finalizer on cache.Value. When the cache.Value finalizer is run,
718 : // if the underlying buffer is still present a leak has occurred. The "tracing"
719 : // build tag enables tracing of cache.Value reference count manipulation and
720 : // eases finding where a leak has occurred. These two facilities are usually
721 : // used in combination by specifying `-tags invariants,tracing`. Note that
722 : // "tracing" produces a significant slowdown, while "invariants" does not.
723 : type Cache struct {
724 : refs atomic.Int64
725 : maxSize int64
726 : idAlloc atomic.Uint64
727 : shards []shard
728 :
729 : // Traces recorded by Cache.trace. Used for debugging.
730 : tr struct {
731 : sync.Mutex
732 : msgs []string
733 : }
734 : }
735 :
736 : // ID is a namespace for file numbers. It allows a single Cache to be shared
737 : // among multiple Pebble instances. NewID can be used to generate a new ID that
738 : // is unique in the context of this cache.
739 : type ID uint64
740 :
741 : // New creates a new cache of the specified size. Memory for the cache is
742 : // allocated on demand, not during initialization. The cache is created with a
743 : // reference count of 1. Each DB it is associated with adds a reference, so the
744 : // creator of the cache should usually release their reference after the DB is
745 : // created.
746 : //
747 : // c := cache.New(...)
748 : // defer c.Unref()
749 : // d, err := pebble.Open(pebble.Options{Cache: c})
750 2 : func New(size int64) *Cache {
751 2 : // How many cache shards should we create?
752 2 : //
753 2 : // Note that the probability two processors will try to access the same
754 2 : // shard at the same time increases superlinearly with the number of
755 2 : // processors (e.g., consider the birthday problem where each CPU is a person,
756 2 : // and each shard is a possible birthday).
757 2 : //
758 2 : // We could consider growing the number of shards superlinearly, but
759 2 : // increasing the shard count may reduce the effectiveness of the caching
760 2 : // algorithm if frequently-accessed blocks are insufficiently distributed
761 2 : // across shards.
762 2 : //
763 2 : // Experimentally, we've observed contention contributing to tail latencies
764 2 : // at 2 shards per processor. For now we use 4 shards per processor,
765 2 : // recognizing this may not be the final word.
766 2 : m := 4 * runtime.GOMAXPROCS(0)
767 2 :
768 2 : // In tests we can run on machines with many CPUs but small cache sizes, and
769 2 : // have many caches in existence at a time. If sharding into m shards would
770 2 : // produce shards that are too small, constrain the number of shards to 4.
771 2 : const minimumShardSize = 4 << 20 // 4 MiB
772 2 : if m > 4 && int(size)/m < minimumShardSize {
773 2 : m = 4
774 2 : }
775 2 : return newShards(size, m)
776 : }
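
// Worked example of the shard-count heuristic above (hypothetical values,
// assuming GOMAXPROCS is at its default of the CPU count): on a 32-CPU
// machine, m = 4 * 32 = 128; with size = 64 MiB each shard would get 512 KiB,
// which is below minimumShardSize, so m is reduced to 4 and each shard gets
// 16 MiB.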
777 :
778 2 : func newShards(size int64, shards int) *Cache {
779 2 : c := &Cache{
780 2 : maxSize: size,
781 2 : shards: make([]shard, shards),
782 2 : }
783 2 : c.refs.Store(1)
784 2 : c.idAlloc.Store(1)
785 2 : c.trace("alloc", c.refs.Load())
786 2 : for i := range c.shards {
787 2 : c.shards[i].init(size / int64(len(c.shards)))
788 2 : }
789 :
790 : // Note: this is a no-op if invariants are disabled or race is enabled.
791 2 : invariants.SetFinalizer(c, func(obj interface{}) {
792 2 : c := obj.(*Cache)
793 2 : if v := c.refs.Load(); v != 0 {
794 0 : c.tr.Lock()
795 0 : fmt.Fprintf(os.Stderr,
796 0 : "pebble: cache (%p) has non-zero reference count: %d\n", c, v)
797 0 : if len(c.tr.msgs) > 0 {
798 0 : fmt.Fprintf(os.Stderr, "%s\n", strings.Join(c.tr.msgs, "\n"))
799 0 : }
800 0 : c.tr.Unlock()
801 0 : os.Exit(1)
802 : }
803 : })
804 2 : return c
805 : }
806 :
807 2 : func (c *Cache) getShard(id ID, fileNum base.DiskFileNum, offset uint64) *shard {
808 2 : if id == 0 {
809 0 : panic("pebble: 0 cache ID is invalid")
810 : }
811 :
812 : // Inlined version of fnv.New64 + Write.
813 2 : const offset64 = 14695981039346656037
814 2 : const prime64 = 1099511628211
815 2 :
816 2 : h := uint64(offset64)
817 2 : for i := 0; i < 8; i++ {
818 2 : h *= prime64
819 2 : h ^= uint64(id & 0xff)
820 2 : id >>= 8
821 2 : }
822 2 : fileNumVal := uint64(fileNum)
823 2 : for i := 0; i < 8; i++ {
824 2 : h *= prime64
825 2 : h ^= uint64(fileNumVal) & 0xff
826 2 : fileNumVal >>= 8
827 2 : }
828 2 : for i := 0; i < 8; i++ {
829 2 : h *= prime64
830 2 : h ^= uint64(offset & 0xff)
831 2 : offset >>= 8
832 2 : }
833 :
834 2 : return &c.shards[h%uint64(len(c.shards))]
835 : }
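
// The loops above are an inlined FNV-1 hash over the little-endian bytes of
// (id, fileNum, offset). A rough standard-library equivalent is sketched below
// for illustration only (it assumes "encoding/binary" and "hash/fnv" imports;
// the inlined form avoids the hash.Hash allocation and interface calls):
//
//	var b [24]byte
//	binary.LittleEndian.PutUint64(b[0:8], uint64(id))
//	binary.LittleEndian.PutUint64(b[8:16], uint64(fileNum))
//	binary.LittleEndian.PutUint64(b[16:24], offset)
//	h := fnv.New64() // FNV-1: multiply, then XOR, matching the loops above
//	h.Write(b[:])
//	shard := &c.shards[h.Sum64()%uint64(len(c.shards))]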
836 :
837 : // Ref adds a reference to the cache. The cache only remains valid as long as a
838 : // reference is maintained to it.
839 2 : func (c *Cache) Ref() {
840 2 : v := c.refs.Add(1)
841 2 : if v <= 1 {
842 1 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
843 : }
844 2 : c.trace("ref", v)
845 : }
846 :
847 : // Unref releases a reference on the cache.
848 2 : func (c *Cache) Unref() {
849 2 : v := c.refs.Add(-1)
850 2 : c.trace("unref", v)
851 2 : switch {
852 0 : case v < 0:
853 0 : panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
854 2 : case v == 0:
855 2 : for i := range c.shards {
856 2 : c.shards[i].Free()
857 2 : }
858 : }
859 : }
860 :
861 : // Get retrieves the cache value for the specified file and offset, returning
862 : // an invalid Handle if no value is present.
863 2 : func (c *Cache) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle {
864 2 : h, re := c.getShard(id, fileNum, offset).getWithMaybeReadEntry(
865 2 : id, fileNum, offset, false)
866 2 : if invariants.Enabled && re != nil {
867 0 : panic("readEntry should be nil")
868 : }
869 2 : return h
870 : }
871 :
872 : // GetWithReadHandle retrieves the cache value for the specified ID, fileNum
873 : // and offset. If found, a valid Handle is returned (with cacheHit set to
874 : // true), else a valid ReadHandle is returned.
875 : //
876 : // See the ReadHandle declaration for the contract the caller must satisfy
877 : // when getting a valid ReadHandle.
878 : //
879 : // This method can block before returning since multiple concurrent gets for
880 : // the same cache value will take turns getting a ReadHandle, which represents
881 : // permission to do the read. This blocking respects context cancellation, in
882 : // which case an error is returned (and not a valid ReadHandle).
883 : //
884 : // When blocking, the errorDuration return value can be non-zero and is
885 : // populated with the total duration that other readers that observed an error
886 : // (see ReadHandle.SetReadError) spent in doing the read. This duration can be
887 : // greater than the time spent blocked in this method, since some of these
888 : // errors could have occurred prior to this call. But it serves as a rough
889 : // indicator of whether turn taking could have caused higher latency due to
890 : // context cancellation of other readers.
891 : //
892 : // While waiting, someone else may successfully read the value, which results
893 : // in a valid Handle being returned. This is a case where cacheHit=false.
894 : func (c *Cache) GetWithReadHandle(
895 : ctx context.Context, id ID, fileNum base.DiskFileNum, offset uint64,
896 2 : ) (h Handle, rh ReadHandle, errorDuration time.Duration, cacheHit bool, err error) {
897 2 : h, re := c.getShard(id, fileNum, offset).getWithMaybeReadEntry(
898 2 : id, fileNum, offset, true)
899 2 : if h.Valid() {
900 2 : return h, ReadHandle{}, 0, true, nil
901 2 : }
902 2 : h, errorDuration, err = re.waitForReadPermissionOrHandle(ctx)
903 2 : if err != nil || h.Valid() {
904 2 : return h, ReadHandle{}, errorDuration, false, err
905 2 : }
906 2 : return Handle{}, ReadHandle{entry: re}, errorDuration, false, nil
907 : }
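
// Caller-side sketch of the three possible outcomes (hypothetical caller; the
// read itself and the ReadHandle completion protocol are governed by the
// ReadHandle declaration, which is not shown in this file):
//
//	h, rh, _, cacheHit, err := c.GetWithReadHandle(ctx, id, fileNum, offset)
//	switch {
//	case err != nil:
//		// Context cancellation (or another error) while waiting for a turn.
//	case h.Valid():
//		// Cache hit (cacheHit == true), or another reader populated the value
//		// while we waited (cacheHit == false). Use h, then call h.Release().
//	default:
//		// We hold read permission: perform the file read and complete rh.
//		_ = rh
//	}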
908 :
909 : // Set sets the cache value for the specified file and offset, overwriting an
910 : // existing value if present. A Handle is returned which provides faster
911 : // retrieval of the cached value than Get, since it is lock-free and avoids the
912 : // map lookup. The value must have been allocated by Alloc.
913 1 : func (c *Cache) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value) Handle {
914 1 : return c.getShard(id, fileNum, offset).Set(id, fileNum, offset, value)
915 1 : }
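
// Producer-side sketch (hypothetical caller; filling the Value's buffer is
// elided because that API lives outside this file):
//
//	v := cache.Alloc(len(block))
//	// ... copy the block contents into v's buffer ...
//	h := c.Set(id, fileNum, offset, v)
//	defer h.Release()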
916 :
917 : // Delete deletes the cached value for the specified file and offset.
918 2 : func (c *Cache) Delete(id ID, fileNum base.DiskFileNum, offset uint64) {
919 2 : c.getShard(id, fileNum, offset).Delete(id, fileNum, offset)
920 2 : }
921 :
922 : // EvictFile evicts all of the cache values for the specified file.
923 2 : func (c *Cache) EvictFile(id ID, fileNum base.DiskFileNum) {
924 2 : if id == 0 {
925 0 : panic("pebble: 0 cache ID is invalid")
926 : }
927 2 : for i := range c.shards {
928 2 : c.shards[i].EvictFile(id, fileNum)
929 2 : }
930 : }
931 :
932 : // MaxSize returns the max size of the cache.
933 2 : func (c *Cache) MaxSize() int64 {
934 2 : return c.maxSize
935 2 : }
936 :
937 : // Size returns the current space used by the cache.
938 1 : func (c *Cache) Size() int64 {
939 1 : var size int64
940 1 : for i := range c.shards {
941 1 : size += c.shards[i].Size()
942 1 : }
943 1 : return size
944 : }
945 :
946 : // Alloc allocates a byte slice of the specified size, possibly reusing
947 : // previously allocated but unused memory. The memory backing the value is
948 : // manually managed. The caller MUST either add the value to the cache (via
949 : // Cache.Set) or release the value (via Free). Failure to do so will
950 : // result in a memory leak.
951 2 : func Alloc(n int) *Value {
952 2 : return newValue(n)
953 2 : }
954 :
955 : // Free frees the specified value. The buffer associated with the value will
956 : // possibly be reused, making it invalid to use the buffer after calling
957 : // Free. Do not call Free on a value that has been added to the cache.
958 2 : func Free(v *Value) {
959 2 : if n := v.refs(); n > 1 {
960 0 : panic(fmt.Sprintf("pebble: Value has been added to the cache: refs=%d", n))
961 : }
962 2 : v.release()
963 : }
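
// Sketch of the failure path implied by the Alloc and Free contracts above
// (hypothetical caller; fill is a hypothetical helper): a Value that never
// makes it into the cache must be released with Free to avoid leaking manually
// managed memory.
//
//	v := cache.Alloc(n)
//	if err := fill(v); err != nil {
//		cache.Free(v) // never added to the cache, so Free is required
//		return err
//	}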
964 :
965 : // Reserve N bytes in the cache. This effectively shrinks the size of the cache
966 : // by N bytes, without actually consuming any memory. The returned closure
967 : // should be invoked to release the reservation.
968 2 : func (c *Cache) Reserve(n int) func() {
969 2 : // Round-up the per-shard reservation. Most reservations should be large, so
970 2 : // this probably doesn't matter in practice.
971 2 : shardN := (n + len(c.shards) - 1) / len(c.shards)
972 2 : for i := range c.shards {
973 2 : c.shards[i].Reserve(shardN)
974 2 : }
975 2 : return func() {
976 2 : if shardN == -1 {
977 1 : panic("pebble: cache reservation already released")
978 : }
979 2 : for i := range c.shards {
980 2 : c.shards[i].Reserve(-shardN)
981 2 : }
982 2 : shardN = -1
983 : }
984 : }
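
// Usage sketch (hypothetical caller): pair the reservation with its release.
//
//	release := c.Reserve(16 << 20) // shrink the usable cache by 16 MiB
//	defer release()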
985 :
986 : // Metrics returns the metrics for the cache.
987 2 : func (c *Cache) Metrics() Metrics {
988 2 : var m Metrics
989 2 : for i := range c.shards {
990 2 : s := &c.shards[i]
991 2 : s.mu.RLock()
992 2 : m.Count += int64(s.blocks.Len())
993 2 : m.Size += s.sizeHot + s.sizeCold
994 2 : s.mu.RUnlock()
995 2 : m.Hits += s.hits.Load()
996 2 : m.Misses += s.misses.Load()
997 2 : }
998 2 : return m
999 : }
1000 :
1001 : // NewID returns a new ID to be used as a namespace for cached file
1002 : // blocks.
1003 2 : func (c *Cache) NewID() ID {
1004 2 : return ID(c.idAlloc.Add(1))
1005 2 : }