// Copyright 2018. All rights reserved. Use of this source code is governed by
// an MIT-style license that can be found in the LICENSE file.

// Package cache implements the CLOCK-Pro caching algorithm.
//
// CLOCK-Pro is a patent-free alternative to the Adaptive Replacement Cache,
// https://en.wikipedia.org/wiki/Adaptive_replacement_cache.
// It is an approximation of LIRS (https://en.wikipedia.org/wiki/LIRS_caching_algorithm),
// much like the CLOCK page replacement algorithm is an approximation of LRU.
//
// This implementation is based on the python code from https://bitbucket.org/SamiLehtinen/pyclockpro .
//
// Slides describing the algorithm: http://fr.slideshare.net/huliang64/clockpro
//
// The original paper: http://static.usenix.org/event/usenix05/tech/general/full_papers/jiang/jiang_html/html.html
//
// It is MIT licensed, like the original.
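//
// In CLOCK-Pro terms (mirrored by the entry types and clock hands below),
// resident pages are either hot or cold, while a non-resident "test" page
// records the recent eviction of a cold page. All entries sit on a single
// circular list swept by three hands (hot, cold, test). A hit on a test page
// is evidence that the cold section is too small, so the adaptive cold target
// grows; expiring an untouched test page shrinks it back.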
package cache // import "github.com/cockroachdb/pebble/internal/cache"

import (
	"fmt"
	"os"
	"runtime"
	"runtime/debug"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/invariants"
)

type fileKey struct {
	// id is the namespace for fileNums.
	id      uint64
	fileNum base.DiskFileNum
}

type key struct {
	fileKey
	offset uint64
}

// file returns the "file key" for the receiver. This is the key used for the
// shard.files map.
func (k key) file() key {
	k.offset = 0
	return k
}

func (k key) String() string {
	return fmt.Sprintf("%d/%d/%d", k.id, k.fileNum, k.offset)
}

// Handle provides a strong reference to a value in the cache. The reference
// does not pin the value in the cache, but it does prevent the underlying byte
// slice from being reused.
type Handle struct {
	value *Value
}

// Get returns the value stored in handle.
func (h Handle) Get() []byte {
	if h.value != nil {
		// NB: We don't increment shard.hits in this code path because we only want
		// to record a hit when the handle is retrieved from the cache.
		return h.value.buf
	}
	return nil
}

// Release releases the reference to the cache entry.
func (h Handle) Release() {
	h.value.release()
}
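
// NB: the zero Handle is what shard.Get returns on a cache miss. Its Get
// method returns nil, and calling Release on it is safe: as shard.Delete's
// release of a possibly-nil deletedValue below suggests, Value.release
// tolerates a nil receiver.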

type shard struct {
	hits   atomic.Int64
	misses atomic.Int64

	mu sync.RWMutex

	reservedSize int64
	maxSize      int64
	coldTarget   int64
	blocks       blockMap // fileNum+offset -> block
	files        blockMap // fileNum -> list of blocks

	// The blocks and files maps store values in manually managed memory that is
	// invisible to the Go GC. This is fine for Value and entry objects that are
	// stored in manually managed memory, but when the "invariants" build tag is
	// set, all Value and entry objects are Go allocated and the entries map will
	// contain a reference to every entry.
	entries map[*entry]struct{}

	handHot  *entry
	handCold *entry
	handTest *entry

	sizeHot  int64
	sizeCold int64
	sizeTest int64

	// The count fields are used exclusively for asserting expectations.
	// We've seen infinite looping (cockroachdb/cockroach#70154) that
	// could be explained by a corrupted sizeCold. Through asserting on
	// these fields, we hope to gain more insight from any future
	// reproductions.
	countHot  int64
	countCold int64
	countTest int64
}

func (c *shard) Get(id uint64, fileNum base.DiskFileNum, offset uint64) Handle {
	c.mu.RLock()
	var value *Value
	if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil {
		value = e.acquireValue()
		if value != nil {
			e.referenced.Store(true)
		}
	}
	c.mu.RUnlock()
	if value == nil {
		c.misses.Add(1)
		return Handle{}
	}
	c.hits.Add(1)
	return Handle{value: value}
}

func (c *shard) Set(id uint64, fileNum base.DiskFileNum, offset uint64, value *Value) Handle {
	if n := value.refs(); n != 1 {
		panic(fmt.Sprintf("pebble: Value has already been added to the cache: refs=%d", n))
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	k := key{fileKey{id, fileNum}, offset}
	e, _ := c.blocks.Get(k)

	switch {
	case e == nil:
		// no cache entry? add it
		e = newEntry(c, k, int64(len(value.buf)))
		e.setValue(value)
		if c.metaAdd(k, e) {
			value.ref.trace("add-cold")
			c.sizeCold += e.size
			c.countCold++
		} else {
			value.ref.trace("skip-cold")
			e.free()
			e = nil
		}

	case e.peekValue() != nil:
		// cache entry was a hot or cold page
		e.setValue(value)
		e.referenced.Store(true)
		delta := int64(len(value.buf)) - e.size
		e.size = int64(len(value.buf))
		if e.ptype == etHot {
			value.ref.trace("add-hot")
			c.sizeHot += delta
		} else {
			value.ref.trace("add-cold")
			c.sizeCold += delta
		}
		c.evict()

	default:
		// cache entry was a test page
		c.sizeTest -= e.size
		c.countTest--
		c.metaDel(e).release()
		c.metaCheck(e)

		e.size = int64(len(value.buf))
		c.coldTarget += e.size
		if c.coldTarget > c.targetSize() {
			c.coldTarget = c.targetSize()
		}

		e.referenced.Store(false)
		e.setValue(value)
		e.ptype = etHot
		if c.metaAdd(k, e) {
			value.ref.trace("add-hot")
			c.sizeHot += e.size
			c.countHot++
		} else {
			value.ref.trace("skip-hot")
			e.free()
			e = nil
		}
	}

	c.checkConsistency()

	// Values are initialized with a reference count of 1. That reference count
	// is being transferred to the returned Handle.
	return Handle{value: value}
}

func (c *shard) checkConsistency() {
	// See the comment above the count{Hot,Cold,Test} fields.
	switch {
	case c.sizeHot < 0 || c.sizeCold < 0 || c.sizeTest < 0 || c.countHot < 0 || c.countCold < 0 || c.countTest < 0:
		panic(fmt.Sprintf("pebble: unexpected negative: %d (%d bytes) hot, %d (%d bytes) cold, %d (%d bytes) test",
			c.countHot, c.sizeHot, c.countCold, c.sizeCold, c.countTest, c.sizeTest))
	case c.sizeHot > 0 && c.countHot == 0:
		panic(fmt.Sprintf("pebble: mismatch %d hot size, %d hot count", c.sizeHot, c.countHot))
	case c.sizeCold > 0 && c.countCold == 0:
		panic(fmt.Sprintf("pebble: mismatch %d cold size, %d cold count", c.sizeCold, c.countCold))
	case c.sizeTest > 0 && c.countTest == 0:
		panic(fmt.Sprintf("pebble: mismatch %d test size, %d test count", c.sizeTest, c.countTest))
	}
}

// Delete deletes the cached value for the specified file and offset.
func (c *shard) Delete(id uint64, fileNum base.DiskFileNum, offset uint64) {
	// The common case is that there is nothing to delete, so do a quick check
	// with a shared lock.
	k := key{fileKey{id, fileNum}, offset}
	c.mu.RLock()
	_, exists := c.blocks.Get(k)
	c.mu.RUnlock()
	if !exists {
		return
	}

	var deletedValue *Value
	func() {
		c.mu.Lock()
		defer c.mu.Unlock()

		e, _ := c.blocks.Get(k)
		if e == nil {
			return
		}
		deletedValue = c.metaEvict(e)
		c.checkConsistency()
	}()
	// Now that the mutex has been dropped, release the reference which will
	// potentially free the memory associated with the previous cached value.
	deletedValue.release()
}

// EvictFile evicts all of the cache values for the specified file.
func (c *shard) EvictFile(id uint64, fileNum base.DiskFileNum) {
	fkey := key{fileKey{id, fileNum}, 0}
	for c.evictFileRun(fkey) {
		// Sched switch to give another goroutine an opportunity to acquire the
		// shard mutex.
		runtime.Gosched()
	}
}

func (c *shard) evictFileRun(fkey key) (moreRemaining bool) {
	// If most of the file's blocks are held in the block cache, evicting all
	// the blocks may take a while. We don't want to block the entire cache
	// shard, forcing concurrent readers to wait until we're finished. We drop
	// the mutex every [blocksPerMutexAcquisition] blocks to give other
	// goroutines an opportunity to make progress.
	const blocksPerMutexAcquisition = 5
	c.mu.Lock()

	// Releasing a value may result in free-ing it back to the memory allocator.
	// This can have a nontrivial cost that we'd prefer to not pay while holding
	// the shard mutex, so we collect the evicted values in a local slice and
	// only release them in a defer after dropping the cache mutex.
	var obsoleteValuesAlloc [blocksPerMutexAcquisition]*Value
	obsoleteValues := obsoleteValuesAlloc[:0]
	defer func() {
		c.mu.Unlock()
		for _, v := range obsoleteValues {
			v.release()
		}
	}()

	blocks, _ := c.files.Get(fkey)
	if blocks == nil {
		// No blocks for this file.
		return false
	}

	// b is the current head of the doubly linked list, and n is the entry after b.
	for b, n := blocks, (*entry)(nil); len(obsoleteValues) < cap(obsoleteValues); b = n {
		n = b.fileLink.next
		obsoleteValues = append(obsoleteValues, c.metaEvict(b))
		if b == n {
			// b == n represents the case where b was the last entry remaining
			// in the doubly linked list, which is why it pointed at itself. So
			// no more entries left.
			c.checkConsistency()
			return false
		}
	}
	// Exhausted blocksPerMutexAcquisition.
	return true
}

func (c *shard) Free() {
	c.mu.Lock()
	defer c.mu.Unlock()

	// NB: we use metaDel rather than metaEvict in order to avoid the expensive
	// metaCheck call when the "invariants" build tag is specified.
	for c.handHot != nil {
		e := c.handHot
		c.metaDel(c.handHot).release()
		e.free()
	}

	c.blocks.Close()
	c.files.Close()
}

func (c *shard) Reserve(n int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.reservedSize += int64(n)

	// Changing c.reservedSize will either increase or decrease
	// the targetSize. But we want coldTarget to be in the range
	// [0, targetSize]. So, if c.targetSize decreases, make sure
	// that the coldTarget fits within the limits.
	targetSize := c.targetSize()
	if c.coldTarget > targetSize {
		c.coldTarget = targetSize
	}

	c.evict()
	c.checkConsistency()
}

// Size returns the current space used by the cache.
func (c *shard) Size() int64 {
	c.mu.RLock()
	size := c.sizeHot + c.sizeCold
	c.mu.RUnlock()
	return size
}

func (c *shard) targetSize() int64 {
	target := c.maxSize - c.reservedSize
	// Always return a positive integer for targetSize. This is so that we don't
	// end up in an infinite loop in evict(), in cases where reservedSize is
	// greater than or equal to maxSize.
	if target < 1 {
		return 1
	}
	return target
}
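
// For example, with maxSize = 64 MiB and reservedSize = 80 MiB the raw
// difference is negative, so targetSize returns 1: the shard effectively
// caches nothing, but the eviction loop in evict() still terminates.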

// Add the entry to the cache, returning true if the entry was added and false
// if it would not fit in the cache.
func (c *shard) metaAdd(key key, e *entry) bool {
	c.evict()
	if e.size > c.targetSize() {
		// The entry is larger than the target cache size.
		return false
	}

	c.blocks.Put(key, e)
	if entriesGoAllocated {
		// Go allocated entries need to be referenced from Go memory. The entries
		// map provides that reference.
		c.entries[e] = struct{}{}
	}

	if c.handHot == nil {
		// first element
		c.handHot = e
		c.handCold = e
		c.handTest = e
	} else {
		c.handHot.link(e)
	}

	if c.handCold == c.handHot {
		c.handCold = c.handCold.prev()
	}

	fkey := key.file()
	if fileBlocks, _ := c.files.Get(fkey); fileBlocks == nil {
		c.files.Put(fkey, e)
	} else {
		fileBlocks.linkFile(e)
	}
	return true
}

// Remove the entry from the cache. This removes the entry from the blocks map,
// the files map, and ensures that hand{Hot,Cold,Test} are not pointing at the
// entry. Returns the deleted value that must be released, if any.
func (c *shard) metaDel(e *entry) (deletedValue *Value) {
	if value := e.peekValue(); value != nil {
		value.ref.trace("metaDel")
	}
	// Remove the pointer to the value.
	deletedValue = e.val
	e.val = nil

	c.blocks.Delete(e.key)
	if entriesGoAllocated {
		// Go allocated entries need to be referenced from Go memory. The entries
		// map provides that reference.
		delete(c.entries, e)
	}

	if e == c.handHot {
		c.handHot = c.handHot.prev()
	}
	if e == c.handCold {
		c.handCold = c.handCold.prev()
	}
	if e == c.handTest {
		c.handTest = c.handTest.prev()
	}

	if e.unlink() == e {
		// This was the last entry in the cache.
		c.handHot = nil
		c.handCold = nil
		c.handTest = nil
	}

	fkey := e.key.file()
	if next := e.unlinkFile(); e == next {
		c.files.Delete(fkey)
	} else {
		c.files.Put(fkey, next)
	}
	return deletedValue
}

// Check that the specified entry is not referenced by the cache.
func (c *shard) metaCheck(e *entry) {
	if invariants.Enabled {
		if _, ok := c.entries[e]; ok {
			fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in entries map\n%s",
				e, e.key, debug.Stack())
			os.Exit(1)
		}
		if c.blocks.findByValue(e) {
			fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in blocks map\n%#v\n%s",
				e, e.key, &c.blocks, debug.Stack())
			os.Exit(1)
		}
		if c.files.findByValue(e) {
			fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in files map\n%#v\n%s",
				e, e.key, &c.files, debug.Stack())
			os.Exit(1)
		}
		// NB: c.hand{Hot,Cold,Test} are pointers into a single linked list. We
		// only have to traverse one of them to check all of them.
		var countHot, countCold, countTest int64
		var sizeHot, sizeCold, sizeTest int64
		for t := c.handHot.next(); t != nil; t = t.next() {
			// Recompute count{Hot,Cold,Test} and size{Hot,Cold,Test}.
			switch t.ptype {
			case etHot:
				countHot++
				sizeHot += t.size
			case etCold:
				countCold++
				sizeCold += t.size
			case etTest:
				countTest++
				sizeTest += t.size
			}
			if e == t {
				fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in blocks list\n%s",
					e, e.key, debug.Stack())
				os.Exit(1)
			}
			if t == c.handHot {
				break
			}
		}
		if countHot != c.countHot || countCold != c.countCold || countTest != c.countTest ||
			sizeHot != c.sizeHot || sizeCold != c.sizeCold || sizeTest != c.sizeTest {
			fmt.Fprintf(os.Stderr, "divergence of Hot,Cold,Test statistics\n"+
				"cache's statistics: hot %d, %d, cold %d, %d, test %d, %d\n"+
				"recalculated statistics: hot %d, %d, cold %d, %d, test %d, %d\n%s",
				c.countHot, c.sizeHot, c.countCold, c.sizeCold, c.countTest, c.sizeTest,
				countHot, sizeHot, countCold, sizeCold, countTest, sizeTest,
				debug.Stack())
			os.Exit(1)
		}
	}
}

func (c *shard) metaEvict(e *entry) (evictedValue *Value) {
	switch e.ptype {
	case etHot:
		c.sizeHot -= e.size
		c.countHot--
	case etCold:
		c.sizeCold -= e.size
		c.countCold--
	case etTest:
		c.sizeTest -= e.size
		c.countTest--
	}
	evictedValue = c.metaDel(e)
	c.metaCheck(e)
	e.free()
	return evictedValue
}

func (c *shard) evict() {
	for c.targetSize() <= c.sizeHot+c.sizeCold && c.handCold != nil {
		c.runHandCold(c.countCold, c.sizeCold)
	}
}
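
// runHandCold is evict's workhorse: each call advances handCold one entry,
// either promoting a referenced cold page to hot or demoting an unreferenced
// one to a test page, and in turn may advance handTest and handHot to keep
// the test and hot sections within their targets.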

func (c *shard) runHandCold(countColdDebug, sizeColdDebug int64) {
	// countColdDebug and sizeColdDebug should equal c.countCold and
	// c.sizeCold. They're parameters only to aid in debugging of
	// cockroachdb/cockroach#70154. Since they're parameters, their
	// arguments will appear within stack traces should we encounter
	// a reproduction.
	if c.countCold != countColdDebug || c.sizeCold != sizeColdDebug {
		panic(fmt.Sprintf("runHandCold: cold count and size are %d, %d, arguments are %d and %d",
			c.countCold, c.sizeCold, countColdDebug, sizeColdDebug))
	}

	e := c.handCold
	if e.ptype == etCold {
		if e.referenced.Load() {
			e.referenced.Store(false)
			e.ptype = etHot
			c.sizeCold -= e.size
			c.countCold--
			c.sizeHot += e.size
			c.countHot++
		} else {
			e.setValue(nil)
			e.ptype = etTest
			c.sizeCold -= e.size
			c.countCold--
			c.sizeTest += e.size
			c.countTest++
			for c.targetSize() < c.sizeTest && c.handTest != nil {
				c.runHandTest()
			}
		}
	}

	c.handCold = c.handCold.next()

	for c.targetSize()-c.coldTarget <= c.sizeHot && c.handHot != nil {
		c.runHandHot()
	}
}

func (c *shard) runHandHot() {
	if c.handHot == c.handTest && c.handTest != nil {
		c.runHandTest()
		if c.handHot == nil {
			return
		}
	}

	e := c.handHot
	if e.ptype == etHot {
		if e.referenced.Load() {
			e.referenced.Store(false)
		} else {
			e.ptype = etCold
			c.sizeHot -= e.size
			c.countHot--
			c.sizeCold += e.size
			c.countCold++
		}
	}

	c.handHot = c.handHot.next()
}

func (c *shard) runHandTest() {
	if c.sizeCold > 0 && c.handTest == c.handCold && c.handCold != nil {
		// sizeCold is > 0, so assert that countCold is also > 0. See the
		// comment above count{Hot,Cold,Test}.
		if c.countCold == 0 {
			panic(fmt.Sprintf("pebble: mismatch %d cold size, %d cold count", c.sizeCold, c.countCold))
		}

		c.runHandCold(c.countCold, c.sizeCold)
		if c.handTest == nil {
			return
		}
	}

	e := c.handTest
	if e.ptype == etTest {
		c.sizeTest -= e.size
		c.countTest--
		c.coldTarget -= e.size
		if c.coldTarget < 0 {
			c.coldTarget = 0
		}
		c.metaDel(e).release()
		c.metaCheck(e)
		e.free()
	}

	c.handTest = c.handTest.next()
}

// Metrics holds metrics for the cache.
type Metrics struct {
	// The number of bytes in use by the cache.
	Size int64
	// The count of objects (blocks or tables) in the cache.
	Count int64
	// The number of cache hits.
	Hits int64
	// The number of cache misses.
	Misses int64
}

// Cache implements Pebble's sharded block cache. The CLOCK-Pro algorithm is
// used for page replacement
// (http://static.usenix.org/event/usenix05/tech/general/full_papers/jiang/jiang_html/html.html). In
// order to provide better concurrency, 4 x NumCPUs shards are created, with
// each shard being given 1/n of the target cache size. The CLOCK-Pro algorithm
// is run independently on each shard.
//
// Blocks are keyed by an (id, fileNum, offset) triple. The ID is a namespace
// for file numbers and allows a single Cache to be shared between multiple
// Pebble instances. The fileNum and offset refer to an sstable file number and
// the offset of the block within the file. Because sstables are immutable and
// file numbers are never reused, (fileNum,offset) are unique for the lifetime
// of a Pebble instance.
//
// In addition to maintaining a map from (fileNum,offset) to data, each shard
// maintains a map of the cached blocks for a particular fileNum. This allows
// efficient eviction of all of the blocks for a file which is used when an
// sstable is deleted from disk.
//
// # Memory Management
//
// In order to reduce pressure on the Go GC, manual memory management is
// performed for the data stored in the cache. Manual memory management is
// performed by calling into C.{malloc,free} to allocate memory. Cache.Values
// are reference counted and the memory backing a manual value is freed when
// the reference count drops to 0.
//
// Manual memory management brings the possibility of memory leaks. It is
// imperative that every Handle returned by Cache.{Get,Set} is eventually
// released. The "invariants" build tag enables a leak detection facility that
// places a GC finalizer on cache.Value. When the cache.Value finalizer is run,
// if the underlying buffer is still present a leak has occurred. The "tracing"
// build tag enables tracing of cache.Value reference count manipulation and
// eases finding where a leak has occurred. These two facilities are usually
// used in combination by specifying `-tags invariants,tracing`. Note that
// "tracing" produces a significant slowdown, while "invariants" does not.
type Cache struct {
	refs    atomic.Int64
	maxSize int64
	idAlloc atomic.Uint64
	shards  []shard

	// Traces recorded by Cache.trace. Used for debugging.
	tr struct {
		sync.Mutex
		msgs []string
	}
}
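
// The function below is an illustrative sketch only (nothing in the package
// calls it): a minimal end-to-end use of the API in this file, showing the
// reference-counting discipline described above. The cache size, ID, file
// number, and block size are arbitrary.
func exampleUsage() {
	c := New(8 << 20) // 8 MiB cache, created with a reference count of 1
	defer c.Unref()   // drop the creator's reference when done

	id := c.NewID() // namespace for this cache user
	fileNum := base.DiskFileNum(1)

	// Allocate a manually managed buffer and publish it in the cache. The
	// value's initial reference is transferred to the returned Handle, which
	// must eventually be released.
	v := Alloc(4096)
	h := c.Set(id, fileNum, 0 /* offset */, v)
	h.Release()

	// Look the block up again. A miss returns the zero Handle, whose Get
	// method returns nil.
	if h := c.Get(id, fileNum, 0); h.Get() != nil {
		_ = h.Get() // the cached bytes are valid until the Release below
		h.Release()
	}

	// Evict every block cached for the file, e.g. after its sstable has been
	// deleted from disk.
	c.EvictFile(id, fileNum)
}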

// New creates a new cache of the specified size. Memory for the cache is
// allocated on demand, not during initialization. The cache is created with a
// reference count of 1. Each DB it is associated with adds a reference, so the
// creator of the cache should usually release their reference after the DB is
// created.
//
//	c := cache.New(...)
//	defer c.Unref()
//	d, err := pebble.Open(pebble.Options{Cache: c})
func New(size int64) *Cache {
	// How many cache shards should we create?
	//
	// Note that the probability two processors will try to access the same
	// shard at the same time increases superlinearly with the number of
	// processors (e.g., consider the birthday problem where each CPU is a
	// person, and each shard is a possible birthday).
	//
	// We could consider growing the number of shards superlinearly, but
	// increasing the shard count may reduce the effectiveness of the caching
	// algorithm if frequently-accessed blocks are insufficiently distributed
	// across shards. If a shard's size is smaller than a single frequently
	// scanned sstable, then the shard will be unable to hold the entire
	// frequently-scanned table in memory despite other shards still holding
	// infrequently accessed blocks.
	//
	// Experimentally, we've observed contention contributing to tail latencies
	// at 2 shards per processor. For now we use 4 shards per processor,
	// recognizing this may not be the final word.
	m := 4 * runtime.GOMAXPROCS(0)

	// In tests we can use large CPU machines with small cache sizes and have
	// many caches in existence at a time. If sharding into m shards would
	// produce too small shards, constrain the number of shards to 4.
	const minimumShardSize = 4 << 20 // 4 MiB
	if m > 4 && int(size)/m < minimumShardSize {
		m = 4
	}
	return newShards(size, m)
}

func newShards(size int64, shards int) *Cache {
	c := &Cache{
		maxSize: size,
		shards:  make([]shard, shards),
	}
	c.refs.Store(1)
	c.idAlloc.Store(1)
	c.trace("alloc", c.refs.Load())
	for i := range c.shards {
		c.shards[i] = shard{
			maxSize:    size / int64(len(c.shards)),
			coldTarget: size / int64(len(c.shards)),
		}
		if entriesGoAllocated {
			c.shards[i].entries = make(map[*entry]struct{})
		}
		c.shards[i].blocks.Init(16)
		c.shards[i].files.Init(16)
	}

	// Note: this is a no-op if invariants are disabled or race is enabled.
	invariants.SetFinalizer(c, func(obj interface{}) {
		c := obj.(*Cache)
		if v := c.refs.Load(); v != 0 {
			c.tr.Lock()
			fmt.Fprintf(os.Stderr,
				"pebble: cache (%p) has non-zero reference count: %d\n", c, v)
			if len(c.tr.msgs) > 0 {
				fmt.Fprintf(os.Stderr, "%s\n", strings.Join(c.tr.msgs, "\n"))
			}
			c.tr.Unlock()
			os.Exit(1)
		}
	})
	return c
}

func (c *Cache) getShard(id uint64, fileNum base.DiskFileNum, offset uint64) *shard {
	if id == 0 {
		panic("pebble: 0 cache ID is invalid")
	}

	// Inlined version of fnv.New64 + Write.
	const offset64 = 14695981039346656037
	const prime64 = 1099511628211

	h := uint64(offset64)
	for i := 0; i < 8; i++ {
		h *= prime64
		h ^= uint64(id & 0xff)
		id >>= 8
	}
	fileNumVal := uint64(fileNum)
	for i := 0; i < 8; i++ {
		h *= prime64
		h ^= uint64(fileNumVal & 0xff)
		fileNumVal >>= 8
	}
	for i := 0; i < 8; i++ {
		h *= prime64
		h ^= uint64(offset & 0xff)
		offset >>= 8
	}

	return &c.shards[h%uint64(len(c.shards))]
}
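
// For reference, the inlined loop in getShard is equivalent to hashing the 24
// bytes of (id, fileNum, offset) in little-endian order with the standard
// library's FNV-1 implementation. A sketch, kept as a comment so the hot path
// stays free of interface and allocation overhead:
//
//	h := fnv.New64()
//	var b [24]byte
//	binary.LittleEndian.PutUint64(b[0:8], id)
//	binary.LittleEndian.PutUint64(b[8:16], uint64(fileNum))
//	binary.LittleEndian.PutUint64(b[16:24], offset)
//	_, _ = h.Write(b[:])
//	shard := &c.shards[h.Sum64()%uint64(len(c.shards))]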

// Ref adds a reference to the cache. The cache only remains valid as long as a
// reference is maintained to it.
func (c *Cache) Ref() {
	v := c.refs.Add(1)
	if v <= 1 {
		panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
	}
	c.trace("ref", v)
}

// Unref releases a reference on the cache.
func (c *Cache) Unref() {
	v := c.refs.Add(-1)
	c.trace("unref", v)
	switch {
	case v < 0:
		panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
	case v == 0:
		for i := range c.shards {
			c.shards[i].Free()
		}
	}
}

// Get retrieves the cache value for the specified file and offset, returning
// nil if no value is present.
func (c *Cache) Get(id uint64, fileNum base.DiskFileNum, offset uint64) Handle {
	return c.getShard(id, fileNum, offset).Get(id, fileNum, offset)
}

// Set sets the cache value for the specified file and offset, overwriting an
// existing value if present. A Handle is returned which provides faster
// retrieval of the cached value than Get (lock-free and avoidance of the map
// lookup). The value must have been allocated by Cache.Alloc.
func (c *Cache) Set(id uint64, fileNum base.DiskFileNum, offset uint64, value *Value) Handle {
	return c.getShard(id, fileNum, offset).Set(id, fileNum, offset, value)
}

// Delete deletes the cached value for the specified file and offset.
func (c *Cache) Delete(id uint64, fileNum base.DiskFileNum, offset uint64) {
	c.getShard(id, fileNum, offset).Delete(id, fileNum, offset)
}

// EvictFile evicts all of the cache values for the specified file.
func (c *Cache) EvictFile(id uint64, fileNum base.DiskFileNum) {
	if id == 0 {
		panic("pebble: 0 cache ID is invalid")
	}
	for i := range c.shards {
		c.shards[i].EvictFile(id, fileNum)
	}
}

// MaxSize returns the max size of the cache.
func (c *Cache) MaxSize() int64 {
	return c.maxSize
}

// Size returns the current space used by the cache.
func (c *Cache) Size() int64 {
	var size int64
	for i := range c.shards {
		size += c.shards[i].Size()
	}
	return size
}

// Alloc allocates a byte slice of the specified size, possibly reusing
// previously allocated but unused memory. The memory backing the value is
// manually managed. The caller MUST either add the value to the cache (via
// Cache.Set), or release the value (via Cache.Free). Failure to do so will
// result in a memory leak.
func Alloc(n int) *Value {
	return newValue(n)
}

// Free frees the specified value. The buffer associated with the value will
// possibly be reused, making it invalid to use the buffer after calling
// Free. Do not call Free on a value that has been added to the cache.
func Free(v *Value) {
	if n := v.refs(); n > 1 {
		panic(fmt.Sprintf("pebble: Value has been added to the cache: refs=%d", n))
	}
	v.release()
}

// Reserve N bytes in the cache. This effectively shrinks the size of the cache
// by N bytes, without actually consuming any memory. The returned closure
// should be invoked to release the reservation.
func (c *Cache) Reserve(n int) func() {
	// Round-up the per-shard reservation. Most reservations should be large, so
	// this probably doesn't matter in practice.
	shardN := (n + len(c.shards) - 1) / len(c.shards)
	for i := range c.shards {
		c.shards[i].Reserve(shardN)
	}
	return func() {
		if shardN == -1 {
			panic("pebble: cache reservation already released")
		}
		for i := range c.shards {
			c.shards[i].Reserve(-shardN)
		}
		shardN = -1
	}
}
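
// The function below is an illustrative sketch only (nothing in the package
// calls it): a typical use of Reserve, relying on the documented behavior
// that the returned closure must be invoked exactly once; a second call
// panics.
func exampleReservation(c *Cache) {
	release := c.Reserve(16 << 20) // shrink the cache budget by 16 MiB
	defer release()                // give the capacity back when done
	// ... use the reserved memory outside the cache ...
}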

// Metrics returns the metrics for the cache.
func (c *Cache) Metrics() Metrics {
	var m Metrics
	for i := range c.shards {
		s := &c.shards[i]
		s.mu.RLock()
		m.Count += int64(s.blocks.Len())
		m.Size += s.sizeHot + s.sizeCold
		s.mu.RUnlock()
		m.Hits += s.hits.Load()
		m.Misses += s.misses.Load()
	}
	return m
}
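
// hitRate is a small illustrative helper (not part of the original API)
// showing how the Metrics fields compose into a hit rate.
func hitRate(c *Cache) float64 {
	m := c.Metrics()
	if total := m.Hits + m.Misses; total > 0 {
		return float64(m.Hits) / float64(total)
	}
	return 0
}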

// NewID returns a new ID to be used as a namespace for cached file
// blocks.
func (c *Cache) NewID() uint64 {
	return c.idAlloc.Add(1)
}