Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "os"
11 : "sync"
12 :
13 : "github.com/cockroachdb/pebble/internal/invariants"
14 : "github.com/cockroachdb/pebble/objstorage"
15 : "github.com/cockroachdb/pebble/vfs"
16 : )
17 :
// fileMaxReadaheadSize (256 KiB) is the readahead-state limit used by every
// vfsReadHandle. In FadviseSequential mode, once the requested readahead size
// reaches this value the handle stops prefetching and switches to OS-level
// readahead.
const fileMaxReadaheadSize = 256 << 10
19 :
// fileReadable implements objstorage.Readable on top of a vfs.File.
//
// Read handles created from it might issue Prefetch calls on the file and
// might reopen the file with vfs.SequentialReadsOption to rely on OS-level
// readahead.
type fileReadable struct {
	// file serves all reads, except those made through a handle that has
	// switched to its own sequential file descriptor. Close sets it to nil;
	// the leak-detection finalizer installed by newFileReadable checks for
	// that.
	file vfs.File
	// size is the file size reported by Stat when the readable was created.
	size int64

	// readaheadConfig supplies the readahead mode of read handles: the
	// Speculative mode when a handle is initialized, and the Informed mode
	// after SetupForCompaction.
	readaheadConfig *ReadaheadConfig

	// The following fields are used to possibly open the file again using the
	// sequential reads option (see vfsReadHandle).
	filename string
	fs       vfs.FS
}

// Compile-time check that fileReadable satisfies objstorage.Readable.
var _ objstorage.Readable = (*fileReadable)(nil)
37 :
38 : func newFileReadable(
39 : file vfs.File, fs vfs.FS, readaheadConfig *ReadaheadConfig, filename string,
40 2 : ) (*fileReadable, error) {
41 2 : info, err := file.Stat()
42 2 : if err != nil {
43 1 : return nil, err
44 1 : }
45 2 : r := &fileReadable{
46 2 : file: file,
47 2 : size: info.Size(),
48 2 : filename: filename,
49 2 : fs: fs,
50 2 : readaheadConfig: readaheadConfig,
51 2 : }
52 2 : invariants.SetFinalizer(r, func(obj interface{}) {
53 2 : if obj.(*fileReadable).file != nil {
54 0 : fmt.Fprintf(os.Stderr, "Readable was not closed")
55 0 : os.Exit(1)
56 0 : }
57 : })
58 2 : return r, nil
59 : }
60 :
61 : // ReadAt is part of the objstorage.Readable interface.
62 2 : func (r *fileReadable) ReadAt(_ context.Context, p []byte, off int64) error {
63 2 : n, err := r.file.ReadAt(p, off)
64 2 : if invariants.Enabled && err == nil && n != len(p) {
65 0 : panic("short read")
66 : }
67 2 : return err
68 : }
69 :
70 : // Close is part of the objstorage.Readable interface.
71 2 : func (r *fileReadable) Close() error {
72 2 : defer func() { r.file = nil }()
73 2 : return r.file.Close()
74 : }
75 :
76 : // Size is part of the objstorage.Readable interface.
77 2 : func (r *fileReadable) Size() int64 {
78 2 : return r.size
79 2 : }
80 :
81 : // NewReadHandle is part of the objstorage.Readable interface.
82 : func (r *fileReadable) NewReadHandle(
83 : ctx context.Context, readBeforeSize objstorage.ReadBeforeSize,
84 2 : ) objstorage.ReadHandle {
85 2 : rh := readHandlePool.Get().(*vfsReadHandle)
86 2 : rh.init(r)
87 2 : return rh
88 2 : }
89 :
// vfsReadHandle implements objstorage.ReadHandle for a fileReadable. Handles
// are either recycled through readHandlePool or embedded in a
// PreallocatedReadHandle, and are reset to the zero value when closed.
type vfsReadHandle struct {
	// r is the readable this handle reads from; nil once the handle is closed
	// (the pool's leak-detection finalizer relies on this).
	r *fileReadable
	// rs is consulted on every read (maybeReadahead) and on every cache hit
	// (recordCacheHit) to compute how much to read ahead.
	rs readaheadState
	// readaheadMode controls readahead: NoReadahead disables it,
	// FadviseSequential additionally allows switching to OS-level readahead
	// once the readahead size reaches fileMaxReadaheadSize, and any other mode
	// only issues Prefetch calls.
	readaheadMode ReadaheadMode

	// sequentialFile holds a file descriptor to the same underlying File,
	// except with fadvise(FADV_SEQUENTIAL) called on it to take advantage of
	// OS-level readahead. Once this is non-nil, the other variables in
	// readaheadState don't matter much as we defer to OS-level readahead.
	sequentialFile vfs.File
}

