LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - vfs_readable.go (source / functions) Hit Total Coverage
Test: 2024-07-09 08:16Z 7920d968 - tests + meta.lcov Lines: 104 120 86.7 %
Date: 2024-07-09 08:17:33 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "os"
      11             :         "sync"
      12             : 
      13             :         "github.com/cockroachdb/pebble/internal/invariants"
      14             :         "github.com/cockroachdb/pebble/objstorage"
      15             :         "github.com/cockroachdb/pebble/vfs"
      16             : )
      17             : 
      18             : const fileMaxReadaheadSize = 256 * 1024 /* 256KB */
      19             : 
      20             : // fileReadable implements objstorage.Readable on top of a vfs.File.
      21             : //
      22             : // The implementation might use Prealloc and might reopen the file with
      23             : // SequentialReadsOption.
      24             : type fileReadable struct {
      25             :         file vfs.File
      26             :         size int64
      27             : 
      28             :         readaheadConfig *ReadaheadConfig
      29             : 
      30             :         // The following fields are used to possibly open the file again using the
      31             :         // sequential reads option (see vfsReadHandle).
      32             :         filename string
      33             :         fs       vfs.FS
      34             : }
      35             : 
      36             : var _ objstorage.Readable = (*fileReadable)(nil)
      37             : 
      38             : func newFileReadable(
      39             :         file vfs.File, fs vfs.FS, readaheadConfig *ReadaheadConfig, filename string,
      40           2 : ) (*fileReadable, error) {
      41           2 :         info, err := file.Stat()
      42           2 :         if err != nil {
      43           1 :                 return nil, err
      44           1 :         }
      45           2 :         r := &fileReadable{
      46           2 :                 file:            file,
      47           2 :                 size:            info.Size(),
      48           2 :                 filename:        filename,
      49           2 :                 fs:              fs,
      50           2 :                 readaheadConfig: readaheadConfig,
      51           2 :         }
      52           2 :         invariants.SetFinalizer(r, func(obj interface{}) {
      53           2 :                 if obj.(*fileReadable).file != nil {
      54           0 :                         fmt.Fprintf(os.Stderr, "Readable was not closed")
      55           0 :                         os.Exit(1)
      56           0 :                 }
      57             :         })
      58           2 :         return r, nil
      59             : }
      60             : 
      61             : // ReadAt is part of the objstorage.Readable interface.
      62           2 : func (r *fileReadable) ReadAt(_ context.Context, p []byte, off int64) error {
      63           2 :         n, err := r.file.ReadAt(p, off)
      64           2 :         if invariants.Enabled && err == nil && n != len(p) {
      65           0 :                 panic("short read")
      66             :         }
      67           2 :         return err
      68             : }
      69             : 
      70             : // Close is part of the objstorage.Readable interface.
      71           2 : func (r *fileReadable) Close() error {
      72           2 :         defer func() { r.file = nil }()
      73           2 :         return r.file.Close()
      74             : }
      75             : 
      76             : // Size is part of the objstorage.Readable interface.
      77           2 : func (r *fileReadable) Size() int64 {
      78           2 :         return r.size
      79           2 : }
      80             : 
      81             : // NewReadHandle is part of the objstorage.Readable interface.
      82             : func (r *fileReadable) NewReadHandle(
      83             :         ctx context.Context, readBeforeSize objstorage.ReadBeforeSize,
      84           2 : ) objstorage.ReadHandle {
      85           2 :         rh := readHandlePool.Get().(*vfsReadHandle)
      86           2 :         rh.init(r)
      87           2 :         return rh
      88           2 : }
      89             : 
      90             : type vfsReadHandle struct {
      91             :         r             *fileReadable
      92             :         rs            readaheadState
      93             :         readaheadMode ReadaheadMode
      94             : 
      95             :         // sequentialFile holds a file descriptor to the same underlying File,
      96             :         // except with fadvise(FADV_SEQUENTIAL) called on it to take advantage of
      97             :         // OS-level readahead. Once this is non-nil, the other variables in
      98             :         // readaheadState don't matter much as we defer to OS-level readahead.
      99             :         sequentialFile vfs.File
     100             : }
     101             : 
     102             : var _ objstorage.ReadHandle = (*vfsReadHandle)(nil)
     103             : 
     104             : var readHandlePool = sync.Pool{
     105           2 :         New: func() interface{} {
     106           2 :                 i := &vfsReadHandle{}
     107           2 :                 // Note: this is a no-op if invariants are disabled or race is enabled.
     108           2 :                 invariants.SetFinalizer(i, func(obj interface{}) {
     109           2 :                         if obj.(*vfsReadHandle).r != nil {
     110           0 :                                 fmt.Fprintf(os.Stderr, "ReadHandle was not closed")
     111           0 :                                 os.Exit(1)
     112           0 :                         }
     113             :                 })
     114           2 :                 return i
     115             :         },
     116             : }
     117             : 
     118           2 : func (rh *vfsReadHandle) init(r *fileReadable) {
     119           2 :         *rh = vfsReadHandle{
     120           2 :                 r:             r,
     121           2 :                 rs:            makeReadaheadState(fileMaxReadaheadSize),
     122           2 :                 readaheadMode: r.readaheadConfig.Speculative(),
     123           2 :         }
     124           2 : }
     125             : 
     126             : // Close is part of the objstorage.ReadHandle interface.
     127           2 : func (rh *vfsReadHandle) Close() error {
     128           2 :         var err error
     129           2 :         if rh.sequentialFile != nil {
     130           1 :                 err = rh.sequentialFile.Close()
     131           1 :         }
     132           2 :         *rh = vfsReadHandle{}
     133           2 :         readHandlePool.Put(rh)
     134           2 :         return err
     135             : }
     136             : 
     137             : // ReadAt is part of the objstorage.ReadHandle interface.
     138           2 : func (rh *vfsReadHandle) ReadAt(_ context.Context, p []byte, offset int64) error {
     139           2 :         if rh.sequentialFile != nil {
     140           2 :                 // Use OS-level read-ahead.
     141           2 :                 n, err := rh.sequentialFile.ReadAt(p, offset)
     142           2 :                 if invariants.Enabled && err == nil && n != len(p) {
     143           0 :                         panic("short read")
     144             :                 }
     145           2 :                 return err
     146             :         }
     147           2 :         if rh.readaheadMode != NoReadahead {
     148           2 :                 if readaheadSize := rh.rs.maybeReadahead(offset, int64(len(p))); readaheadSize > 0 {
     149           2 :                         if rh.readaheadMode == FadviseSequential && readaheadSize >= fileMaxReadaheadSize {
     150           1 :                                 // We've reached the maximum readahead size. Beyond this point, rely on
     151           1 :                                 // OS-level readahead.
     152           1 :                                 rh.switchToOSReadahead()
     153           2 :                         } else {
     154           2 :                                 _ = rh.r.file.Prefetch(offset, readaheadSize)
     155           2 :                         }
     156             :                 }
     157             :         }
     158           2 :         n, err := rh.r.file.ReadAt(p, offset)
     159           2 :         if invariants.Enabled && err == nil && n != len(p) {
     160           0 :                 panic("short read")
     161             :         }
     162           2 :         return err
     163             : }
     164             : 
     165             : // SetupForCompaction is part of the objstorage.ReadHandle interface.
     166           2 : func (rh *vfsReadHandle) SetupForCompaction() {
     167           2 :         rh.readaheadMode = rh.r.readaheadConfig.Informed()
     168           2 :         if rh.readaheadMode == FadviseSequential {
     169           2 :                 rh.switchToOSReadahead()
     170           2 :         }
     171             : }
     172             : 
     173           2 : func (rh *vfsReadHandle) switchToOSReadahead() {
     174           2 :         if invariants.Enabled && rh.readaheadMode != FadviseSequential {
     175           0 :                 panic("readheadMode not respected")
     176             :         }
     177           2 :         if rh.sequentialFile != nil {
     178           0 :                 return
     179           0 :         }
     180             : 
     181             :         // TODO(radu): we could share the reopened file descriptor across multiple
     182             :         // handles.
     183           2 :         f, err := rh.r.fs.Open(rh.r.filename, vfs.SequentialReadsOption)
     184           2 :         if err == nil {
     185           2 :                 rh.sequentialFile = f
     186           2 :         }
     187             : }
     188             : 
     189             : // RecordCacheHit is part of the objstorage.ReadHandle interface.
     190           2 : func (rh *vfsReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
     191           2 :         if rh.sequentialFile != nil || rh.readaheadMode == NoReadahead {
     192           2 :                 // Using OS-level or no readahead, so do nothing.
     193           2 :                 return
     194           2 :         }
     195           2 :         rh.rs.recordCacheHit(offset, size)
     196             : }
     197             : 
     198             : // TestingCheckMaxReadahead returns true if the ReadHandle has switched to
     199             : // OS-level read-ahead.
     200           1 : func TestingCheckMaxReadahead(rh objstorage.ReadHandle) bool {
     201           1 :         switch rh := rh.(type) {
     202           0 :         case *vfsReadHandle:
     203           0 :                 return rh.sequentialFile != nil
     204           1 :         case *PreallocatedReadHandle:
     205           1 :                 return rh.sequentialFile != nil
     206           0 :         default:
     207           0 :                 panic("unknown ReadHandle type")
     208             :         }
     209             : }
     210             : 
     211             : // PreallocatedReadHandle is used to avoid an allocation in NewReadHandle; see
     212             : // UsePreallocatedReadHandle.
     213             : type PreallocatedReadHandle struct {
     214             :         vfsReadHandle
     215             : }
     216             : 
     217             : // Close is part of the objstorage.ReadHandle interface.
     218           2 : func (rh *PreallocatedReadHandle) Close() error {
     219           2 :         var err error
     220           2 :         if rh.sequentialFile != nil {
     221           2 :                 err = rh.sequentialFile.Close()
     222           2 :         }
     223           2 :         rh.vfsReadHandle = vfsReadHandle{}
     224           2 :         return err
     225             : }
     226             : 
     227             : // UsePreallocatedReadHandle is equivalent to calling readable.NewReadHandle()
     228             : // but uses the existing storage of a PreallocatedReadHandle when possible
     229             : // (currently this happens if we are reading from a local file).
     230             : // The returned handle still needs to be closed.
     231             : func UsePreallocatedReadHandle(
     232             :         ctx context.Context,
     233             :         readable objstorage.Readable,
     234             :         readBeforeSize objstorage.ReadBeforeSize,
     235             :         rh *PreallocatedReadHandle,
     236           2 : ) objstorage.ReadHandle {
     237           2 :         if r, ok := readable.(*fileReadable); ok {
     238           2 :                 rh.init(r)
     239           2 :                 return rh
     240           2 :         }
     241           2 :         return readable.NewReadHandle(ctx, readBeforeSize)
     242             : }

Generated by: LCOV version 1.14