LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - vfs_readable.go (source / functions) Hit Total Coverage
Test: 2024-01-31 08:15Z 59183a9d - tests only.lcov Lines: 97 109 89.0 %
Date: 2024-01-31 08:16:25 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "os"
      11             :         "sync"
      12             : 
      13             :         "github.com/cockroachdb/pebble/internal/invariants"
      14             :         "github.com/cockroachdb/pebble/objstorage"
      15             :         "github.com/cockroachdb/pebble/vfs"
      16             : )
      17             : 
      18             : const fileMaxReadaheadSize = 256 * 1024 /* 256KB */
      19             : 
      20             : // fileReadable implements objstorage.Readable on top of a vfs.File.
      21             : //
      22             : // The implementation might use Prealloc and might reopen the file with
      23             : // SequentialReadsOption.
      24             : type fileReadable struct {
      25             :         file vfs.File
      26             :         size int64
      27             : 
      28             :         // The following fields are used to possibly open the file again using the
      29             :         // sequential reads option (see vfsReadHandle).
      30             :         filename string
      31             :         fs       vfs.FS
      32             : }
      33             : 
      34             : var _ objstorage.Readable = (*fileReadable)(nil)
      35             : 
      36           1 : func newFileReadable(file vfs.File, fs vfs.FS, filename string) (*fileReadable, error) {
      37           1 :         info, err := file.Stat()
      38           1 :         if err != nil {
      39           1 :                 return nil, err
      40           1 :         }
      41           1 :         r := &fileReadable{
      42           1 :                 file:     file,
      43           1 :                 size:     info.Size(),
      44           1 :                 filename: filename,
      45           1 :                 fs:       fs,
      46           1 :         }
      47           1 :         invariants.SetFinalizer(r, func(obj interface{}) {
      48           1 :                 if obj.(*fileReadable).file != nil {
      49           0 :                         fmt.Fprintf(os.Stderr, "Readable was not closed")
      50           0 :                         os.Exit(1)
      51           0 :                 }
      52             :         })
      53           1 :         return r, nil
      54             : }
      55             : 
      56             : // ReadAt is part of the objstorage.Readable interface.
      57           1 : func (r *fileReadable) ReadAt(_ context.Context, p []byte, off int64) error {
      58           1 :         n, err := r.file.ReadAt(p, off)
      59           1 :         if invariants.Enabled && err == nil && n != len(p) {
      60           0 :                 panic("short read")
      61             :         }
      62           1 :         return err
      63             : }
      64             : 
      65             : // Close is part of the objstorage.Readable interface.
      66           1 : func (r *fileReadable) Close() error {
      67           1 :         defer func() { r.file = nil }()
      68           1 :         return r.file.Close()
      69             : }
      70             : 
      71             : // Size is part of the objstorage.Readable interface.
      72           1 : func (r *fileReadable) Size() int64 {
      73           1 :         return r.size
      74           1 : }
      75             : 
      76             : // NewReadHandle is part of the objstorage.Readable interface.
      77           1 : func (r *fileReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle {
      78           1 :         rh := readHandlePool.Get().(*vfsReadHandle)
      79           1 :         rh.r = r
      80           1 :         rh.rs = makeReadaheadState(fileMaxReadaheadSize)
      81           1 :         return rh
      82           1 : }
      83             : 
      84             : type vfsReadHandle struct {
      85             :         r  *fileReadable
      86             :         rs readaheadState
      87             : 
      88             :         // sequentialFile holds a file descriptor to the same underlying File,
      89             :         // except with fadvise(FADV_SEQUENTIAL) called on it to take advantage of
      90             :         // OS-level readahead. Once this is non-nil, the other variables in
      91             :         // readaheadState don't matter much as we defer to OS-level readahead.
      92             :         sequentialFile vfs.File
      93             : }
      94             : 
      95             : var _ objstorage.ReadHandle = (*vfsReadHandle)(nil)
      96             : 
      97             : var readHandlePool = sync.Pool{
      98           1 :         New: func() interface{} {
      99           1 :                 i := &vfsReadHandle{}
     100           1 :                 // Note: this is a no-op if invariants are disabled or race is enabled.
     101           1 :                 invariants.SetFinalizer(i, func(obj interface{}) {
     102           1 :                         if obj.(*vfsReadHandle).r != nil {
     103           0 :                                 fmt.Fprintf(os.Stderr, "ReadHandle was not closed")
     104           0 :                                 os.Exit(1)
     105           0 :                         }
     106             :                 })
     107           1 :                 return i
     108             :         },
     109             : }
     110             : 
     111             : // Close is part of the objstorage.ReadHandle interface.
     112           1 : func (rh *vfsReadHandle) Close() error {
     113           1 :         var err error
     114           1 :         if rh.sequentialFile != nil {
     115           1 :                 err = rh.sequentialFile.Close()
     116           1 :         }
     117           1 :         *rh = vfsReadHandle{}
     118           1 :         readHandlePool.Put(rh)
     119           1 :         return err
     120             : }
     121             : 
     122             : // ReadAt is part of the objstorage.ReadHandle interface.
     123           1 : func (rh *vfsReadHandle) ReadAt(_ context.Context, p []byte, offset int64) error {
     124           1 :         var n int
     125           1 :         var err error
     126           1 :         if rh.sequentialFile != nil {
     127           1 :                 // Use OS-level read-ahead.
     128           1 :                 n, err = rh.sequentialFile.ReadAt(p, offset)
     129           1 :         } else {
     130           1 :                 if readaheadSize := rh.rs.maybeReadahead(offset, int64(len(p))); readaheadSize > 0 {
     131           1 :                         if readaheadSize >= fileMaxReadaheadSize {
     132           1 :                                 // We've reached the maximum readahead size. Beyond this point, rely on
     133           1 :                                 // OS-level readahead.
     134           1 :                                 rh.switchToOSReadahead()
     135           1 :                         } else {
     136           1 :                                 _ = rh.r.file.Prefetch(offset, readaheadSize)
     137           1 :                         }
     138             :                 }
     139           1 :                 n, err = rh.r.file.ReadAt(p, offset)
     140             :         }
     141           1 :         if invariants.Enabled && err == nil && n != len(p) {
     142           0 :                 panic("short read")
     143             :         }
     144           1 :         return err
     145             : }
     146             : 
     147             : // SetupForCompaction is part of the objstorage.ReadHandle interface.
     148           1 : func (rh *vfsReadHandle) SetupForCompaction() {
     149           1 :         rh.switchToOSReadahead()
     150           1 : }
     151             : 
     152           1 : func (rh *vfsReadHandle) switchToOSReadahead() {
     153           1 :         if rh.sequentialFile != nil {
     154           0 :                 return
     155           0 :         }
     156             : 
     157             :         // TODO(radu): we could share the reopened file descriptor across multiple
     158             :         // handles.
     159           1 :         f, err := rh.r.fs.Open(rh.r.filename, vfs.SequentialReadsOption)
     160           1 :         if err == nil {
     161           1 :                 rh.sequentialFile = f
     162           1 :         }
     163             : }
     164             : 
     165             : // RecordCacheHit is part of the objstorage.ReadHandle interface.
     166           1 : func (rh *vfsReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
     167           1 :         if rh.sequentialFile != nil {
     168           1 :                 // Using OS-level readahead, so do nothing.
     169           1 :                 return
     170           1 :         }
     171           1 :         rh.rs.recordCacheHit(offset, size)
     172             : }
     173             : 
     174             : // TestingCheckMaxReadahead returns true if the ReadHandle has switched to
     175             : // OS-level read-ahead.
     176           1 : func TestingCheckMaxReadahead(rh objstorage.ReadHandle) bool {
     177           1 :         switch rh := rh.(type) {
     178           1 :         case *vfsReadHandle:
     179           1 :                 return rh.sequentialFile != nil
     180           1 :         case *PreallocatedReadHandle:
     181           1 :                 return rh.sequentialFile != nil
     182           0 :         default:
     183           0 :                 panic("unknown ReadHandle type")
     184             :         }
     185             : }
     186             : 
     187             : // PreallocatedReadHandle is used to avoid an allocation in NewReadHandle; see
     188             : // UsePreallocatedReadHandle.
     189             : type PreallocatedReadHandle struct {
     190             :         vfsReadHandle
     191             : }
     192             : 
     193             : // Close is part of the objstorage.ReadHandle interface.
     194           1 : func (rh *PreallocatedReadHandle) Close() error {
     195           1 :         var err error
     196           1 :         if rh.sequentialFile != nil {
     197           1 :                 err = rh.sequentialFile.Close()
     198           1 :         }
     199           1 :         rh.vfsReadHandle = vfsReadHandle{}
     200           1 :         return err
     201             : }
     202             : 
     203             : // UsePreallocatedReadHandle is equivalent to calling readable.NewReadHandle()
     204             : // but uses the existing storage of a PreallocatedReadHandle when possible
     205             : // (currently this happens if we are reading from a local file).
     206             : // The returned handle still needs to be closed.
     207             : func UsePreallocatedReadHandle(
     208             :         ctx context.Context, readable objstorage.Readable, rh *PreallocatedReadHandle,
     209           1 : ) objstorage.ReadHandle {
     210           1 :         if r, ok := readable.(*fileReadable); ok {
     211           1 :                 // See fileReadable.NewReadHandle.
     212           1 :                 rh.vfsReadHandle = vfsReadHandle{r: r}
     213           1 :                 return rh
     214           1 :         }
     215           1 :         return readable.NewReadHandle(ctx)
     216             : }

Generated by: LCOV version 1.14