// Compile-time check that vfsReadHandle satisfies objstorage.ReadHandle.
var _ objstorage.ReadHandle = (*vfsReadHandle)(nil)
103 :
104 : var readHandlePool = sync.Pool{
105 2 : New: func() interface{} {
106 2 : i := &vfsReadHandle{}
107 2 : // Note: this is a no-op if invariants are disabled or race is enabled.
108 2 : invariants.SetFinalizer(i, func(obj interface{}) {
109 2 : if obj.(*vfsReadHandle).r != nil {
110 0 : fmt.Fprintf(os.Stderr, "ReadHandle was not closed")
111 0 : os.Exit(1)
112 0 : }
113 : })
114 2 : return i
115 : },
116 : }
117 :
118 2 : func (rh *vfsReadHandle) init(r *fileReadable) {
119 2 : *rh = vfsReadHandle{
120 2 : r: r,
121 2 : rs: makeReadaheadState(fileMaxReadaheadSize),
122 2 : readaheadMode: r.readaheadConfig.Speculative(),
123 2 : }
124 2 : }
125 :
126 : // Close is part of the objstorage.ReadHandle interface.
127 2 : func (rh *vfsReadHandle) Close() error {
128 2 : var err error
129 2 : if rh.sequentialFile != nil {
130 0 : err = rh.sequentialFile.Close()
131 0 : }
132 2 : *rh = vfsReadHandle{}
133 2 : readHandlePool.Put(rh)
134 2 : return err
135 : }
136 :
137 : // ReadAt is part of the objstorage.ReadHandle interface.
138 2 : func (rh *vfsReadHandle) ReadAt(_ context.Context, p []byte, offset int64) error {
139 2 : if rh.sequentialFile != nil {
140 2 : // Use OS-level read-ahead.
141 2 : n, err := rh.sequentialFile.ReadAt(p, offset)
142 2 : if invariants.Enabled && err == nil && n != len(p) {
143 0 : panic("short read")
144 : }
145 2 : return err
146 : }
147 2 : if rh.readaheadMode != NoReadahead {
148 2 : if readaheadSize := rh.rs.maybeReadahead(offset, int64(len(p))); readaheadSize > 0 {
149 2 : if rh.readaheadMode == FadviseSequential && readaheadSize >= fileMaxReadaheadSize {
150 1 : // We've reached the maximum readahead size. Beyond this point, rely on
151 1 : // OS-level readahead.
152 1 : rh.switchToOSReadahead()
153 2 : } else {
154 2 : _ = rh.r.file.Prefetch(offset, readaheadSize)
155 2 : }
156 : }
157 : }
158 2 : n, err := rh.r.file.ReadAt(p, offset)
159 2 : if invariants.Enabled && err == nil && n != len(p) {
160 0 : panic("short read")
161 : }
162 2 : return err
163 : }
164 :
165 : // SetupForCompaction is part of the objstorage.ReadHandle interface.
166 2 : func (rh *vfsReadHandle) SetupForCompaction() {
167 2 : rh.readaheadMode = rh.r.readaheadConfig.Informed()
168 2 : if rh.readaheadMode == FadviseSequential {
169 2 : rh.switchToOSReadahead()
170 2 : }
171 : }
172 :
173 2 : func (rh *vfsReadHandle) switchToOSReadahead() {
174 2 : if invariants.Enabled && rh.readaheadMode != FadviseSequential {
175 0 : panic("readheadMode not respected")
176 : }
177 2 : if rh.sequentialFile != nil {
178 0 : return
179 0 : }
180 :
181 : // TODO(radu): we could share the reopened file descriptor across multiple
182 : // handles.
183 2 : f, err := rh.r.fs.Open(rh.r.filename, vfs.SequentialReadsOption)
184 2 : if err == nil {
185 2 : rh.sequentialFile = f
186 2 : }
187 : }
188 :
189 : // RecordCacheHit is part of the objstorage.ReadHandle interface.
190 2 : func (rh *vfsReadHandle) RecordCacheHit(_ context.Context, offset, size int64) {
191 2 : if rh.sequentialFile != nil || rh.readaheadMode == NoReadahead {
192 2 : // Using OS-level or no readahead, so do nothing.
193 2 : return
194 2 : }
195 2 : rh.rs.recordCacheHit(offset, size)
196 : }
197 :
198 : // TestingCheckMaxReadahead returns true if the ReadHandle has switched to
199 : // OS-level read-ahead.
200 1 : func TestingCheckMaxReadahead(rh objstorage.ReadHandle) bool {
201 1 : switch rh := rh.(type) {
202 0 : case *vfsReadHandle:
203 0 : return rh.sequentialFile != nil
204 1 : case *PreallocatedReadHandle:
205 1 : return rh.sequentialFile != nil
206 0 : default:
207 0 : panic("unknown ReadHandle type")
208 : }
209 : }
210 :
// PreallocatedReadHandle is used to avoid an allocation in NewReadHandle; see
// UsePreallocatedReadHandle.
//
// It embeds a vfsReadHandle but defines its own Close, which resets the
// embedded state without returning anything to readHandlePool.
type PreallocatedReadHandle struct {
	vfsReadHandle
}
216 :
217 : // Close is part of the objstorage.ReadHandle interface.
218 2 : func (rh *PreallocatedReadHandle) Close() error {
219 2 : var err error
220 2 : if rh.sequentialFile != nil {
221 2 : err = rh.sequentialFile.Close()
222 2 : }
223 2 : rh.vfsReadHandle = vfsReadHandle{}
224 2 : return err
225 : }
226 :
227 : // UsePreallocatedReadHandle is equivalent to calling readable.NewReadHandle()
228 : // but uses the existing storage of a PreallocatedReadHandle when possible
229 : // (currently this happens if we are reading from a local file).
230 : // The returned handle still needs to be closed.
231 : func UsePreallocatedReadHandle(
232 : ctx context.Context,
233 : readable objstorage.Readable,
234 : readBeforeSize objstorage.ReadBeforeSize,
235 : rh *PreallocatedReadHandle,
236 2 : ) objstorage.ReadHandle {
237 2 : if r, ok := readable.(*fileReadable); ok {
238 2 : rh.init(r)
239 2 : return rh
240 2 : }
241 2 : return readable.NewReadHandle(ctx, readBeforeSize)
242 : }
|