Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sharedcache
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "io"
11 : "math/bits"
12 : "sync"
13 : "sync/atomic"
14 : "time"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/objstorage/remote"
20 : "github.com/cockroachdb/pebble/vfs"
21 : "github.com/prometheus/client_golang/prometheus"
22 : )
23 :
24 : // Cache is a persistent cache backed by a local filesystem. It is intended
25 : // to cache data that is in slower shared storage (e.g. S3), hence the
26 : // package name 'sharedcache'.
27 : type Cache struct {
28 : shards []shard
29 : writeWorkers writeWorkers
30 :
31 : bm blockMath
32 : shardingBlockSize int64
33 :
34 : logger base.Logger
35 : metrics internalMetrics
36 : }
37 :
38 : // Metrics is a struct containing metrics exported by the secondary cache.
39 : // TODO(josh): Reconsider the set of metrics exported by the secondary cache
40 : // before we release the secondary cache to users. We choose to export many metrics
41 : // right now, so we learn a lot from the benchmarking we are doing over the 23.2
42 : // cycle.
43 : type Metrics struct {
44 : // The number of sstable bytes stored in the cache.
45 : Size int64
46 : // The count of cache blocks in the cache (not sstable blocks).
47 : Count int64
48 :
49 : // The number of calls to ReadAt.
50 : TotalReads int64
51 : // The number of calls to ReadAt that require reading data from 2+ shards.
52 : MultiShardReads int64
53 : // The number of calls to ReadAt that require reading data from 2+ cache blocks.
54 : MultiBlockReads int64
55 : // The number of calls to ReadAt where all data returned was read from the cache.
56 : ReadsWithFullHit int64
57 : // The number of calls to ReadAt where some data returned was read from the cache.
58 : ReadsWithPartialHit int64
59 : // The number of calls to ReadAt where no data returned was read from the cache.
60 : ReadsWithNoHit int64
61 :
62 : // The number of times a cache block was evicted from the cache.
63 : Evictions int64
64 : // The number of times writing a cache block to the cache failed.
65 : WriteBackFailures int64
66 :
67 : // The latency of calls to get some data from the cache.
68 : GetLatency prometheus.Histogram
69 : // The latency of reads of a single cache block from disk.
70 : DiskReadLatency prometheus.Histogram
71 : // The latency of queueing data to be written back to the cache (a channel
72 : // write). Generally should be low, but can be high if the channel is full.
73 : QueuePutLatency prometheus.Histogram
74 : // The latency of calls to put some data read from block storage into the cache.
75 : PutLatency prometheus.Histogram
76 : // The latency of writes of a single cache block to disk.
77 : DiskWriteLatency prometheus.Histogram
78 : }
79 :
80 : // See docs at Metrics.
81 : type internalMetrics struct {
82 : count atomic.Int64
83 :
84 : totalReads atomic.Int64
85 : multiShardReads atomic.Int64
86 : multiBlockReads atomic.Int64
87 : readsWithFullHit atomic.Int64
88 : readsWithPartialHit atomic.Int64
89 : readsWithNoHit atomic.Int64
90 :
91 : evictions atomic.Int64
92 : writeBackFailures atomic.Int64
93 :
94 : getLatency prometheus.Histogram
95 : diskReadLatency prometheus.Histogram
96 : queuePutLatency prometheus.Histogram
97 : putLatency prometheus.Histogram
98 : diskWriteLatency prometheus.Histogram
99 : }
100 :
101 : const (
102 : // writeWorkersPerShard is used to establish the number of worker goroutines
103 : // that perform writes to the cache.
104 : writeWorkersPerShard = 4
105 : // writeTasksPerWorker is used to establish how many tasks can be queued up
106 : // before we have to block.
107 : writeTasksPerWorker = 4
108 : )
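
// Illustrative note (not part of the file above): with these constants, a
// cache opened with numShards shards starts numShards*writeWorkersPerShard
// write-back goroutines, and the shared task channel created in
// writeWorkers.Start holds numWorkers*writeTasksPerWorker entries. A sketch
// for a hypothetical 8-shard cache:
//
//	numWorkers := 8 * writeWorkersPerShard       // 32 worker goroutines
//	queueCap := numWorkers * writeTasksPerWorker // 128 queued writeTasks before QueueWrite blocks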
109 :
110 : // Open opens a cache. If there is no existing cache at fsDir, a new one
111 : // is created.
112 : func Open(
113 : fs vfs.FS,
114 : logger base.Logger,
115 : fsDir string,
116 : blockSize int,
117 : // shardingBlockSize is the size of a shard block. The cache is split into contiguous
118 : // shardingBlockSize units. The units are distributed across multiple independent shards
119 : // of the cache, via a hash(offset) modulo num shards operation. The cache replacement
120 : // policies operate at the level of shard, not whole cache. This is done to reduce lock
121 : // contention.
122 : shardingBlockSize int64,
123 : sizeBytes int64,
124 : numShards int,
125 1 : ) (*Cache, error) {
126 1 : min := shardingBlockSize * int64(numShards)
127 1 : if sizeBytes < min {
128 0 : return nil, errors.Errorf("cache size %d lower than min %d", sizeBytes, min)
129 0 : }
130 :
131 1 : c := &Cache{
132 1 : logger: logger,
133 1 : bm: makeBlockMath(blockSize),
134 1 : shardingBlockSize: shardingBlockSize,
135 1 : }
136 1 : c.shards = make([]shard, numShards)
137 1 : blocksPerShard := sizeBytes / int64(numShards) / int64(blockSize)
138 1 : for i := range c.shards {
139 1 : if err := c.shards[i].init(c, fs, fsDir, i, blocksPerShard, blockSize, shardingBlockSize); err != nil {
140 0 : return nil, err
141 0 : }
142 : }
143 :
144 1 : c.writeWorkers.Start(c, numShards*writeWorkersPerShard)
145 1 :
146 1 : buckets := prometheus.ExponentialBucketsRange(float64(time.Millisecond*1), float64(10*time.Second), 50)
147 1 : c.metrics.getLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: buckets})
148 1 : c.metrics.diskReadLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: buckets})
149 1 : c.metrics.putLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: buckets})
150 1 : c.metrics.diskWriteLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: buckets})
151 1 :
152 1 : // Measures a channel write, so lower min.
153 1 : buckets = prometheus.ExponentialBucketsRange(float64(time.Microsecond*1), float64(10*time.Second), 50)
154 1 : c.metrics.queuePutLatency = prometheus.NewHistogram(prometheus.HistogramOpts{Buckets: buckets})
155 1 :
156 1 : return c, nil
157 : }
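
// Illustrative usage sketch (not part of the file above); the directory and
// sizing values are hypothetical and only need to satisfy the constraints
// documented on Open: blockSize a power of two of at least 1024,
// shardingBlockSize a multiple of blockSize, and sizeBytes at least
// shardingBlockSize*numShards.
//
//	cache, err := sharedcache.Open(
//		vfs.Default, base.DefaultLogger, "/mnt/local-cache",
//		32<<10, // blockSize: 32 KiB cache blocks
//		1<<20,  // shardingBlockSize: 1 MiB units spread across shards
//		8<<30,  // sizeBytes: 8 GiB total capacity
//		32,     // numShards
//	)
//	if err != nil {
//		return err
//	}
//	defer cache.Close()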
158 :
159 : // Close closes the cache. Methods such as ReadAt should not be called after Close is
160 : // called.
161 1 : func (c *Cache) Close() error {
162 1 : c.writeWorkers.Stop()
163 1 :
164 1 : var retErr error
165 1 : for i := range c.shards {
166 1 : if err := c.shards[i].close(); err != nil && retErr == nil {
167 0 : retErr = err
168 0 : }
169 : }
170 1 : c.shards = nil
171 1 : return retErr
172 : }
173 :
174 : // Metrics returns metrics for the cache. Callers should not mutate
175 : // the returned histograms, which are pointer types.
176 1 : func (c *Cache) Metrics() Metrics {
177 1 : return Metrics{
178 1 : Count: c.metrics.count.Load(),
179 1 : Size: c.metrics.count.Load() * int64(c.bm.BlockSize()),
180 1 : TotalReads: c.metrics.totalReads.Load(),
181 1 : MultiShardReads: c.metrics.multiShardReads.Load(),
182 1 : MultiBlockReads: c.metrics.multiBlockReads.Load(),
183 1 : ReadsWithFullHit: c.metrics.readsWithFullHit.Load(),
184 1 : ReadsWithPartialHit: c.metrics.readsWithPartialHit.Load(),
185 1 : ReadsWithNoHit: c.metrics.readsWithNoHit.Load(),
186 1 : Evictions: c.metrics.evictions.Load(),
187 1 : WriteBackFailures: c.metrics.writeBackFailures.Load(),
188 1 : GetLatency: c.metrics.getLatency,
189 1 : DiskReadLatency: c.metrics.diskReadLatency,
190 1 : QueuePutLatency: c.metrics.queuePutLatency,
191 1 : PutLatency: c.metrics.putLatency,
192 1 : DiskWriteLatency: c.metrics.diskWriteLatency,
193 1 : }
194 1 : }
195 :
196 : // ReadFlags contains options for Cache.ReadAt.
197 : type ReadFlags struct {
198 : // ReadOnly instructs ReadAt to not write any new data into the cache; it is
199 : // used when the data is unlikely to be used again.
200 : ReadOnly bool
201 : }
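
// Illustrative call sketch (the context, buffer, and object reader identifiers
// are hypothetical): serving a read through the cache while skipping
// write-back for data that is unlikely to be read again, per the ReadOnly flag
// above.
//
//	err := cache.ReadAt(ctx, fileNum, buf, offset, objReader, objSize,
//		sharedcache.ReadFlags{ReadOnly: true})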
202 :
203 : // ReadAt performs a read from an object, attempting to use cached data when
204 : // possible.
205 : func (c *Cache) ReadAt(
206 : ctx context.Context,
207 : fileNum base.DiskFileNum,
208 : p []byte,
209 : ofs int64,
210 : objReader remote.ObjectReader,
211 : objSize int64,
212 : flags ReadFlags,
213 1 : ) error {
214 1 : c.metrics.totalReads.Add(1)
215 1 : if ofs >= objSize {
216 0 : if invariants.Enabled {
217 0 : panic(fmt.Sprintf("invalid ReadAt offset %v %v", ofs, objSize))
218 : }
219 0 : return io.EOF
220 : }
221 : // TODO(radu): for compaction reads, we may not want to read from the cache at
222 : // all.
223 1 : {
224 1 : start := time.Now()
225 1 : n, err := c.get(fileNum, p, ofs)
226 1 : c.metrics.getLatency.Observe(float64(time.Since(start)))
227 1 : if err != nil {
228 0 : return err
229 0 : }
230 1 : if n == len(p) {
231 1 : // Everything was in cache!
232 1 : c.metrics.readsWithFullHit.Add(1)
233 1 : return nil
234 1 : }
235 1 : if n == 0 {
236 1 : c.metrics.readsWithNoHit.Add(1)
237 1 : } else {
238 1 : c.metrics.readsWithPartialHit.Add(1)
239 1 : }
240 :
241 : // Note that the code below does not need the original ofs: now that the read
242 : // from the cache is done, the relevant offset is ofs + int64(n). Same with p.
243 1 : ofs += int64(n)
244 1 : p = p[n:]
245 1 :
246 1 : if invariants.Enabled {
247 0 : if n != 0 && c.bm.Remainder(ofs) != 0 {
248 0 : panic(fmt.Sprintf("after non-zero read from cache, ofs is not block-aligned: %v %v", ofs, n))
249 : }
250 : }
251 : }
252 :
253 1 : if flags.ReadOnly {
254 1 : return objReader.ReadAt(ctx, p, ofs)
255 1 : }
256 :
257 : // We must do reads with offset & size that are multiples of the block size. Otherwise,
258 : // later cache hits may return incorrect zeroed results from the cache.
259 1 : firstBlockInd := c.bm.Block(ofs)
260 1 : adjustedOfs := c.bm.BlockOffset(firstBlockInd)
261 1 :
262 1 : // Round the length of what is left to read, plus the adjustment of the offset,
263 1 : // up to a multiple of the block size to get the adjusted length to read from
264 1 : // the object.
265 1 : sizeOfOffAdjustment := int(ofs - adjustedOfs)
266 1 : adjustedLen := int(c.bm.RoundUp(int64(len(p) + sizeOfOffAdjustment)))
267 1 : adjustedP := make([]byte, adjustedLen)
268 1 :
269 1 : // Read the rest from the object. We may need to cap the length to avoid past EOF reads.
270 1 : eofCap := int64(adjustedLen)
271 1 : if adjustedOfs+eofCap > objSize {
272 1 : eofCap = objSize - adjustedOfs
273 1 : }
274 1 : if err := objReader.ReadAt(ctx, adjustedP[:eofCap], adjustedOfs); err != nil {
275 0 : return err
276 0 : }
277 1 : copy(p, adjustedP[sizeOfOffAdjustment:])
278 1 :
279 1 : start := time.Now()
280 1 : c.writeWorkers.QueueWrite(fileNum, adjustedP, adjustedOfs)
281 1 : c.metrics.queuePutLatency.Observe(float64(time.Since(start)))
282 1 :
283 1 : return nil
284 : }
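
// Worked example of the alignment arithmetic in ReadAt (hypothetical values,
// assuming a 32 KiB cache block size): a full cache miss at ofs=70_000 for
// len(p)=10_000 becomes a block-aligned object read.
//
//	firstBlockInd       = Block(70_000)           // block 2
//	adjustedOfs         = BlockOffset(2)          // 65_536
//	sizeOfOffAdjustment = 70_000 - 65_536         // 4_464
//	adjustedLen         = RoundUp(10_000 + 4_464) // 32_768
//
// A single 32 KiB block is read from the object at offset 65_536, the
// requested 10_000 bytes are copied into p, and the whole aligned block is
// queued for write-back into the cache.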
285 :
286 : // get attempts to read the requested data from the cache, if it is already
287 : // there.
288 : //
289 : // If all data is available, returns n = len(p).
290 : //
291 : // If data is partially available, a prefix of the data is read; returns n < len(p)
292 : // and no error. If no prefix is available, returns n = 0 and no error.
293 1 : func (c *Cache) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
294 1 : // The data extent might cross shard boundaries, hence the loop. In the hot
295 1 : // path, max two iterations of this loop will be executed, since reads are sized
296 1 : // in units of sstable block size.
297 1 : var multiShard bool
298 1 : for {
299 1 : shard := c.getShard(fileNum, ofs+int64(n))
300 1 : cappedLen := len(p[n:])
301 1 : if toBoundary := int(c.shardingBlockSize - ((ofs + int64(n)) % c.shardingBlockSize)); cappedLen > toBoundary {
302 1 : cappedLen = toBoundary
303 1 : }
304 1 : numRead, err := shard.get(fileNum, p[n:n+cappedLen], ofs+int64(n))
305 1 : if err != nil {
306 0 : return n, err
307 0 : }
308 1 : n += numRead
309 1 : if numRead < cappedLen {
310 1 : // We only read a prefix from this shard.
311 1 : return n, nil
312 1 : }
313 1 : if n == len(p) {
314 1 : // We are done.
315 1 : return n, nil
316 1 : }
317 : // Data extent crosses shard boundary, continue with next shard.
318 1 : if !multiShard {
319 1 : c.metrics.multiShardReads.Add(1)
320 1 : multiShard = true
321 1 : }
322 : }
323 : }
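
// Worked example of the shard-boundary capping above (hypothetical values,
// shardingBlockSize = 1 MiB): a 64 KiB read at ofs = 1_040_384 has
// toBoundary = 1_048_576 - 1_040_384 = 8_192 bytes left in its sharding unit,
// so the first shard.get call is capped at 8 KiB and the remaining 56 KiB are
// requested from the shard that owns the next unit (a multi-shard read).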
324 :
325 : // set attempts to write the requested data to the cache. Both ofs & len(p) must
326 : // be multiples of the block size.
327 : //
328 : // If all of p is not written to the shard, set returns a non-nil error.
329 1 : func (c *Cache) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
330 1 : if invariants.Enabled {
331 0 : if c.bm.Remainder(ofs) != 0 || c.bm.Remainder(int64(len(p))) != 0 {
332 0 : panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
333 : }
334 : }
335 :
336 : // The data extent might cross shard boundaries, hence the loop. In the hot
337 : // path, max two iterations of this loop will be executed, since reads are sized
338 : // in units of sstable block size.
339 1 : n := 0
340 1 : for {
341 1 : shard := c.getShard(fileNum, ofs+int64(n))
342 1 : cappedLen := len(p[n:])
343 1 : if toBoundary := int(c.shardingBlockSize - ((ofs + int64(n)) % c.shardingBlockSize)); cappedLen > toBoundary {
344 1 : cappedLen = toBoundary
345 1 : }
346 1 : err := shard.set(fileNum, p[n:n+cappedLen], ofs+int64(n))
347 1 : if err != nil {
348 0 : return err
349 0 : }
350 : // set returns an error if cappedLen bytes aren't written to the shard.
351 1 : n += cappedLen
352 1 : if n == len(p) {
353 1 : // We are done.
354 1 : return nil
355 1 : }
356 : // Data extent crosses shard boundary, continue with next shard.
357 : }
358 : }
359 :
360 1 : func (c *Cache) getShard(fileNum base.DiskFileNum, ofs int64) *shard {
361 1 : const prime64 = 1099511628211
362 1 : hash := uint64(fileNum.FileNum())*prime64 + uint64(ofs/c.shardingBlockSize)
363 1 : // TODO(josh): Instance change ops are often run in production. Such an operation
364 1 : // updates len(c.shards); see openSharedCache. As a result, the behavior of this
365 1 : // function changes, and the cache empties out at restart time. We may want a better
366 1 : // story here eventually.
367 1 : return &c.shards[hash%uint64(len(c.shards))]
368 1 : }
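
// Note on the mapping above (a summary, not additional behavior): the hash
// increases by exactly one for each successive shardingBlockSize unit of the
// same file, so consecutive units of a file cycle through the shards modulo
// len(c.shards), spreading a large sequential read across every shard.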
369 :
370 : type shard struct {
371 : cache *Cache
372 : file vfs.File
373 : sizeInBlocks int64
374 : bm blockMath
375 : shardingBlockSize int64
376 : mu struct {
377 : sync.Mutex
378 : // TODO(josh): None of these datastructures are space-efficient.
379 : // Focusing on correctness to start.
380 : where whereMap
381 : blocks []cacheBlockState
382 : // Head of LRU list (doubly-linked circular).
383 : lruHead cacheBlockIndex
384 : // Head of free list (singly-linked chain).
385 : freeHead cacheBlockIndex
386 : }
387 : }
388 :
389 : type cacheBlockState struct {
390 : lock lockState
391 : logical logicalBlockID
392 :
393 : // next is the next block in the LRU or free list (or invalidBlockIndex if it
394 : // is the last block in the free list).
395 : next cacheBlockIndex
396 :
397 : // prev is the previous block in the LRU list. It is not used when the block
398 : // is in the free list.
399 : prev cacheBlockIndex
400 : }
401 :
402 : // whereMap maps a logical block in an SST to the index of the cache block
403 : // holding its contents (the "cache block index").
404 : type whereMap map[logicalBlockID]cacheBlockIndex
405 :
406 : type logicalBlockID struct {
407 : filenum base.DiskFileNum
408 : cacheBlockIdx cacheBlockIndex
409 : }
410 :
411 : type lockState int64
412 :
413 : const (
414 : unlocked lockState = 0
415 : // >0 lockState tracks the number of distinct readers of some cache block / logical block
416 : // which is in the secondary cache. It is used to ensure that a cache block is not evicted
417 : // and overwritten, while there are active readers.
418 : readLockTakenInc = 1
419 : // -1 lockState indicates that some cache block is currently being populated with data from
420 : // blob storage. It is used to ensure that a cache block is not read or evicted again, while
421 : // it is being populated.
422 : writeLockTaken = -1
423 : )
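
// Sketch of the lock-state lifecycle implied above (a summary, not additional
// API): a cache block starts unlocked (0). shard.set moves it to
// writeLockTaken (-1) while the block is being filled and dropWriteLock
// returns it to unlocked; each concurrent reader in shard.get adds
// readLockTakenInc and dropReadLock subtracts it, so any value > 0 means
// "readers active" and the eviction scan in shard.set skips the block.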
424 :
425 : func (s *shard) init(
426 : cache *Cache,
427 : fs vfs.FS,
428 : fsDir string,
429 : shardIdx int,
430 : sizeInBlocks int64,
431 : blockSize int,
432 : shardingBlockSize int64,
433 1 : ) error {
434 1 : *s = shard{
435 1 : cache: cache,
436 1 : sizeInBlocks: sizeInBlocks,
437 1 : }
438 1 : if blockSize < 1024 || shardingBlockSize%int64(blockSize) != 0 {
439 0 : return errors.Newf("invalid block size %d (must divide %d)", blockSize, shardingBlockSize)
440 0 : }
441 1 : s.bm = makeBlockMath(blockSize)
442 1 : s.shardingBlockSize = shardingBlockSize
443 1 : file, err := fs.OpenReadWrite(fs.PathJoin(fsDir, fmt.Sprintf("SHARED-CACHE-%03d", shardIdx)))
444 1 : if err != nil {
445 0 : return err
446 0 : }
447 : // TODO(radu): truncate file if necessary (especially important if we restart
448 : // with more shards).
449 1 : if err := file.Preallocate(0, int64(blockSize)*sizeInBlocks); err != nil {
450 0 : return err
451 0 : }
452 1 : s.file = file
453 1 :
454 1 : // TODO(josh): Right now, the secondary cache is not persistent. All existing
455 1 : // cache contents will be over-written, since all metadata is only stored in
456 1 : // memory.
457 1 : s.mu.where = make(whereMap)
458 1 : s.mu.blocks = make([]cacheBlockState, sizeInBlocks)
459 1 : s.mu.lruHead = invalidBlockIndex
460 1 : s.mu.freeHead = invalidBlockIndex
461 1 : for i := range s.mu.blocks {
462 1 : s.freePush(cacheBlockIndex(i))
463 1 : }
464 :
465 1 : return nil
466 : }
467 :
468 1 : func (s *shard) close() error {
469 1 : defer func() {
470 1 : s.file = nil
471 1 : }()
472 1 : return s.file.Close()
473 : }
474 :
475 : // freePush pushes a block to the front of the free list.
476 1 : func (s *shard) freePush(index cacheBlockIndex) {
477 1 : s.mu.blocks[index].next = s.mu.freeHead
478 1 : s.mu.freeHead = index
479 1 : }
480 :
481 : // freePop removes the block from the front of the free list. Must not be called
482 : // if the list is empty (i.e. freeHead = invalidBlockIndex).
483 1 : func (s *shard) freePop() cacheBlockIndex {
484 1 : index := s.mu.freeHead
485 1 : s.mu.freeHead = s.mu.blocks[index].next
486 1 : return index
487 1 : }
488 :
489 : // lruInsertFront inserts a block at the front of the LRU list.
490 1 : func (s *shard) lruInsertFront(index cacheBlockIndex) {
491 1 : b := &s.mu.blocks[index]
492 1 : if s.mu.lruHead == invalidBlockIndex {
493 1 : b.next = index
494 1 : b.prev = index
495 1 : } else {
496 1 : b.next = s.mu.lruHead
497 1 : h := &s.mu.blocks[s.mu.lruHead]
498 1 : b.prev = h.prev
499 1 : s.mu.blocks[h.prev].next = index
500 1 : h.prev = index
501 1 : }
502 1 : s.mu.lruHead = index
503 : }
504 :
505 1 : func (s *shard) lruNext(index cacheBlockIndex) cacheBlockIndex {
506 1 : return s.mu.blocks[index].next
507 1 : }
508 :
509 1 : func (s *shard) lruPrev(index cacheBlockIndex) cacheBlockIndex {
510 1 : return s.mu.blocks[index].prev
511 1 : }
512 :
513 : // lruUnlink removes a block from the LRU list.
514 1 : func (s *shard) lruUnlink(index cacheBlockIndex) {
515 1 : b := &s.mu.blocks[index]
516 1 : if b.next == index {
517 1 : s.mu.lruHead = invalidBlockIndex
518 1 : } else {
519 1 : s.mu.blocks[b.prev].next = b.next
520 1 : s.mu.blocks[b.next].prev = b.prev
521 1 : if s.mu.lruHead == index {
522 1 : s.mu.lruHead = b.next
523 1 : }
524 : }
525 1 : b.next, b.prev = invalidBlockIndex, invalidBlockIndex
526 : }
527 :
528 : // get attempts to read the requested data from the shard. The data must not
529 : // cross a shard boundary.
530 : //
531 : // If all data is available, returns n = len(p).
532 : //
533 : // If data is partially available, a prefix of the data is read; returns n < len(p)
534 : // and no error. If no prefix is available, returns n = 0 and no error.
535 : //
536 : // TODO(josh): Today, if there are two cache blocks needed to satisfy a read, and the
537 : // first block is not in the cache and the second one is, we will read both from
538 : // blob storage. We should fix this. This is not an unlikely scenario if we are doing
539 : // a reverse scan, since those iterate over sstable blocks in reverse order and due to
540 : // cache block aligned reads will have read the suffix of the sstable block that will
541 : // be needed next.
542 1 : func (s *shard) get(fileNum base.DiskFileNum, p []byte, ofs int64) (n int, _ error) {
543 1 : if invariants.Enabled {
544 0 : if ofs/s.shardingBlockSize != (ofs+int64(len(p))-1)/s.shardingBlockSize {
545 0 : panic(fmt.Sprintf("get crosses shard boundary: %v %v", ofs, len(p)))
546 : }
547 0 : s.assertShardStateIsConsistent()
548 : }
549 :
550 : // The data extent might cross cache block boundaries, hence the loop. In the hot
551 : // path, max two iterations of this loop will be executed, since reads are sized
552 : // in units of sstable block size.
553 1 : var multiBlock bool
554 1 : for {
555 1 : k := logicalBlockID{
556 1 : filenum: fileNum,
557 1 : cacheBlockIdx: s.bm.Block(ofs + int64(n)),
558 1 : }
559 1 : s.mu.Lock()
560 1 : cacheBlockIdx, ok := s.mu.where[k]
561 1 : // TODO(josh): Multiple reads within the same few milliseconds (anything that is smaller
562 1 : // than blob storage read latency) that miss on the same logical block ID will not necessarily
563 1 : // be rare. We may want to do only one read, with the later readers blocking on the first read
564 1 : // completing. This could be implemented either here or in the primary block cache. See
565 1 : // https://github.com/cockroachdb/pebble/pull/2586 for additional discussion.
566 1 : if !ok {
567 1 : s.mu.Unlock()
568 1 : return n, nil
569 1 : }
570 1 : if s.mu.blocks[cacheBlockIdx].lock == writeLockTaken {
571 1 : // In practice, if we have two reads of the same SST block in close succession, we
572 1 : // would expect the second to hit in the in-memory block cache. So it's not worth
573 1 : // optimizing this case here.
574 1 : s.mu.Unlock()
575 1 : return n, nil
576 1 : }
577 1 : s.mu.blocks[cacheBlockIdx].lock += readLockTakenInc
578 1 : // Move to front of the LRU list.
579 1 : s.lruUnlink(cacheBlockIdx)
580 1 : s.lruInsertFront(cacheBlockIdx)
581 1 : s.mu.Unlock()
582 1 :
583 1 : readAt := s.bm.BlockOffset(cacheBlockIdx)
584 1 : readSize := s.bm.BlockSize()
585 1 : if n == 0 { // if first read
586 1 : rem := s.bm.Remainder(ofs)
587 1 : readAt += rem
588 1 : readSize -= int(rem)
589 1 : }
590 :
591 1 : if len(p[n:]) <= readSize {
592 1 : start := time.Now()
593 1 : numRead, err := s.file.ReadAt(p[n:], readAt)
594 1 : s.cache.metrics.diskReadLatency.Observe(float64(time.Since(start)))
595 1 : s.dropReadLock(cacheBlockIdx)
596 1 : return n + numRead, err
597 1 : }
598 1 : start := time.Now()
599 1 : numRead, err := s.file.ReadAt(p[n:n+readSize], readAt)
600 1 : s.cache.metrics.diskReadLatency.Observe(float64(time.Since(start)))
601 1 : s.dropReadLock(cacheBlockIdx)
602 1 : if err != nil {
603 0 : return 0, err
604 0 : }
605 :
606 : // Note that numRead == readSize, since we checked for an error above.
607 1 : n += numRead
608 1 :
609 1 : if !multiBlock {
610 1 : s.cache.metrics.multiBlockReads.Add(1)
611 1 : multiBlock = true
612 1 : }
613 : }
614 : }
615 :
616 : // set attempts to write the requested data to the shard. The data must not
617 : // cross a shard boundary, and both ofs & len(p) must be multiples of the
618 : // block size.
619 : //
620 : // If all of p is not written to the shard, set returns a non-nil error.
621 1 : func (s *shard) set(fileNum base.DiskFileNum, p []byte, ofs int64) error {
622 1 : if invariants.Enabled {
623 0 : if ofs/s.shardingBlockSize != (ofs+int64(len(p))-1)/s.shardingBlockSize {
624 0 : panic(fmt.Sprintf("set crosses shard boundary: %v %v", ofs, len(p)))
625 : }
626 0 : if s.bm.Remainder(ofs) != 0 || s.bm.Remainder(int64(len(p))) != 0 {
627 0 : panic(fmt.Sprintf("set with ofs & len not multiples of block size: %v %v", ofs, len(p)))
628 : }
629 0 : s.assertShardStateIsConsistent()
630 : }
631 :
632 : // The data extent might cross cache block boundaries, hence the loop. In the hot
633 : // path, max two iterations of this loop will be executed, since reads are sized
634 : // in units of sstable block size.
635 1 : n := 0
636 1 : for {
637 1 : if n == len(p) {
638 1 : return nil
639 1 : }
640 1 : if invariants.Enabled {
641 0 : if n > len(p) {
642 0 : panic(fmt.Sprintf("set with n greater than len(p): %v %v", n, len(p)))
643 : }
644 : }
645 :
646 : // If the logical block is already in the cache, we should skip doing a set.
647 1 : k := logicalBlockID{
648 1 : filenum: fileNum,
649 1 : cacheBlockIdx: s.bm.Block(ofs + int64(n)),
650 1 : }
651 1 : s.mu.Lock()
652 1 : if _, ok := s.mu.where[k]; ok {
653 1 : s.mu.Unlock()
654 1 : n += s.bm.BlockSize()
655 1 : continue
656 : }
657 :
658 1 : var cacheBlockIdx cacheBlockIndex
659 1 : if s.mu.freeHead == invalidBlockIndex {
660 1 : if invariants.Enabled && s.mu.lruHead == invalidBlockIndex {
661 0 : panic("both LRU and free lists empty")
662 : }
663 :
664 : // Find the last element in the LRU list which is not locked.
665 1 : for idx := s.lruPrev(s.mu.lruHead); ; idx = s.lruPrev(idx) {
666 1 : if lock := s.mu.blocks[idx].lock; lock == unlocked {
667 1 : cacheBlockIdx = idx
668 1 : break
669 : }
670 0 : if idx == s.mu.lruHead {
671 0 : // No unlocked block to evict.
672 0 : //
673 0 : // TODO(josh): We may want to block until a block frees up, instead of returning
674 0 : // an error here. But I think we can do that later on, e.g. after running some production
675 0 : // experiments.
676 0 : s.mu.Unlock()
677 0 : return errors.New("no block to evict so skipping write to cache")
678 0 : }
679 : }
680 1 : s.cache.metrics.evictions.Add(1)
681 1 : s.lruUnlink(cacheBlockIdx)
682 1 : delete(s.mu.where, s.mu.blocks[cacheBlockIdx].logical)
683 1 : } else {
684 1 : s.cache.metrics.count.Add(1)
685 1 : cacheBlockIdx = s.freePop()
686 1 : }
687 :
688 1 : s.lruInsertFront(cacheBlockIdx)
689 1 : s.mu.where[k] = cacheBlockIdx
690 1 : s.mu.blocks[cacheBlockIdx].logical = k
691 1 : s.mu.blocks[cacheBlockIdx].lock = writeLockTaken
692 1 : s.mu.Unlock()
693 1 :
694 1 : writeAt := s.bm.BlockOffset(cacheBlockIdx)
695 1 :
696 1 : writeSize := s.bm.BlockSize()
697 1 : if len(p[n:]) <= writeSize {
698 1 : writeSize = len(p[n:])
699 1 : }
700 :
701 1 : start := time.Now()
702 1 : _, err := s.file.WriteAt(p[n:n+writeSize], writeAt)
703 1 : s.cache.metrics.diskWriteLatency.Observe(float64(time.Since(start)))
704 1 : if err != nil {
705 0 : // Free the block.
706 0 : s.mu.Lock()
707 0 : defer s.mu.Unlock()
708 0 :
709 0 : delete(s.mu.where, k)
710 0 : s.lruUnlink(cacheBlockIdx)
711 0 : s.freePush(cacheBlockIdx)
712 0 : return err
713 0 : }
714 1 : s.dropWriteLock(cacheBlockIdx)
715 1 : n += writeSize
716 : }
717 : }
718 :
719 : // Doesn't inline currently. This might be okay, but something to keep in mind.
720 1 : func (s *shard) dropReadLock(cacheBlockInd cacheBlockIndex) {
721 1 : s.mu.Lock()
722 1 : s.mu.blocks[cacheBlockInd].lock -= readLockTakenInc
723 1 : if invariants.Enabled && s.mu.blocks[cacheBlockInd].lock < 0 {
724 0 : panic(fmt.Sprintf("unexpected lock state %v in dropReadLock", s.mu.blocks[cacheBlockInd].lock))
725 : }
726 1 : s.mu.Unlock()
727 : }
728 :
729 : // Doesn't inline currently. This might be okay, but something to keep in mind.
730 1 : func (s *shard) dropWriteLock(cacheBlockInd cacheBlockIndex) {
731 1 : s.mu.Lock()
732 1 : if invariants.Enabled && s.mu.blocks[cacheBlockInd].lock != writeLockTaken {
733 0 : panic(fmt.Sprintf("unexpected lock state %v in dropWriteLock", s.mu.blocks[cacheBlockInd].lock))
734 : }
735 1 : s.mu.blocks[cacheBlockInd].lock = unlocked
736 1 : s.mu.Unlock()
737 : }
738 :
739 0 : func (s *shard) assertShardStateIsConsistent() {
740 0 : s.mu.Lock()
741 0 : defer s.mu.Unlock()
742 0 :
743 0 : lruLen := 0
744 0 : if s.mu.lruHead != invalidBlockIndex {
745 0 : for b := s.mu.lruHead; ; {
746 0 : lruLen++
747 0 : if idx, ok := s.mu.where[s.mu.blocks[b].logical]; !ok || idx != b {
748 0 : panic("block in LRU list with no entry in where map")
749 : }
750 0 : b = s.lruNext(b)
751 0 : if b == s.mu.lruHead {
752 0 : break
753 : }
754 : }
755 : }
756 0 : if lruLen != len(s.mu.where) {
757 0 : panic(fmt.Sprintf("lru list len is %d but where map has %d entries", lruLen, len(s.mu.where)))
758 : }
759 0 : freeLen := 0
760 0 : for n := s.mu.freeHead; n != invalidBlockIndex; n = s.mu.blocks[n].next {
761 0 : freeLen++
762 0 : }
763 :
764 0 : if lruLen+freeLen != int(s.sizeInBlocks) {
765 0 : panic(fmt.Sprintf("%d lru blocks and %d free blocks don't add up to %d", lruLen, freeLen, s.sizeInBlocks))
766 : }
767 0 : for i := range s.mu.blocks {
768 0 : if state := s.mu.blocks[i].lock; state < writeLockTaken {
769 0 : panic(fmt.Sprintf("lock state %v is not allowed", state))
770 : }
771 : }
772 : }
773 :
774 : // cacheBlockIndex is the index of a blockSize-aligned cache block.
775 : type cacheBlockIndex int64
776 :
777 : // invalidBlockIndex is used for the head of a list when the list is empty.
778 : const invalidBlockIndex cacheBlockIndex = -1
779 :
780 : // blockMath is a helper type for performing conversions between offsets and
781 : // block indexes.
782 : type blockMath struct {
783 : blockSizeBits int8
784 : }
785 :
786 1 : func makeBlockMath(blockSize int) blockMath {
787 1 : bm := blockMath{
788 1 : blockSizeBits: int8(bits.Len64(uint64(blockSize)) - 1),
789 1 : }
790 1 : if blockSize != (1 << bm.blockSizeBits) {
791 0 : panic(fmt.Sprintf("blockSize %d is not a power of 2", blockSize))
792 : }
793 1 : return bm
794 : }
795 :
796 1 : func (bm blockMath) mask() int64 {
797 1 : return (1 << bm.blockSizeBits) - 1
798 1 : }
799 :
800 : // BlockSize returns the block size.
801 1 : func (bm blockMath) BlockSize() int {
802 1 : return 1 << bm.blockSizeBits
803 1 : }
804 :
805 : // Block returns the block index containing the given offset.
806 1 : func (bm blockMath) Block(offset int64) cacheBlockIndex {
807 1 : return cacheBlockIndex(offset >> bm.blockSizeBits)
808 1 : }
809 :
810 : // Remainder returns the offset relative to the start of the cache block.
811 1 : func (bm blockMath) Remainder(offset int64) int64 {
812 1 : return offset & bm.mask()
813 1 : }
814 :
815 : // BlockOffset returns the object offset where the given block starts.
816 1 : func (bm blockMath) BlockOffset(block cacheBlockIndex) int64 {
817 1 : return int64(block) << bm.blockSizeBits
818 1 : }
819 :
820 : // RoundUp rounds up the given value to the closest multiple of block size.
821 1 : func (bm blockMath) RoundUp(x int64) int64 {
822 1 : return (x + bm.mask()) & ^(bm.mask())
823 1 : }
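
// Worked example of the helpers above, assuming a hypothetical 32 KiB block
// size (blockSizeBits = 15):
//
//	bm := makeBlockMath(32 << 10)
//	bm.Block(70_000)     // == 2
//	bm.BlockOffset(2)    // == 65_536
//	bm.Remainder(70_000) // == 4_464
//	bm.RoundUp(14_464)   // == 32_768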
824 :
825 : type writeWorkers struct {
826 : doneCh chan struct{}
827 : doneWaitGroup sync.WaitGroup
828 :
829 : numWorkers int
830 : tasksCh chan writeTask
831 : }
832 :
833 : type writeTask struct {
834 : fileNum base.DiskFileNum
835 : p []byte
836 : offset int64
837 : }
838 :
839 : // Start starts the worker goroutines.
840 1 : func (w *writeWorkers) Start(c *Cache, numWorkers int) {
841 1 : doneCh := make(chan struct{})
842 1 : tasksCh := make(chan writeTask, numWorkers*writeTasksPerWorker)
843 1 :
844 1 : w.numWorkers = numWorkers
845 1 : w.doneCh = doneCh
846 1 : w.tasksCh = tasksCh
847 1 : w.doneWaitGroup.Add(numWorkers)
848 1 : for i := 0; i < numWorkers; i++ {
849 1 : go func() {
850 1 : defer w.doneWaitGroup.Done()
851 1 : for {
852 1 : select {
853 1 : case <-doneCh:
854 1 : return
855 1 : case task, ok := <-tasksCh:
856 1 : if !ok {
857 1 : // The tasks channel was closed; this is used in testing code to
858 1 : // ensure all writes are completed.
859 1 : return
860 1 : }
861 : // TODO(radu): set() can perform multiple writes; perhaps each one
862 : // should be its own task.
863 1 : start := time.Now()
864 1 : err := c.set(task.fileNum, task.p, task.offset)
865 1 : c.metrics.putLatency.Observe(float64(time.Since(start)))
866 1 : if err != nil {
867 0 : c.metrics.writeBackFailures.Add(1)
868 0 : // TODO(radu): throttle logs.
869 0 : c.logger.Infof("writing back to cache after miss failed: %v", err)
870 0 : }
871 : }
872 : }
873 : }()
874 : }
875 : }
876 :
877 : // Stop waits for any in-progress writes to complete and stops the worker
878 : // goroutines. Any queued writes not yet started are
879 : // discarded.
880 1 : func (w *writeWorkers) Stop() {
881 1 : close(w.doneCh)
882 1 : w.doneCh = nil
883 1 : w.tasksCh = nil
884 1 : w.doneWaitGroup.Wait()
885 1 : }
886 :
887 : // QueueWrite adds a write task to the queue. Can block if the queue is full.
888 1 : func (w *writeWorkers) QueueWrite(fileNum base.DiskFileNum, p []byte, offset int64) {
889 1 : w.tasksCh <- writeTask{
890 1 : fileNum: fileNum,
891 1 : p: p,
892 1 : offset: offset,
893 1 : }
894 1 : }
|