LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - remote_readable.go (source / functions) Hit Total Coverage
Test: 2023-09-28 08:18Z 725ebe29 - meta test only.lcov Lines: 75 87 86.2 %
Date: 2023-09-28 08:19:59 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "context"
       9             :         "io"
      10             : 
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/objstorage"
      13             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
      14             :         "github.com/cockroachdb/pebble/objstorage/remote"
      15             : )
      16             : 
      17             : const remoteMaxReadaheadSize = 1024 * 1024 /* 1MB */
      18             : 
      19             : // remoteReadable is a very simple implementation of Readable on top of the
      20             : // ReadCloser returned by remote.Storage.CreateObject.
      21             : type remoteReadable struct {
      22             :         objReader remote.ObjectReader
      23             :         size      int64
      24             :         fileNum   base.DiskFileNum
      25             :         provider  *provider
      26             : }
      27             : 
      28             : var _ objstorage.Readable = (*remoteReadable)(nil)
      29             : 
      30             : func (p *provider) newRemoteReadable(
      31             :         objReader remote.ObjectReader, size int64, fileNum base.DiskFileNum,
      32           1 : ) *remoteReadable {
      33           1 :         return &remoteReadable{
      34           1 :                 objReader: objReader,
      35           1 :                 size:      size,
      36           1 :                 fileNum:   fileNum,
      37           1 :                 provider:  p,
      38           1 :         }
      39           1 : }
      40             : 
      41             : // ReadAt is part of the objstorage.Readable interface.
      42           1 : func (r *remoteReadable) ReadAt(ctx context.Context, p []byte, offset int64) error {
      43           1 :         return r.readInternal(ctx, p, offset, false /* forCompaction */)
      44           1 : }
      45             : 
      46             : // readInternal performs a read for the object, using the cache when
      47             : // appropriate.
      48             : func (r *remoteReadable) readInternal(
      49             :         ctx context.Context, p []byte, offset int64, forCompaction bool,
      50           1 : ) error {
      51           1 :         if cache := r.provider.remote.cache; cache != nil {
      52           1 :                 flags := sharedcache.ReadFlags{
      53           1 :                         // Don't add data to the cache if this read is for a compaction.
      54           1 :                         ReadOnly: forCompaction,
      55           1 :                 }
      56           1 :                 return r.provider.remote.cache.ReadAt(ctx, r.fileNum, p, offset, r.objReader, r.size, flags)
      57           1 :         }
      58           1 :         return r.objReader.ReadAt(ctx, p, offset)
      59             : }
      60             : 
      61           1 : func (r *remoteReadable) Close() error {
      62           1 :         defer func() { r.objReader = nil }()
      63           1 :         return r.objReader.Close()
      64             : }
      65             : 
      66           1 : func (r *remoteReadable) Size() int64 {
      67           1 :         return r.size
      68           1 : }
      69             : 
      70           1 : func (r *remoteReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
      71           1 :         // TODO(radu): use a pool.
      72           1 :         rh := &remoteReadHandle{readable: r}
      73           1 :         rh.readahead.state = makeReadaheadState(remoteMaxReadaheadSize)
      74           1 :         return rh
      75           1 : }
      76             : 
      77             : type remoteReadHandle struct {
      78             :         readable  *remoteReadable
      79             :         readahead struct {
      80             :                 state  readaheadState
      81             :                 data   []byte
      82             :                 offset int64
      83             :         }
      84             :         forCompaction bool
      85             : }
      86             : 
      87             : var _ objstorage.ReadHandle = (*remoteReadHandle)(nil)
      88             : 
      89             : // ReadAt is part of the objstorage.ReadHandle interface.
      90           1 : func (r *remoteReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) error {
      91           1 :         readaheadSize := r.maybeReadahead(offset, len(p))
      92           1 : 
      93           1 :         // Check if we already have the data from a previous read-ahead.
      94           1 :         if rhSize := int64(len(r.readahead.data)); rhSize > 0 {
      95           1 :                 if r.readahead.offset <= offset && r.readahead.offset+rhSize > offset {
      96           1 :                         n := copy(p, r.readahead.data[offset-r.readahead.offset:])
      97           1 :                         if n == len(p) {
      98           1 :                                 // All data was available.
      99           1 :                                 return nil
     100           1 :                         }
     101             :                         // Use the data that we had and do a shorter read.
     102           0 :                         offset += int64(n)
     103           0 :                         p = p[n:]
     104           0 :                         readaheadSize -= n
     105             :                 }
     106             :         }
     107             : 
     108           1 :         if readaheadSize > len(p) {
     109           1 :                 // Don't try to read past EOF.
     110           1 :                 if offset+int64(readaheadSize) > r.readable.size {
     111           1 :                         readaheadSize = int(r.readable.size - offset)
     112           1 :                         if readaheadSize <= 0 {
     113           0 :                                 // This shouldn't happen in practice (Pebble should never try to read
     114           0 :                                 // past EOF).
     115           0 :                                 return io.EOF
     116           0 :                         }
     117             :                 }
     118           1 :                 r.readahead.offset = offset
     119           1 :                 // TODO(radu): we need to somehow account for this memory.
     120           1 :                 if cap(r.readahead.data) >= readaheadSize {
     121           0 :                         r.readahead.data = r.readahead.data[:readaheadSize]
     122           1 :                 } else {
     123           1 :                         r.readahead.data = make([]byte, readaheadSize)
     124           1 :                 }
     125             : 
     126           1 :                 if err := r.readable.readInternal(ctx, r.readahead.data, offset, r.forCompaction); err != nil {
     127           0 :                         // Make sure we don't treat the data as valid next time.
     128           0 :                         r.readahead.data = r.readahead.data[:0]
     129           0 :                         return err
     130           0 :                 }
     131           1 :                 copy(p, r.readahead.data)
     132           1 :                 return nil
     133             :         }
     134             : 
     135           1 :         return r.readable.readInternal(ctx, p, offset, r.forCompaction)
     136             : }
     137             : 
     138           1 : func (r *remoteReadHandle) maybeReadahead(offset int64, len int) int {
     139           1 :         if r.forCompaction {
     140           1 :                 return remoteMaxReadaheadSize
     141           1 :         }
     142           1 :         return int(r.readahead.state.maybeReadahead(offset, int64(len)))
     143             : }
     144             : 
     145             : // Close is part of the objstorage.ReadHandle interface.
     146           1 : func (r *remoteReadHandle) Close() error {
     147           1 :         r.readable = nil
     148           1 :         r.readahead.data = nil
     149           1 :         return nil
     150           1 : }
     151             : 
     152             : // SetupForCompaction is part of the objstorage.ReadHandle interface.
     153           1 : func (r *remoteReadHandle) SetupForCompaction() {
     154           1 :         r.forCompaction = true
     155           1 : }
     156             : 
     157             : // RecordCacheHit is part of the objstorage.ReadHandle interface.
     158           1 : func (r *remoteReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
     159           1 :         if !r.forCompaction {
     160           1 :                 r.readahead.state.recordCacheHit(offset, size)
     161           1 :         }
     162             : }

Generated by: LCOV version 1.14