Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use 2 : // of this source code is governed by a BSD-style license that can be found in 3 : // the LICENSE file. 4 : 5 : package objstorageprovider 6 : 7 : import ( 8 : "context" 9 : "fmt" 10 : "os" 11 : "sync" 12 : 13 : "github.com/cockroachdb/pebble/internal/invariants" 14 : "github.com/cockroachdb/pebble/objstorage" 15 : "github.com/cockroachdb/pebble/vfs" 16 : ) 17 : 18 : const fileMaxReadaheadSize = 256 * 1024 /* 256KB */ 19 : 20 : // fileReadable implements objstorage.Readable on top of a vfs.File. 21 : // 22 : // The implementation might use Prealloc and might reopen the file with 23 : // SequentialReadsOption. 24 : type fileReadable struct { 25 : file vfs.File 26 : size int64 27 : 28 : // The following fields are used to possibly open the file again using the 29 : // sequential reads option (see vfsReadHandle). 30 : filename string 31 : fs vfs.FS 32 : } 33 : 34 : var _ objstorage.Readable = (*fileReadable)(nil) 35 : 36 1 : func newFileReadable(file vfs.File, fs vfs.FS, filename string) (*fileReadable, error) { 37 1 : info, err := file.Stat() 38 1 : if err != nil { 39 0 : return nil, err 40 0 : } 41 1 : r := &fileReadable{ 42 1 : file: file, 43 1 : size: info.Size(), 44 1 : filename: filename, 45 1 : fs: fs, 46 1 : } 47 1 : invariants.SetFinalizer(r, func(obj interface{}) { 48 1 : if obj.(*fileReadable).file != nil { 49 0 : fmt.Fprintf(os.Stderr, "Readable was not closed") 50 0 : os.Exit(1) 51 0 : } 52 : }) 53 1 : return r, nil 54 : } 55 : 56 : // ReadAt is part of the objstorage.Readable interface. 57 1 : func (r *fileReadable) ReadAt(_ context.Context, p []byte, off int64) error { 58 1 : n, err := r.file.ReadAt(p, off) 59 1 : if invariants.Enabled && err == nil && n != len(p) { 60 0 : panic("short read") 61 : } 62 1 : return err 63 : } 64 : 65 : // Close is part of the objstorage.Readable interface. 66 1 : func (r *fileReadable) Close() error { 67 1 : defer func() { r.file = nil }() 68 1 : return r.file.Close() 69 : } 70 : 71 : // Size is part of the objstorage.Readable interface. 72 1 : func (r *fileReadable) Size() int64 { 73 1 : return r.size 74 1 : } 75 : 76 : // NewReadHandle is part of the objstorage.Readable interface. 77 1 : func (r *fileReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle { 78 1 : rh := readHandlePool.Get().(*vfsReadHandle) 79 1 : rh.r = r 80 1 : rh.rs = makeReadaheadState(fileMaxReadaheadSize) 81 1 : return rh 82 1 : } 83 : 84 : type vfsReadHandle struct { 85 : r *fileReadable 86 : rs readaheadState 87 : 88 : // sequentialFile holds a file descriptor to the same underlying File, 89 : // except with fadvise(FADV_SEQUENTIAL) called on it to take advantage of 90 : // OS-level readahead. Once this is non-nil, the other variables in 91 : // readaheadState don't matter much as we defer to OS-level readahead. 92 : sequentialFile vfs.File 93 : } 94 : 95 : var _ objstorage.ReadHandle = (*vfsReadHandle)(nil) 96 : 97 : var readHandlePool = sync.Pool{ 98 1 : New: func() interface{} { 99 1 : i := &vfsReadHandle{} 100 1 : // Note: this is a no-op if invariants are disabled or race is enabled. 101 1 : invariants.SetFinalizer(i, func(obj interface{}) { 102 1 : if obj.(*vfsReadHandle).r != nil { 103 0 : fmt.Fprintf(os.Stderr, "ReadHandle was not closed") 104 0 : os.Exit(1) 105 0 : } 106 : }) 107 1 : return i 108 : }, 109 : } 110 : 111 : // Close is part of the objstorage.ReadHandle interface. 112 1 : func (rh *vfsReadHandle) Close() error { 113 1 : var err error 114 1 : if rh.sequentialFile != nil { 115 1 : err = rh.sequentialFile.Close() 116 1 : } 117 1 : *rh = vfsReadHandle{} 118 1 : readHandlePool.Put(rh) 119 1 : return err 120 : } 121 : 122 : // ReadAt is part of the objstorage.ReadHandle interface. 123 1 : func (rh *vfsReadHandle) ReadAt(_ context.Context, p []byte, offset int64) error { 124 1 : var n int 125 1 : var err error 126 1 : if rh.sequentialFile != nil { 127 1 : // Use OS-level read-ahead. 128 1 : n, err = rh.sequentialFile.ReadAt(p, offset) 129 1 : } else { 130 1 : if readaheadSize := rh.rs.maybeReadahead(offset, int64(len(p))); readaheadSize > 0 { 131 1 : if readaheadSize >= fileMaxReadaheadSize { 132 0 : // We've reached the maximum readahead size. Beyond this point, rely on 133 0 : // OS-level readahead. 134 0 : rh.switchToOSReadahead() 135 1 : } else { 136 1 : _ = rh.r.file.Prefetch(offset, readaheadSize) 137 1 : } 138 : } 139 1 : n, err = rh.r.file.ReadAt(p, offset) 140 : } 141 1 : if invariants.Enabled && err == nil && n != len(p) { 142 0 : panic("short read") 143 : } 144 1 : return err 145 : } 146 : 147 : // SetupForCompaction is part of the objstorage.ReadHandle interface. 148 1 : func (rh *vfsReadHandle) SetupForCompaction() { 149 1 : rh.switchToOSReadahead() 150 1 : } 151 : 152 1 : func (rh *vfsReadHandle) switchToOSReadahead() { 153 1 : if rh.sequentialFile != nil { 154 0 : return 155 0 : } 156 : 157 : // TODO(radu): we could share the reopened file descriptor across multiple 158 : // handles. 159 1 : f, err := rh.r.fs.Open(rh.r.filename, vfs.SequentialReadsOption) 160 1 : if err == nil { 161 1 : rh.sequentialFile = f 162 1 : } 163 : } 164 : 165 : // RecordCacheHit is part of the objstorage.ReadHandle interface. 166 1 : func (rh *vfsReadHandle) RecordCacheHit(_ context.Context, offset, size int64) { 167 1 : if rh.sequentialFile != nil { 168 1 : // Using OS-level readahead, so do nothing. 169 1 : return 170 1 : } 171 1 : rh.rs.recordCacheHit(offset, size) 172 : } 173 : 174 : // TestingCheckMaxReadahead returns true if the ReadHandle has switched to 175 : // OS-level read-ahead. 176 0 : func TestingCheckMaxReadahead(rh objstorage.ReadHandle) bool { 177 0 : switch rh := rh.(type) { 178 0 : case *vfsReadHandle: 179 0 : return rh.sequentialFile != nil 180 0 : case *PreallocatedReadHandle: 181 0 : return rh.sequentialFile != nil 182 0 : default: 183 0 : panic("unknown ReadHandle type") 184 : } 185 : } 186 : 187 : // PreallocatedReadHandle is used to avoid an allocation in NewReadHandle; see 188 : // UsePreallocatedReadHandle. 189 : type PreallocatedReadHandle struct { 190 : vfsReadHandle 191 : } 192 : 193 : // Close is part of the objstorage.ReadHandle interface. 194 1 : func (rh *PreallocatedReadHandle) Close() error { 195 1 : var err error 196 1 : if rh.sequentialFile != nil { 197 1 : err = rh.sequentialFile.Close() 198 1 : } 199 1 : rh.vfsReadHandle = vfsReadHandle{} 200 1 : return err 201 : } 202 : 203 : // UsePreallocatedReadHandle is equivalent to calling readable.NewReadHandle() 204 : // but uses the existing storage of a PreallocatedReadHandle when possible 205 : // (currently this happens if we are reading from a local file). 206 : // The returned handle still needs to be closed. 207 : func UsePreallocatedReadHandle( 208 : ctx context.Context, readable objstorage.Readable, rh *PreallocatedReadHandle, 209 1 : ) objstorage.ReadHandle { 210 1 : if r, ok := readable.(*fileReadable); ok { 211 1 : // See fileReadable.NewReadHandle. 212 1 : rh.vfsReadHandle = vfsReadHandle{r: r} 213 1 : return rh 214 1 : } 215 1 : return readable.NewReadHandle(ctx) 216 : }