LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - vfs_readable.go (source / functions) Hit Total Coverage
Test: 2024-08-11 08:16Z 791b3749 - tests only.lcov Lines: 104 120 86.7 %
Date: 2024-08-11 08:16:58 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "os"
      11             :         "sync"
      12             : 
      13             :         "github.com/cockroachdb/pebble/internal/invariants"
      14             :         "github.com/cockroachdb/pebble/objstorage"
      15             :         "github.com/cockroachdb/pebble/vfs"
      16             : )
      17             : 
      18             : const fileMaxReadaheadSize = 256 * 1024 /* 256KB */
      19             : 
      20             : // fileReadable implements objstorage.Readable on top of a vfs.File.
      21             : //
      22             : // The implementation might use Prealloc and might reopen the file with
      23             : // SequentialReadsOption.
      24             : type fileReadable struct {
      25             :         file vfs.File
      26             :         size int64
      27             : 
      28             :         readaheadConfig *ReadaheadConfig
      29             : 
      30             :         // The following fields are used to possibly open the file again using the
      31             :         // sequential reads option (see vfsReadHandle).
      32             :         filename string
      33             :         fs       vfs.FS
      34             : }
      35             : 
      36             : var _ objstorage.Readable = (*fileReadable)(nil)
      37             : 
      38             : func newFileReadable(
      39             :         file vfs.File, fs vfs.FS, readaheadConfig *ReadaheadConfig, filename string,
      40           1 : ) (*fileReadable, error) {
      41           1 :         info, err := file.Stat()
      42           1 :         if err != nil {
      43           1 :                 return nil, err
      44           1 :         }
      45           1 :         r := &fileReadable{
      46           1 :                 file:            file,
      47           1 :                 size:            info.Size(),
      48           1 :                 filename:        filename,
      49           1 :                 fs:              fs,
      50           1 :                 readaheadConfig: readaheadConfig,
      51           1 :         }
      52           1 :         invariants.SetFinalizer(r, func(obj interface{}) {
      53           1 :                 if obj.(*fileReadable).file != nil {
      54           0 :                         fmt.Fprintf(os.Stderr, "Readable was not closed")
      55           0 :                         os.Exit(1)
      56           0 :                 }
      57             :         })
      58           1 :         return r, nil
      59             : }
      60             : 
      61             : // ReadAt is part of the objstorage.Readable interface.
      62           1 : func (r *fileReadable) ReadAt(_ context.Context, p []byte, off int64) error {
      63           1 :         n, err := r.file.ReadAt(p, off)
      64           1 :         if invariants.Enabled && err == nil && n != len(p) {
      65           0 :                 panic("short read")
      66             :         }
      67           1 :         return err
      68             : }
      69             : 
      70             : // Close is part of the objstorage.Readable interface.
      71           1 : func (r *fileReadable) Close() error {
      72           1 :         defer func() { r.file = nil }()
      73           1 :         return r.file.Close()
      74             : }
      75             : 
      76             : // Size is part of the objstorage.Readable interface.
      77           1 : func (r *fileReadable) Size() int64 {
      78           1 :         return r.size
      79           1 : }
      80             : 
      81             : // NewReadHandle is part of the objstorage.Readable interface.
      82             : func (r *fileReadable) NewReadHandle(
      83             :         readBeforeSize objstorage.ReadBeforeSize,
      84           1 : ) objstorage.ReadHandle {
      85           1 :         rh := readHandlePool.Get().(*vfsReadHandle)
      86           1 :         rh.init(r)
      87           1 :         return rh
      88           1 : }
      89             : 
      90             : type vfsReadHandle struct {
      91             :         r             *fileReadable
      92             :         rs            readaheadState
      93             :         readaheadMode ReadaheadMode
      94             : 
      95             :         // sequentialFile holds a file descriptor to the same underlying File,
      96             :         // except with fadvise(FADV_SEQUENTIAL) called on it to take advantage of
      97             :         // OS-level readahead. Once this is non-nil, the other variables in
      98             :         // readaheadState don't matter much as we defer to OS-level readahead.
      99             :         sequentialFile vfs.File
     100             : }
     101             : 
     102             : var _ objstorage.ReadHandle = (*vfsReadHandle)(nil)
     103             : 
     104             : var readHandlePool = sync.Pool{
     105           1 :         New: func() interface{} {
     106           1 :                 i := &vfsReadHandle{}
     107           1 :                 // Note: this is a no-op if invariants are disabled or race is enabled.
     108           1 :                 invariants.SetFinalizer(i, func(obj interface{}) {
     109           1 :                         if obj.(*vfsReadHandle).r != nil {
     110           0 :                                 fmt.Fprintf(os.Stderr, "ReadHandle was not closed")
     111           0 :                                 os.Exit(1)
     112           0 :                         }
     113             :                 })
     114           1 :                 return i
     115             :         },
     116             : }
     117             : 
     118           1 : func (rh *vfsReadHandle) init(r *fileReadable) {
     119           1 :         *rh = vfsReadHandle{
     120           1 :                 r:             r,
     121           1 :                 rs:            makeReadaheadState(fileMaxReadaheadSize),
     122           1 :                 readaheadMode: r.readaheadConfig.Speculative(),
     123           1 :         }
     124           1 : }
     125             : 
     126             : // Close is part of the objstorage.ReadHandle interface.
     127           1 : func (rh *vfsReadHandle) Close() error {
     128           1 :         var err error
     129           1 :         if rh.sequentialFile != nil {
     130           1 :                 err = rh.sequentialFile.Close()
     131           1 :         }
     132           1 :         *rh = vfsReadHandle{}
     133           1 :         readHandlePool.Put(rh)
     134           1 :         return err
     135             : }
     136             : 
     137             : // ReadAt is part of the objstorage.ReadHandle interface.
     138           1 : func (rh *vfsReadHandle) ReadAt(_ context.Context, p []byte, offset int64) error {
     139           1 :         if rh.sequentialFile != nil {
     140           1 :                 // Use OS-level read-ahead.
     141           1 :                 n, err := rh.sequentialFile.ReadAt(p, offset)
     142           1 :                 if invariants.Enabled && err == nil && n != len(p) {
     143           0 :                         panic("short read")
     144             :                 }
     145           1 :                 return err
     146             :         }
     147           1 :         if rh.readaheadMode != NoReadahead {
     148           1 :                 if readaheadSize := rh.rs.maybeReadahead(offset, int64(len(p))); readaheadSize > 0 {
     149           1 :                         if rh.readaheadMode == FadviseSequential && readaheadSize >= fileMaxReadaheadSize {
     150           1 :                                 // We've reached the maximum readahead size. Beyond this point, rely on
     151           1 :                                 // OS-level readahead.
     152           1 :                                 rh.switchToOSReadahead()
     153           1 :                         } else {
     154           1 :                                 _ = rh.r.file.Prefetch(offset, readaheadSize)
     155           1 :                         }
     156             :                 }
     157             :         }
     158           1 :         n, err := rh.r.file.ReadAt(p, offset)
     159           1 :         if invariants.Enabled && err == nil && n != len(p) {
     160           0 :                 panic("short read")
     161             :         }
     162           1 :         return err
     163             : }
     164             : 
     165             : // SetupForCompaction is part of the objstorage.ReadHandle interface.
     166           1 : func (rh *vfsReadHandle) SetupForCompaction() {
     167           1 :         rh.readaheadMode = rh.r.readaheadConfig.Informed()
     168           1 :         if rh.readaheadMode == FadviseSequential {
     169           1 :                 rh.switchToOSReadahead()
     170           1 :         }
     171             : }
     172             : 
     173           1 : func (rh *vfsReadHandle) switchToOSReadahead() {
     174           1 :         if invariants.Enabled && rh.readaheadMode != FadviseSequential {
     175           0 :                 panic("readheadMode not respected")
     176             :         }
     177           1 :         if rh.sequentialFile != nil {
     178           0 :                 return
     179           0 :         }
     180             : 
     181             :         // TODO(radu): we could share the reopened file descriptor across multiple
     182             :         // handles.
     183           1 :         f, err := rh.r.fs.Open(rh.r.filename, vfs.SequentialReadsOption)
     184           1 :         if err == nil {
     185           1 :                 rh.sequentialFile = f
     186           1 :         }
     187             : }
     188             : 
     189             : // RecordCacheHit is part of the objstorage.ReadHandle interface.
     190           1 : func (rh *vfsReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
     191           1 :         if rh.sequentialFile != nil || rh.readaheadMode == NoReadahead {
     192           1 :                 // Using OS-level or no readahead, so do nothing.
     193           1 :                 return
     194           1 :         }
     195           1 :         rh.rs.recordCacheHit(offset, size)
     196             : }
     197             : 
     198             : // TestingCheckMaxReadahead returns true if the ReadHandle has switched to
     199             : // OS-level read-ahead.
     200           1 : func TestingCheckMaxReadahead(rh objstorage.ReadHandle) bool {
     201           1 :         switch rh := rh.(type) {
     202           0 :         case *vfsReadHandle:
     203           0 :                 return rh.sequentialFile != nil
     204           1 :         case *PreallocatedReadHandle:
     205           1 :                 return rh.sequentialFile != nil
     206           0 :         default:
     207           0 :                 panic("unknown ReadHandle type")
     208             :         }
     209             : }
     210             : 
     211             : // PreallocatedReadHandle is used to avoid an allocation in NewReadHandle; see
     212             : // UsePreallocatedReadHandle.
     213             : type PreallocatedReadHandle struct {
     214             :         vfsReadHandle
     215             : }
     216             : 
     217             : // Close is part of the objstorage.ReadHandle interface.
     218           1 : func (rh *PreallocatedReadHandle) Close() error {
     219           1 :         var err error
     220           1 :         if rh.sequentialFile != nil {
     221           1 :                 err = rh.sequentialFile.Close()
     222           1 :         }
     223           1 :         rh.vfsReadHandle = vfsReadHandle{}
     224           1 :         return err
     225             : }
     226             : 
     227             : // UsePreallocatedReadHandle is equivalent to calling readable.NewReadHandle()
     228             : // but uses the existing storage of a PreallocatedReadHandle when possible
     229             : // (currently this happens if we are reading from a local file).
     230             : // The returned handle still needs to be closed.
     231             : func UsePreallocatedReadHandle(
     232             :         readable objstorage.Readable,
     233             :         readBeforeSize objstorage.ReadBeforeSize,
     234             :         rh *PreallocatedReadHandle,
     235           1 : ) objstorage.ReadHandle {
     236           1 :         if r, ok := readable.(*fileReadable); ok {
     237           1 :                 rh.init(r)
     238           1 :                 return rh
     239           1 :         }
     240           1 :         return readable.NewReadHandle(readBeforeSize)
     241             : }

Generated by: LCOV version 1.14