LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - remote_readable.go (source / functions) Hit Total Coverage
Test: 2024-03-11 08:16Z 8df4320c - tests only.lcov Lines: 79 93 84.9 %
Date: 2024-03-11 08:16:47 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "context"
       9             :         "io"
      10             : 
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/objstorage"
      13             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
      14             :         "github.com/cockroachdb/pebble/objstorage/remote"
      15             : )
      16             : 
      17             : // NewRemoteReadable creates an objstorage.Readable out of a remote.ObjectReader.
      18           1 : func NewRemoteReadable(objReader remote.ObjectReader, size int64) objstorage.Readable {
      19           1 :         return &remoteReadable{
      20           1 :                 objReader: objReader,
      21           1 :                 size:      size,
      22           1 :         }
      23           1 : }
      24             : 
      25             : const remoteMaxReadaheadSize = 1024 * 1024 /* 1MB */
      26             : 
      27             : // remoteReadable is a very simple implementation of Readable on top of the
      28             : // ReadCloser returned by remote.Storage.CreateObject.
      29             : type remoteReadable struct {
      30             :         objReader remote.ObjectReader
      31             :         size      int64
      32             :         fileNum   base.DiskFileNum
      33             :         cache     *sharedcache.Cache
      34             : }
      35             : 
      36             : var _ objstorage.Readable = (*remoteReadable)(nil)
      37             : 
      38             : func (p *provider) newRemoteReadable(
      39             :         objReader remote.ObjectReader, size int64, fileNum base.DiskFileNum,
      40           1 : ) *remoteReadable {
      41           1 :         return &remoteReadable{
      42           1 :                 objReader: objReader,
      43           1 :                 size:      size,
      44           1 :                 fileNum:   fileNum,
      45           1 :                 cache:     p.remote.cache,
      46           1 :         }
      47           1 : }
      48             : 
      49             : // ReadAt is part of the objstorage.Readable interface.
      50           1 : func (r *remoteReadable) ReadAt(ctx context.Context, p []byte, offset int64) error {
      51           1 :         return r.readInternal(ctx, p, offset, false /* forCompaction */)
      52           1 : }
      53             : 
      54             : // readInternal performs a read for the object, using the cache when
      55             : // appropriate.
      56             : func (r *remoteReadable) readInternal(
      57             :         ctx context.Context, p []byte, offset int64, forCompaction bool,
      58           1 : ) error {
      59           1 :         if r.cache != nil {
      60           0 :                 flags := sharedcache.ReadFlags{
      61           0 :                         // Don't add data to the cache if this read is for a compaction.
      62           0 :                         ReadOnly: forCompaction,
      63           0 :                 }
      64           0 :                 return r.cache.ReadAt(ctx, r.fileNum, p, offset, r.objReader, r.size, flags)
      65           0 :         }
      66           1 :         return r.objReader.ReadAt(ctx, p, offset)
      67             : }
      68             : 
      69           1 : func (r *remoteReadable) Close() error {
      70           1 :         defer func() { r.objReader = nil }()
      71           1 :         return r.objReader.Close()
      72             : }
      73             : 
      74           1 : func (r *remoteReadable) Size() int64 {
      75           1 :         return r.size
      76           1 : }
      77             : 
      78           1 : func (r *remoteReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
      79           1 :         // TODO(radu): use a pool.
      80           1 :         rh := &remoteReadHandle{readable: r}
      81           1 :         rh.readahead.state = makeReadaheadState(remoteMaxReadaheadSize)
      82           1 :         return rh
      83           1 : }
      84             : 
      85             : type remoteReadHandle struct {
      86             :         readable  *remoteReadable
      87             :         readahead struct {
      88             :                 state  readaheadState
      89             :                 data   []byte
      90             :                 offset int64
      91             :         }
      92             :         forCompaction bool
      93             : }
      94             : 
      95             : var _ objstorage.ReadHandle = (*remoteReadHandle)(nil)
      96             : 
      97             : // ReadAt is part of the objstorage.ReadHandle interface.
      98           1 : func (r *remoteReadHandle) ReadAt(ctx context.Context, p []byte, offset int64) error {
      99           1 :         readaheadSize := r.maybeReadahead(offset, len(p))
     100           1 : 
     101           1 :         // Check if we already have the data from a previous read-ahead.
     102           1 :         if rhSize := int64(len(r.readahead.data)); rhSize > 0 {
     103           1 :                 if r.readahead.offset <= offset && r.readahead.offset+rhSize > offset {
     104           1 :                         n := copy(p, r.readahead.data[offset-r.readahead.offset:])
     105           1 :                         if n == len(p) {
     106           1 :                                 // All data was available.
     107           1 :                                 return nil
     108           1 :                         }
     109             :                         // Use the data that we had and do a shorter read.
     110           1 :                         offset += int64(n)
     111           1 :                         p = p[n:]
     112           1 :                         readaheadSize -= n
     113             :                 }
     114             :         }
     115             : 
     116           1 :         if readaheadSize > len(p) {
     117           1 :                 // Don't try to read past EOF.
     118           1 :                 if offset+int64(readaheadSize) > r.readable.size {
     119           1 :                         readaheadSize = int(r.readable.size - offset)
     120           1 :                         if readaheadSize <= 0 {
     121           0 :                                 // This shouldn't happen in practice (Pebble should never try to read
     122           0 :                                 // past EOF).
     123           0 :                                 return io.EOF
     124           0 :                         }
     125             :                 }
     126           1 :                 r.readahead.offset = offset
     127           1 :                 // TODO(radu): we need to somehow account for this memory.
     128           1 :                 if cap(r.readahead.data) >= readaheadSize {
     129           1 :                         r.readahead.data = r.readahead.data[:readaheadSize]
     130           1 :                 } else {
     131           1 :                         r.readahead.data = make([]byte, readaheadSize)
     132           1 :                 }
     133             : 
     134           1 :                 if err := r.readable.readInternal(ctx, r.readahead.data, offset, r.forCompaction); err != nil {
     135           0 :                         // Make sure we don't treat the data as valid next time.
     136           0 :                         r.readahead.data = r.readahead.data[:0]
     137           0 :                         return err
     138           0 :                 }
     139           1 :                 copy(p, r.readahead.data)
     140           1 :                 return nil
     141             :         }
     142             : 
     143           1 :         return r.readable.readInternal(ctx, p, offset, r.forCompaction)
     144             : }
     145             : 
     146           1 : func (r *remoteReadHandle) maybeReadahead(offset int64, len int) int {
     147           1 :         if r.forCompaction {
     148           1 :                 return remoteMaxReadaheadSize
     149           1 :         }
     150           1 :         return int(r.readahead.state.maybeReadahead(offset, int64(len)))
     151             : }
     152             : 
     153             : // Close is part of the objstorage.ReadHandle interface.
     154           1 : func (r *remoteReadHandle) Close() error {
     155           1 :         r.readable = nil
     156           1 :         r.readahead.data = nil
     157           1 :         return nil
     158           1 : }
     159             : 
     160             : // SetupForCompaction is part of the objstorage.ReadHandle interface.
     161           1 : func (r *remoteReadHandle) SetupForCompaction() {
     162           1 :         r.forCompaction = true
     163           1 : }
     164             : 
     165             : // RecordCacheHit is part of the objstorage.ReadHandle interface.
     166           1 : func (r *remoteReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
     167           1 :         if !r.forCompaction {
     168           1 :                 r.readahead.state.recordCacheHit(offset, size)
     169           1 :         }
     170             : }

Generated by: LCOV version 1.14