1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sharedcache
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "io"
11 : "math/bits"
12 : "sync"
13 : "sync/atomic"
14 : "time"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/objstorage/remote"
20 : "github.com/cockroachdb/pebble/vfs"
21 : "github.com/prometheus/client_golang/prometheus"
22 : )
23 :
24 : // These buckets are exported so that package pebble can re-export them,
25 : // enabling CRDB to export metrics with the same buckets.
26 : var (
27 : IOBuckets = prometheus.ExponentialBucketsRange(float64(time.Millisecond*1), float64(10*time.Second), 50)
28 : ChannelWriteBuckets = prometheus.ExponentialBucketsRange(float64(time.Microsecond*1), float64(10*time.Second), 50)
29 : )
30 :
31 : // Cache is a persistent cache backed by a local filesystem. It is intended
32 : // to cache data that is in slower shared storage (e.g. S3), hence the
33 : // package name 'sharedcache'.
34 : type Cache struct {
35 : shards []shard
36 : writeWorkers writeWorkers
37 :
38 : bm blockMath
39 : shardingBlockSize int64
40 :
41 : logger base.Logger
42 : metrics internalMetrics
43 : }
44 :
45 : // Metrics is a struct containing metrics exported by the secondary cache.
46 : // TODO(josh): Reconsider the set of metrics exported by the secondary cache
47 : // before we release the secondary cache to users. We choose to export many metrics
48 : // right now so that we can learn as much as possible from the benchmarking we are
49 : // doing over the 23.2 cycle.
50 : type Metrics struct {
51 : // The number of sstable bytes stored in the cache.
52 : Size int64
53 : // The count of cache blocks in the cache (not sstable blocks).
54 : Count int64
55 :
56 : // The number of calls to ReadAt.
57 : TotalReads int64
58 : // The number of calls to ReadAt that require reading data from 2+ shards.
59 : MultiShardReads int64
60 : // The number of calls to ReadAt that require reading data from 2+ cache blocks.
61 : MultiBlockReads int64
62 : // The number of calls to ReadAt where all data returned was read from the cache.
63 : ReadsWithFullHit int64
64 : // The number of calls to ReadAt where some data returned was read from the cache.
65 : ReadsWithPartialHit int64
66 : // The number of calls to ReadAt where no data returned was read from the cache.
67 : ReadsWithNoHit int64
68 :
69 : // The number of times a cache block was evicted from the cache.
70 : Evictions int64
71 : // The number of times writing a cache block to the cache failed.
72 : WriteBackFailures int64
73 :
74 : // The latency of calls to get some data from the cache.
75 : GetLatency prometheus.Histogram
76 : // The latency of reads of a single cache block from disk.
77 : DiskReadLatency prometheus.Histogram
78 : // The latency of enqueueing data to be written back to the cache onto a channel.
79 : // Generally should be low, but can be high if the channel is full.
80 : QueuePutLatency prometheus.Histogram
81 : // The latency of calls to put some data read from block storage into the cache.
82 : PutLatency prometheus.Histogram
83 : // The latency of writes of a single cache block to disk.
84 : DiskWriteLatency prometheus.Histogram
85 : }
86 :
87 : // See docs at Metrics.
88 : type internalMetrics struct {
89 : count atomic.Int64
90 :
91 : totalReads atomic.Int64
92 : multiShardReads atomic.Int64
93 : multiBlockReads atomic.Int64
94 : readsWithFullHit atomic.Int64
95 : readsWithPartialHit atomic.Int64
96 : readsWithNoHit atomic.Int64
97 :
98 : evictions atomic.Int64
99 : writeBackFailures atomic.Int64
100 :
101 : getLatency prometheus.Histogram
102 : diskReadLatency prometheus.Histogram
103 : queuePutLatency prometheus.Histogram
104 : putLatency prometheus.Histogram
105 : diskWriteLatency prometheus.Histogram
106 : }
107 :
108 : const (
109 : // writeWorkersPerShard is used to establish the number of worker goroutines
110 : // that perform writes to the cache.
111 : writeWorkersPerShard = 4
112 : // writeTasksPerWorker is used to establish how many tasks can be queued up
113 : // per worker until we have to block.
114 : writeTasksPerWorker = 4
115 : )
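// For example (illustrative arithmetic based on the constants above): a cache
// with 8 shards runs 8*writeWorkersPerShard = 32 write workers and buffers up
// to 32*writeTasksPerWorker = 128 queued write tasks before QueueWrite blocks.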
116 :
117 : // Open opens a cache. If there is no existing cache at fsDir, a new one
118 : // is created.
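//
// A minimal usage sketch (the filesystem, directory, and parameter values
// below are illustrative only):
//
//	c, err := sharedcache.Open(
//		vfs.Default, base.DefaultLogger, "/mnt/cache-dir",
//		32*1024,      // blockSize: 32 KiB cache blocks
//		1024*1024,    // shardingBlockSize: 1 MiB sharding units
//		16*1024*1024, // sizeBytes: total cache capacity
//		2,            // numShards
//	)
//	if err != nil {
//		return err
//	}
//	defer c.Close()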
119 : func Open(
120 : fs vfs.FS,
121 : logger base.Logger,
122 : fsDir string,
123 : blockSize int,
124 : // shardingBlockSize is the size of a shard block. The cache is split into contiguous
125 : // shardingBlockSize units. The units are distributed across multiple independent shards
126 : // of the cache, via a hash(offset) modulo num shards operation. The cache replacement
127 : // policies operate at the level of a shard, not the whole cache. This is done to reduce lock
128 : // contention.
129 : shardingBlockSize int64,
130 : sizeBytes int64,
131 : numShards int,
132 1 : ) (*Cache, error) {
133 1 : if minSize := shardingBlockSize * int64(numShards); sizeBytes < minSize {
134 0 : // Up the size so that we have one block per shard. In practice, this should
135 0 : // only happen in tests.
136 0 : sizeBytes = minSize
137 0 : }
138 :
139 1 : c := &Cache{
140 1 : logger: logger,
141 1 : bm: makeBlockMath(blockSize),
142 1 : shardingBlockSize: shardingBlockSize,
143 1 : }
144 1 : c.shards = make([]shard, numShards)
145 1 : blocksPerShard := sizeBytes / int64(numShards) / int64(blockSize)
146 1 : for i := range c.shards {
147 1 : if err := c.shards[i].init(c, fs, fsDir, i, blocksPerShard, blockSize, shardingBlockSize); err != nil {
148 0 : return nil, err
149 0 : }
150 : }
151 :
152 1 : c.writeWorkers.Start(c, numShards*writeWorkersPerShard)
153 1 :
154 1 : c.metrics.getLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: IOBuckets})
155 1 : c.metrics.diskReadLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: IOBuckets})
156 1 : c.metrics.putLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: IOBuckets})
157 1 : c.metrics.diskWriteLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: IOBuckets})
158 1 :
159 1 : // queuePutLatency measures a channel write, so it uses buckets with a lower minimum.
160 1 : c.metrics.queuePutLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: ChannelWriteBuckets})
161 1 :
162 1 : return c, nil
163 : }
164 :
165 : // Close closes the cache. Methods such as ReadAt should not be called after Close is
166 : // called.
167 1 : func (c *Cache) Close() error {
168 1 : c.writeWorkers.Stop()
169 1 :
170 1 : var retErr error
171 1 : for i := range c.shards {
172 1 : if err := c.shards[i].close(); err != nil && retErr == nil {
173 0 : retErr = err
174 0 : }
175 : }
176 1 : c.shards = nil
177 1 : return retErr
178 : }
179 :
180 : // Metrics returns metrics for the cache. Callers should not mutate
181 : // the returned histograms, which are pointer types.
182 1 : func (c *Cache) Metrics() Metrics {
183 1 : return Metrics{
184 1 : Count: c.metrics.count.Load(),
185 1 : Size: c.metrics.count.Load() * int64(c.bm.BlockSize()),
186 1 : TotalReads: c.metrics.totalReads.Load(),
187 1 : MultiShardReads: c.metrics.multiShardReads.Load(),
188 1 : MultiBlockReads: c.metrics.multiBlockReads.Load(),
189 1 : ReadsWithFullHit: c.metrics.readsWithFullHit.Load(),
190 1 : ReadsWithPartialHit: c.metrics.readsWithPartialHit.Load(),
191 1 : ReadsWithNoHit: c.metrics.readsWithNoHit.Load(),
192 1 : Evictions: c.metrics.evictions.Load(),
193 1 : WriteBackFailures: c.metrics.writeBackFailures.Load(),
194 1 : GetLatency: c.metrics.getLatency,
195 1 : DiskReadLatency: c.metrics.diskReadLatency,
196 1 : QueuePutLatency: c.metrics.queuePutLatency,
197 1 : PutLatency: c.metrics.putLatency,
198 1 : DiskWriteLatency: c.metrics.diskWriteLatency,
199 1 : }
200 1 : }
201 :
202 : // ReadFlags contains options for Cache.ReadAt.
203 : type ReadFlags struct {
204 : // ReadOnly instructs ReadAt to not write any new data into the cache; it is
205 : // used when the data is unlikely to be used again.
206 : ReadOnly bool
207 : }
208 :
209 : // ReadAt performs a read from an object, attempting to use cached data when
210 : // possible.
211 : func (c *Cache) ReadAt(
212 : ctx context.Context,
213 : fileNum base.DiskFileNum,
214 : p []byte,
215 : ofs int64,
216 : objReader remote.ObjectReader,
217 : objSize int64,
218 : flags ReadFlags,
219 1 : ) error {
220 1 : c.metrics.totalReads.Add(1)
221 1 : if ofs >= objSize {
222 0 : if invariants.Enabled {
223 0 : panic(fmt.Sprintf("invalid ReadAt offset %v %v", ofs, objSize))
224 : }
225 0 : return io.EOF
226 : }
227 : // TODO(radu): for compaction reads, we may not want to read from the cache at
228 : // all.
229 1 : {
230 1 : start := time.Now()
231 1 : n, err := c.get(fileNum, p, ofs)
232 1 : c.metrics.getLatency.Observe(float64(time.Since(start)))
233 1 : if err != nil {
234 0 : return err
235 0 : }
236 1 : if n == len(p) {
237 1 : // Everything was in cache!
238 1 : c.metrics.readsWithFullHit.Add(1)
239 1 : return nil
240 1 : }
241 1 : if n == 0 {
242 1 : c.metrics.readsWithNoHit.Add(1)
243 1 : } else {
244 0 : c.metrics.readsWithPartialHit.Add(1)
245 0 : }
246 :
247 : // Note that the code below does not need the original ofs: now that the read
248 : // from the cache is done, the relevant offset is ofs + int64(n). Same with p.
249 1 : ofs += int64(n)
250 1 : p = p[n:]
251 1 :
252 1 : if invariants.Enabled {
253 1 : if n != 0 && c.bm.Remainder(ofs) != 0 {
254 0 : panic(fmt.Sprintf("after non-zero read from cache, ofs is not block-aligned: %v %v", ofs, n))
255 : }
256 : }
257 : }
258 :
259 1 : if flags.ReadOnly {
260 1 : return objReader.ReadAt(ctx, p, ofs)
261 1 : }
262 :
263 : // We must do reads with offset & size that are multiples of the block size.
264 : // Otherwise, later cache hits may return incorrect zeroed results from the cache.
265 1 : firstBlockInd := c.bm.Block(ofs)
266 1 : adjustedOfs := c.bm.BlockOffset(firstBlockInd)
267 1 :
268 1 : // Round the length of what is left to read, plus the offset adjustment, up
269 1 : // to the next multiple of the block size. This is the number of bytes, in
270 1 : // whole cache blocks, to read from the object.
271 1 : sizeOfOffAdjustment := int(ofs - adjustedOfs)
272 1 : adjustedLen := int(c.bm.RoundUp(int64(len(p) + sizeOfOffAdjustment)))
273 1 : adjustedP := make([]byte, adjustedLen)
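// For example (illustrative, assuming a 4 KiB block size): a miss at ofs=5000
// with len(p)=2000 is widened to adjustedOfs=4096, sizeOfOffAdjustment=904,
// and adjustedLen=RoundUp(2000+904)=4096, so exactly one full cache block is
// fetched and later written back.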
274 1 :
275 1 : // Read the rest from the object. We may need to cap the length to avoid past EOF reads.
276 1 : eofCap := int64(adjustedLen)
277 1 : if adjustedOfs+eofCap > objSize {
278 1 : eofCap = objSize - adjustedOfs
279 1 : }
280 1 : if err := objReader.ReadAt(ctx, adjustedP[:eofCap], adjustedOfs); err != nil {
281 0 : return err
282 0 : }
283 1 : copy(p, adjustedP[sizeOfOffAdjustment:])
284 1 :
285 1 : start := time.Now()
286 1 : c.writeWorkers.QueueWrite(fileNum, adjustedP, adjustedOfs)
287 1 : c.metrics.queuePutLatency.Observe(float64(time.Since(start)))
288 1 :
289 1 : return nil
290 : }
291 :
292 : // get attempts to read the requested data from the cache, if it is already
293 : // there.
294 : //
295 : // If all data is available, returns n = len(p).
296 : //
297 : // If data is partially available, a prefix of the data is read; returns n < len(p)
298 : // and no error. If no prefix is available, returns n = 0 and no error.
299 1 : func (c *Cache) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
300 1 : // The data extent might cross shard boundaries, hence the loop. In the hot
301 1 : // path, max two iterations of this loop will be executed, since reads are sized
302 1 : // in units of sstable block size.
303 1 : var multiShard bool
304 1 : for {
305 1 : shard := c.getShard(fileNum, ofs+int64(n))
306 1 : cappedLen := len(p[n:])
307 1 : if toBoundary := int(c.shardingBlockSize - ((ofs + int64(n)) % c.shardingBlockSize)); cappedLen > toBoundary {
308 0 : cappedLen = toBoundary
309 0 : }
310 1 : numRead, err := shard.get(fileNum, p[n:n+cappedLen], ofs+int64(n))
311 1 : if err != nil {
312 0 : return n, err
313 0 : }
314 1 : n += numRead
315 1 : if numRead < cappedLen {
316 1 : // We only read a prefix from this shard.
317 1 : return n, nil
318 1 : }
319 1 : if n == len(p) {
320 1 : // We are done.
321 1 : return n, nil
322 1 : }
323 : // Data extent crosses shard boundary, continue with next shard.
324 0 : if !multiShard {
325 0 : c.metrics.multiShardReads.Add(1)
326 0 : multiShard = true
327 0 : }
328 : }
329 : }
330 :
331 : // set attempts to write the requested data to the cache. Both ofs & len(p) must
332 : // be multiples of the block size.
333 : //
334 : // If all of p is not written to the shard, set returns a non-nil error.
335 1 : func (c *Cache) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
336 1 : if invariants.Enabled {
337 1 : if c.bm.Remainder(ofs) != 0 || c.bm.Remainder(int64(len(p))) != 0 {
338 0 : panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
339 : }
340 : }
341 :
342 : // The data extent might cross shard boundaries, hence the loop. In the hot
343 : // path, max two iterations of this loop will be executed, since reads are sized
344 : // in units of sstable block size.
345 1 : n := 0
346 1 : for {
347 1 : shard := c.getShard(fileNum, ofs+int64(n))
348 1 : cappedLen := len(p[n:])
349 1 : if toBoundary := int(c.shardingBlockSize - ((ofs + int64(n)) % c.shardingBlockSize)); cappedLen > toBoundary {
350 0 : cappedLen = toBoundary
351 0 : }
352 1 : err := shard.set(fileNum, p[n:n+cappedLen], ofs+int64(n))
353 1 : if err != nil {
354 0 : return err
355 0 : }
356 : // set returns an error if cappedLen bytes aren't written to the shard.
357 1 : n += cappedLen
358 1 : if n == len(p) {
359 1 : // We are done.
360 1 : return nil
361 1 : }
362 : // Data extent crosses shard boundary, continue with next shard.
363 : }
364 : }
365 :
366 1 : func (c *Cache) getShard(fileNum base.DiskFileNum, ofs int64) *shard {
367 1 : const prime64 = 1099511628211
368 1 : hash := uint64(fileNum)*prime64 + uint64(ofs/c.shardingBlockSize)
369 1 : // TODO(josh): Instance change ops are often run in production. Such an operation
370 1 : // updates len(c.shards); see openSharedCache. As a result, the behavior of this
371 1 : // function changes, and the cache empties out at restart time. We may want a better
372 1 : // story here eventually.
373 1 : return &c.shards[hash%uint64(len(c.shards))]
374 1 : }
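// For example (illustrative): with shardingBlockSize = 1<<20 and 4 shards,
// every offset of a given file within the same 1 MiB unit produces the same
// hash and therefore maps to the same shard, while adjacent 1 MiB units of
// that file generally land on different shards.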
375 :
376 : type shard struct {
377 : cache *Cache
378 : file vfs.File
379 : sizeInBlocks int64
380 : bm blockMath
381 : shardingBlockSize int64
382 : mu struct {
383 : sync.Mutex
384 : // TODO(josh): None of these datastructures are space-efficient.
385 : // Focusing on correctness to start.
386 : where whereMap
387 : blocks []cacheBlockState
388 : // Head of LRU list (doubly-linked circular).
389 : lruHead cacheBlockIndex
390 : // Head of free list (singly-linked chain).
391 : freeHead cacheBlockIndex
392 : }
393 : }
394 :
395 : type cacheBlockState struct {
396 : lock lockState
397 : logical logicalBlockID
398 :
399 : // next is the next block in the LRU or free list (or invalidBlockIndex if it
400 : // is the last block in the free list).
401 : next cacheBlockIndex
402 :
403 : // prev is the previous block in the LRU list. It is not used when the block
404 : // is in the free list.
405 : prev cacheBlockIndex
406 : }
407 :
408 : // Maps a logical block in an SST to an index of the cache block with the
409 : // file contents (to the "cache block index").
410 : type whereMap map[logicalBlockID]cacheBlockIndex
411 :
412 : type logicalBlockID struct {
413 : filenum base.DiskFileNum
414 : cacheBlockIdx cacheBlockIndex
415 : }
416 :
417 : type lockState int64
418 :
419 : const (
420 : unlocked lockState = 0
421 : // >0 lockState tracks the number of distinct readers of some cache block / logical block
422 : // which is in the secondary cache. It is used to ensure that a cache block is not evicted
423 : // and overwritten, while there are active readers.
424 : readLockTakenInc = 1
425 : // -1 lockState indicates that some cache block is currently being populated with data from
426 : // blob storage. It is used to ensure that a cache block is not read or evicted again, while
427 : // it is being populated.
428 : writeLockTaken = -1
429 : )
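// For example, a cache block with two concurrent readers has lock state
// 2*readLockTakenInc = 2; a block being populated has lock state
// writeLockTaken = -1; an idle block has lock state unlocked = 0.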
430 :
431 : func (s *shard) init(
432 : cache *Cache,
433 : fs vfs.FS,
434 : fsDir string,
435 : shardIdx int,
436 : sizeInBlocks int64,
437 : blockSize int,
438 : shardingBlockSize int64,
439 1 : ) error {
440 1 : *s = shard{
441 1 : cache: cache,
442 1 : sizeInBlocks: sizeInBlocks,
443 1 : }
444 1 : if blockSize < 1024 || shardingBlockSize%int64(blockSize) != 0 {
445 0 : return errors.Newf("invalid block size %d (must divide %d)", blockSize, shardingBlockSize)
446 0 : }
447 1 : s.bm = makeBlockMath(blockSize)
448 1 : s.shardingBlockSize = shardingBlockSize
449 1 : file, err := fs.OpenReadWrite(fs.PathJoin(fsDir, fmt.Sprintf("SHARED-CACHE-%03d", shardIdx)))
450 1 : if err != nil {
451 0 : return err
452 0 : }
453 : // TODO(radu): truncate file if necessary (especially important if we restart
454 : // with more shards).
455 1 : if err := file.Preallocate(0, int64(blockSize)*sizeInBlocks); err != nil {
456 0 : return err
457 0 : }
458 1 : s.file = file
459 1 :
460 1 : // TODO(josh): Right now, the secondary cache is not persistent. All existing
461 1 : // cache contents will be over-written, since all metadata is only stored in
462 1 : // memory.
463 1 : s.mu.where = make(whereMap)
464 1 : s.mu.blocks = make([]cacheBlockState, sizeInBlocks)
465 1 : s.mu.lruHead = invalidBlockIndex
466 1 : s.mu.freeHead = invalidBlockIndex
467 1 : for i := range s.mu.blocks {
468 1 : s.freePush(cacheBlockIndex(i))
469 1 : }
470 :
471 1 : return nil
472 : }
473 :
474 1 : func (s *shard) close() error {
475 1 : defer func() {
476 1 : s.file = nil
477 1 : }()
478 1 : return s.file.Close()
479 : }
480 :
481 : // freePush pushes a block to the front of the free list.
482 1 : func (s *shard) freePush(index cacheBlockIndex) {
483 1 : s.mu.blocks[index].next = s.mu.freeHead
484 1 : s.mu.freeHead = index
485 1 : }
486 :
487 : // freePop removes the block from the front of the free list. Must not be called
488 : // if the list is empty (i.e. freeHead = invalidBlockIndex).
489 1 : func (s *shard) freePop() cacheBlockIndex {
490 1 : index := s.mu.freeHead
491 1 : s.mu.freeHead = s.mu.blocks[index].next
492 1 : return index
493 1 : }
494 :
495 : // lruInsertFront inserts a block at the front of the LRU list.
496 1 : func (s *shard) lruInsertFront(index cacheBlockIndex) {
497 1 : b := &s.mu.blocks[index]
498 1 : if s.mu.lruHead == invalidBlockIndex {
499 1 : b.next = index
500 1 : b.prev = index
501 1 : } else {
502 1 : b.next = s.mu.lruHead
503 1 : h := &s.mu.blocks[s.mu.lruHead]
504 1 : b.prev = h.prev
505 1 : s.mu.blocks[h.prev].next = index
506 1 : h.prev = index
507 1 : }
508 1 : s.mu.lruHead = index
509 : }
510 :
511 1 : func (s *shard) lruNext(index cacheBlockIndex) cacheBlockIndex {
512 1 : return s.mu.blocks[index].next
513 1 : }
514 :
515 1 : func (s *shard) lruPrev(index cacheBlockIndex) cacheBlockIndex {
516 1 : return s.mu.blocks[index].prev
517 1 : }
518 :
519 : // lruUnlink removes a block from the LRU list.
520 1 : func (s *shard) lruUnlink(index cacheBlockIndex) {
521 1 : b := &s.mu.blocks[index]
522 1 : if b.next == index {
523 1 : s.mu.lruHead = invalidBlockIndex
524 1 : } else {
525 1 : s.mu.blocks[b.prev].next = b.next
526 1 : s.mu.blocks[b.next].prev = b.prev
527 1 : if s.mu.lruHead == index {
528 1 : s.mu.lruHead = b.next
529 1 : }
530 : }
531 1 : b.next, b.prev = invalidBlockIndex, invalidBlockIndex
532 : }
533 :
534 : // get attempts to read the requested data from the shard. The data must not
535 : // cross a shard boundary.
536 : //
537 : // If all data is available, returns n = len(p).
538 : //
539 : // If data is partially available, a prefix of the data is read; returns n < len(p)
540 : // and no error. If no prefix is available, returns n = 0 and no error.
541 : //
542 : // TODO(josh): Today, if there are two cache blocks needed to satisfy a read, and the
543 : // first block is not in the cache and the second one is, we will read both from
544 : // blob storage. We should fix this. This scenario is not unlikely during a
545 : // reverse scan, since such scans iterate over sstable blocks in reverse
546 : // order and, because reads are cache-block aligned, will already have read
547 : // the suffix of the sstable block that is needed next.
548 1 : func (s *shard) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
549 1 : if invariants.Enabled {
550 1 : if ofs/s.shardingBlockSize != (ofs+int64(len(p))-1)/s.shardingBlockSize {
551 0 : panic(fmt.Sprintf("get crosses shard boundary: %v %v", ofs, len(p)))
552 : }
553 1 : s.assertShardStateIsConsistent()
554 : }
555 :
556 : // The data extent might cross cache block boundaries, hence the loop. In the hot
557 : // path, max two iterations of this loop will be executed, since reads are sized
558 : // in units of sstable block size.
559 1 : var multiBlock bool
560 1 : for {
561 1 : k := logicalBlockID{
562 1 : filenum: fileNum,
563 1 : cacheBlockIdx: s.bm.Block(ofs + int64(n)),
564 1 : }
565 1 : s.mu.Lock()
566 1 : cacheBlockIdx, ok := s.mu.where[k]
567 1 : // TODO(josh): Multiple reads within the same few milliseconds (anything that is smaller
568 1 : // than blob storage read latency) that miss on the same logical block ID will not necessarily
569 1 : // be rare. We may want to do only one read, with the later readers blocking on the first read
570 1 : // completing. This could be implemented either here or in the primary block cache. See
571 1 : // https://github.com/cockroachdb/pebble/pull/2586 for additional discussion.
572 1 : if !ok {
573 1 : s.mu.Unlock()
574 1 : return n, nil
575 1 : }
576 1 : if s.mu.blocks[cacheBlockIdx].lock == writeLockTaken {
577 1 : // In practice, if we have two reads of the same SST block in close succession, we
578 1 : // would expect the second to hit in the in-memory block cache. So it's not worth
579 1 : // optimizing this case here.
580 1 : s.mu.Unlock()
581 1 : return n, nil
582 1 : }
583 1 : s.mu.blocks[cacheBlockIdx].lock += readLockTakenInc
584 1 : // Move to front of the LRU list.
585 1 : s.lruUnlink(cacheBlockIdx)
586 1 : s.lruInsertFront(cacheBlockIdx)
587 1 : s.mu.Unlock()
588 1 :
589 1 : readAt := s.bm.BlockOffset(cacheBlockIdx)
590 1 : readSize := s.bm.BlockSize()
591 1 : if n == 0 { // if first read
592 1 : rem := s.bm.Remainder(ofs)
593 1 : readAt += rem
594 1 : readSize -= int(rem)
595 1 : }
596 :
597 1 : if len(p[n:]) <= readSize {
598 1 : start := time.Now()
599 1 : numRead, err := s.file.ReadAt(p[n:], readAt)
600 1 : s.cache.metrics.diskReadLatency.Observe(float64(time.Since(start)))
601 1 : s.dropReadLock(cacheBlockIdx)
602 1 : return n + numRead, err
603 1 : }
604 1 : start := time.Now()
605 1 : numRead, err := s.file.ReadAt(p[n:n+readSize], readAt)
606 1 : s.cache.metrics.diskReadLatency.Observe(float64(time.Since(start)))
607 1 : s.dropReadLock(cacheBlockIdx)
608 1 : if err != nil {
609 0 : return 0, err
610 0 : }
611 :
612 : // Note that numRead == readSize, since we checked for an error above.
613 1 : n += numRead
614 1 :
615 1 : if !multiBlock {
616 1 : s.cache.metrics.multiBlockReads.Add(1)
617 1 : multiBlock = true
618 1 : }
619 : }
620 : }
621 :
622 : // set attempts to write the requested data to the shard. The data must not
623 : // cross a shard boundary, and both ofs & len(p) must be multiples of the
624 : // block size.
625 : //
626 : // If all of p is not written to the shard, set returns a non-nil error.
627 1 : func (s *shard) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
628 1 : if invariants.Enabled {
629 1 : if ofs/s.shardingBlockSize != (ofs+int64(len(p))-1)/s.shardingBlockSize {
630 0 : panic(fmt.Sprintf("set crosses shard boundary: %v %v", ofs, len(p)))
631 : }
632 1 : if s.bm.Remainder(ofs) != 0 || s.bm.Remainder(int64(len(p))) != 0 {
633 0 : panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
634 : }
635 1 : s.assertShardStateIsConsistent()
636 : }
637 :
638 : // The data extent might cross cache block boundaries, hence the loop. In the hot
639 : // path, max two iterations of this loop will be executed, since reads are sized
640 : // in units of sstable block size.
641 1 : n := 0
642 1 : for {
643 1 : if n == len(p) {
644 1 : return nil
645 1 : }
646 1 : if invariants.Enabled {
647 1 : if n > len(p) {
648 0 : panic(fmt.Sprintf("set with n greater than len(p): %v %v", n, len(p)))
649 : }
650 : }
651 :
652 : // If the logical block is already in the cache, we should skip doing a set.
653 1 : k := logicalBlockID{
654 1 : filenum: fileNum,
655 1 : cacheBlockIdx: s.bm.Block(ofs + int64(n)),
656 1 : }
657 1 : s.mu.Lock()
658 1 : if _, ok := s.mu.where[k]; ok {
659 1 : s.mu.Unlock()
660 1 : n += s.bm.BlockSize()
661 1 : continue
662 : }
663 :
664 1 : var cacheBlockIdx cacheBlockIndex
665 1 : if s.mu.freeHead == invalidBlockIndex {
666 1 : if invariants.Enabled && s.mu.lruHead == invalidBlockIndex {
667 0 : panic("both LRU and free lists empty")
668 : }
669 :
670 : // Find the last element in the LRU list which is not locked.
671 1 : for idx := s.lruPrev(s.mu.lruHead); ; idx = s.lruPrev(idx) {
672 1 : if lock := s.mu.blocks[idx].lock; lock == unlocked {
673 1 : cacheBlockIdx = idx
674 1 : break
675 : }
676 0 : if idx == s.mu.lruHead {
677 0 : // No unlocked block to evict.
678 0 : //
679 0 : // TODO(josh): We may want to block until a block frees up, instead of returning
680 0 : // an error here. But I think we can do that later on, e.g. after running some production
681 0 : // experiments.
682 0 : s.mu.Unlock()
683 0 : return errors.New("no block to evict so skipping write to cache")
684 0 : }
685 : }
686 1 : s.cache.metrics.evictions.Add(1)
687 1 : s.lruUnlink(cacheBlockIdx)
688 1 : delete(s.mu.where, s.mu.blocks[cacheBlockIdx].logical)
689 1 : } else {
690 1 : s.cache.metrics.count.Add(1)
691 1 : cacheBlockIdx = s.freePop()
692 1 : }
693 :
694 1 : s.lruInsertFront(cacheBlockIdx)
695 1 : s.mu.where[k] = cacheBlockIdx
696 1 : s.mu.blocks[cacheBlockIdx].logical = k
697 1 : s.mu.blocks[cacheBlockIdx].lock = writeLockTaken
698 1 : s.mu.Unlock()
699 1 :
700 1 : writeAt := s.bm.BlockOffset(cacheBlockIdx)
701 1 :
702 1 : writeSize := s.bm.BlockSize()
703 1 : if len(p[n:]) <= writeSize {
704 1 : writeSize = len(p[n:])
705 1 : }
706 :
707 1 : start := time.Now()
708 1 : _, err := s.file.WriteAt(p[n:n+writeSize], writeAt)
709 1 : s.cache.metrics.diskWriteLatency.Observe(float64(time.Since(start)))
710 1 : if err != nil {
711 0 : // Free the block.
712 0 : s.mu.Lock()
713 0 : defer s.mu.Unlock()
714 0 :
715 0 : delete(s.mu.where, k)
716 0 : s.lruUnlink(cacheBlockIdx)
717 0 : s.freePush(cacheBlockIdx)
718 0 : return err
719 0 : }
720 1 : s.dropWriteLock(cacheBlockIdx)
721 1 : n += writeSize
722 : }
723 : }
724 :
725 : // Doesn't inline currently. This might be okay, but something to keep in mind.
726 1 : func (s *shard) dropReadLock(cacheBlockInd cacheBlockIndex) {
727 1 : s.mu.Lock()
728 1 : s.mu.blocks[cacheBlockInd].lock -= readLockTakenInc
729 1 : if invariants.Enabled && s.mu.blocks[cacheBlockInd].lock < 0 {
730 0 : panic(fmt.Sprintf("unexpected lock state %v in dropReadLock", s.mu.blocks[cacheBlockInd].lock))
731 : }
732 1 : s.mu.Unlock()
733 : }
734 :
735 : // Doesn't inline currently. This might be okay, but something to keep in mind.
736 1 : func (s *shard) dropWriteLock(cacheBlockInd cacheBlockIndex) {
737 1 : s.mu.Lock()
738 1 : if invariants.Enabled && s.mu.blocks[cacheBlockInd].lock != writeLockTaken {
739 0 : panic(fmt.Sprintf("unexpected lock state %v in dropWriteLock", s.mu.blocks[cacheBlockInd].lock))
740 : }
741 1 : s.mu.blocks[cacheBlockInd].lock = unlocked
742 1 : s.mu.Unlock()
743 : }
744 :
745 1 : func (s *shard) assertShardStateIsConsistent() {
746 1 : s.mu.Lock()
747 1 : defer s.mu.Unlock()
748 1 :
749 1 : lruLen := 0
750 1 : if s.mu.lruHead != invalidBlockIndex {
751 1 : for b := s.mu.lruHead; ; {
752 1 : lruLen++
753 1 : if idx, ok := s.mu.where[s.mu.blocks[b].logical]; !ok || idx != b {
754 0 : panic("block in LRU list with no entry in where map")
755 : }
756 1 : b = s.lruNext(b)
757 1 : if b == s.mu.lruHead {
758 1 : break
759 : }
760 : }
761 : }
762 1 : if lruLen != len(s.mu.where) {
763 0 : panic(fmt.Sprintf("lru list len is %d but where map has %d entries", lruLen, len(s.mu.where)))
764 : }
765 1 : freeLen := 0
766 1 : for n := s.mu.freeHead; n != invalidBlockIndex; n = s.mu.blocks[n].next {
767 1 : freeLen++
768 1 : }
769 :
770 1 : if lruLen+freeLen != int(s.sizeInBlocks) {
771 0 : panic(fmt.Sprintf("%d lru blocks and %d free blocks don't add up to %d", lruLen, freeLen, s.sizeInBlocks))
772 : }
773 1 : for i := range s.mu.blocks {
774 1 : if state := s.mu.blocks[i].lock; state < writeLockTaken {
775 0 : panic(fmt.Sprintf("lock state %v is not allowed", state))
776 : }
777 : }
778 : }
779 :
780 : // cacheBlockIndex is the index of a blockSize-aligned cache block.
781 : type cacheBlockIndex int64
782 :
783 : // invalidBlockIndex is used for the head of a list when the list is empty.
784 : const invalidBlockIndex cacheBlockIndex = -1
785 :
786 : // blockMath is a helper type for performing conversions between offsets and
787 : // block indexes.
788 : type blockMath struct {
789 : blockSizeBits int8
790 : }
791 :
792 1 : func makeBlockMath(blockSize int) blockMath {
793 1 : bm := blockMath{
794 1 : blockSizeBits: int8(bits.Len64(uint64(blockSize)) - 1),
795 1 : }
796 1 : if blockSize != (1 << bm.blockSizeBits) {
797 0 : panic(fmt.Sprintf("blockSize %d is not a power of 2", blockSize))
798 : }
799 1 : return bm
800 : }
801 :
802 1 : func (bm blockMath) mask() int64 {
803 1 : return (1 << bm.blockSizeBits) - 1
804 1 : }
805 :
806 : // BlockSize returns the block size.
807 1 : func (bm blockMath) BlockSize() int {
808 1 : return 1 << bm.blockSizeBits
809 1 : }
810 :
811 : // Block returns the block index containing the given offset.
812 1 : func (bm blockMath) Block(offset int64) cacheBlockIndex {
813 1 : return cacheBlockIndex(offset >> bm.blockSizeBits)
814 1 : }
815 :
816 : // Remainder returns the offset relative to the start of the cache block.
817 1 : func (bm blockMath) Remainder(offset int64) int64 {
818 1 : return offset & bm.mask()
819 1 : }
820 :
821 : // BlockOffset returns the object offset where the given block starts.
822 1 : func (bm blockMath) BlockOffset(block cacheBlockIndex) int64 {
823 1 : return int64(block) << bm.blockSizeBits
824 1 : }
825 :
826 : // RoundUp rounds up the given value to the closest multiple of block size.
827 1 : func (bm blockMath) RoundUp(x int64) int64 {
828 1 : return (x + bm.mask()) & ^(bm.mask())
829 1 : }
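// A worked example of the blockMath helpers (illustrative): with
// blockSize=4096, blockSizeBits=12 and mask()=4095, so Block(5000)=1,
// Remainder(5000)=904, BlockOffset(1)=4096, and RoundUp(5000)=8192.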
830 :
831 : type writeWorkers struct {
832 : doneCh chan struct{}
833 : doneWaitGroup sync.WaitGroup
834 :
835 : numWorkers int
836 : tasksCh chan writeTask
837 : }
838 :
839 : type writeTask struct {
840 : fileNum base.DiskFileNum
841 : p []byte
842 : offset int64
843 : }
844 :
845 : // Start starts the worker goroutines.
846 1 : func (w *writeWorkers) Start(c *Cache, numWorkers int) {
847 1 : doneCh := make(chan struct{})
848 1 : tasksCh := make(chan writeTask, numWorkers*writeTasksPerWorker)
849 1 :
850 1 : w.numWorkers = numWorkers
851 1 : w.doneCh = doneCh
852 1 : w.tasksCh = tasksCh
853 1 : w.doneWaitGroup.Add(numWorkers)
854 1 : for i := 0; i < numWorkers; i++ {
855 1 : go func() {
856 1 : defer w.doneWaitGroup.Done()
857 1 : for {
858 1 : select {
859 1 : case <-doneCh:
860 1 : return
861 1 : case task, ok := <-tasksCh:
862 1 : if !ok {
863 0 : // The tasks channel was closed; this is used in testing code to
864 0 : // ensure all writes are completed.
865 0 : return
866 0 : }
867 : // TODO(radu): set() can perform multiple writes; perhaps each one
868 : // should be its own task.
869 1 : start := time.Now()
870 1 : err := c.set(task.fileNum, task.p, task.offset)
871 1 : c.metrics.putLatency.Observe(float64(time.Since(start)))
872 1 : if err != nil {
873 0 : c.metrics.writeBackFailures.Add(1)
874 0 : // TODO(radu): throttle logs.
875 0 : c.logger.Errorf("writing back to cache after miss failed: %v", err)
876 0 : }
877 : }
878 : }
879 : }()
880 : }
881 : }
882 :
883 : // Stop waits for any in-progress writes to complete and then stops the
884 : // worker goroutines. Any queued writes that have not yet started are
885 : // discarded.
886 1 : func (w *writeWorkers) Stop() {
887 1 : close(w.doneCh)
888 1 : w.doneCh = nil
889 1 : w.tasksCh = nil
890 1 : w.doneWaitGroup.Wait()
891 1 : }
892 :
893 : // QueueWrite adds a write task to the queue. Can block if the queue is full.
894 1 : func (w *writeWorkers) QueueWrite(fileNum base.DiskFileNum, p []byte, offset int64) {
895 1 : w.tasksCh <- writeTask{
896 1 : fileNum: fileNum,
897 1 : p: p,
898 1 : offset: offset,
899 1 : }
900 1 : }