Line data Source code
1 : // Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package vfs
6 :
7 : import (
8 : "io"
9 : "os"
10 : "sync"
11 : "sync/atomic"
12 : "syscall"
13 :
14 : "github.com/cockroachdb/errors"
15 : )
16 :
17 : // OnDiskFull wraps the provided FS with an FS that examines returned errors,
18 : // looking for ENOSPC errors. It invokes the provided callback when the
19 : // underlying filesystem returns an error signifying the storage is out of
20 : // disk space.
21 : //
22 : // All new writes to the filesystem are blocked while the callback executes,
23 : // so care must be taken to avoid expensive work from within the callback.
24 : //
25 : // Once the callback completes, any write-oriented operations that encountered
26 : // ENOSPC are retried exactly once. Once the callback completes, it will not
27 : // be invoked again until a new operation that began after the callback
28 : // returned encounters an ENOSPC error.
29 : //
30 : // OnDiskFull may be used to automatically manage a ballast file, which is
31 : // removed from the filesystem from within the callback. Note that if managing
32 : // a ballast, the caller should maintain a reference to the inner FS and
33 : // remove the ballast on the unwrapped FS.
34 1 : func OnDiskFull(fs FS, fn func()) FS {
35 1 : newFS := &enospcFS{inner: fs}
36 1 : newFS.mu.Cond.L = &newFS.mu.Mutex
37 1 : newFS.mu.onDiskFull = fn
38 1 : return newFS
39 1 : }
40 :
// enospcFS wraps an FS, watching write operations for ENOSPC errors,
// invoking a configured callback once per write generation, and retrying
// the failed operation a single time after the callback completes.
type enospcFS struct {
	// inner is the wrapped filesystem; all operations delegate to it.
	inner FS
	// generation is a monotonically increasing number that encodes the
	// current state of ENOSPC error handling. Incoming writes are
	// organized into generations to provide strong guarantees on when the
	// disk full callback is invoked. The callback is invoked once per
	// write generation.
	//
	// Special significance is given to the parity of this generation
	// field to optimize incoming writes in the normal state, which only
	// need to perform a single atomic load. If generation is odd, an
	// ENOSPC error is being actively handled. The generations associated
	// with writes are always even.
	//
	// The lifecycle of a write is:
	//
	// 1. Atomically load the current generation.
	//    a. If it's even, this is the write's generation number.
	//    b. If it's odd, an ENOSPC was recently encountered and the
	//       corresponding invocation of the disk full callback has not
	//       yet completed. The write must wait until the callback has
	//       completed and generation is updated to an even number, which
	//       becomes the write's generation number.
	// 2. Perform the write. If it encounters no error or an error other
	//    than ENOSPC, the write returns and proceeds no further in this
	//    lifecycle.
	// 3. Handle ENOSPC. If the write encounters ENOSPC, the callback must
	//    be invoked for the write's generation. The write's goroutine
	//    acquires the FS's mutex.
	//    a. If the FS's current generation is still equal to the write's
	//       generation, the write is the first write of its generation to
	//       encounter ENOSPC. It increments the FS's current generation
	//       to an odd number, signifying that an ENOSPC is being handled
	//       and invokes the callback.
	//    b. If the FS's current generation has changed, some other write
	//       from the same generation encountered an ENOSPC first. This
	//       write waits on the condition variable until the FS's current
	//       generation is updated indicating that the generation's
	//       callback invocation has completed.
	// 4. Retry the write once. The callback for the write's generation
	//    has completed, either by this write's goroutine or another's.
	//    The write may proceed with the expectation that the callback
	//    remedied the full disk by freeing up disk space and an ENOSPC
	//    should not be encountered again for at least a few minutes. If
	//    we do encounter another ENOSPC on the retry, the callback was
	//    unable to remedy the full disk and another retry won't be
	//    useful. Any error, including ENOSPC, during the retry is
	//    returned without further handling. None of the retries invoke
	//    the callback.
	//
	// This scheme has a few nice properties:
	// * Once the disk-full callback completes, it won't be invoked
	//   again unless a write that started strictly later encounters an
	//   ENOSPC. This is convenient if the callback strives to 'fix' the
	//   full disk, for example, by removing a ballast file. A new
	//   invocation of the callback guarantees a new problem.
	// * Incoming writes block if there's an unhandled ENOSPC. Some
	//   writes, like WAL or MANIFEST fsyncs, are fatal if they encounter
	//   an ENOSPC.
	generation atomic.Uint32
	mu         struct {
		sync.Mutex
		sync.Cond
		// onDiskFull is the user-supplied callback invoked once per write
		// generation when ENOSPC is encountered. Invoked WITHOUT mu held.
		onDiskFull func()
	}
}
107 :
108 : // Unwrap returns the underlying FS. This may be called by vfs.Root to access
109 : // the underlying filesystem.
110 0 : func (fs *enospcFS) Unwrap() FS {
111 0 : return fs.inner
112 0 : }
113 :
114 : // waitUntilReady is called before every FS or File operation that
115 : // might return ENOSPC. If an ENOSPC was encountered and the corresponding
116 : // invocation of the `onDiskFull` callback has not yet returned,
117 : // waitUntilReady blocks until the callback returns. The returned generation
118 : // is always even.
func (fs *enospcFS) waitUntilReady() uint32 {
	gen := fs.generation.Load()
	if gen%2 == 0 {
		// An even generation indicates that we're not currently handling an
		// ENOSPC. Allow the write to proceed.
		return gen
	}

	// We're currently handling an ENOSPC error. Wait on the condition
	// variable until we're not handling an ENOSPC.
	fs.mu.Lock()
	defer fs.mu.Unlock()

	// Load the generation again with fs.mu locked. handleENOSPC stores the
	// new even generation and broadcasts while holding fs.mu, so a racing
	// callback completion between the unlocked load above and acquiring the
	// mutex is observed here, and no wait is necessary in that case.
	gen = fs.generation.Load()
	for gen%2 == 1 {
		fs.mu.Wait()
		gen = fs.generation.Load()
	}
	return gen
}
140 :
// handleENOSPC is called by a write operation that encountered ENOSPC while
// executing within write generation gen. It ensures the disk-full callback
// is invoked for that generation (by this goroutine or another) and returns
// once the callback for gen has completed, after which the caller may retry
// its operation exactly once.
func (fs *enospcFS) handleENOSPC(gen uint32) {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	currentGeneration := fs.generation.Load()

	// If the current generation is still `gen`, this is the first goroutine
	// to hit an ENOSPC within this write generation, so this goroutine is
	// responsible for invoking the callback.
	if currentGeneration == gen {
		// Increment the generation to an odd number, indicating that the FS
		// is out-of-disk space and incoming writes should pause and wait for
		// the next generation before continuing.
		fs.generation.Store(gen + 1)

		func() {
			// Drop the mutex while we invoke the callback, re-acquiring
			// afterwards. Holding fs.mu across the callback would block
			// every other goroutine entering handleENOSPC (and any
			// waitUntilReady caller that needs the lock) for the callback's
			// entire duration.
			fs.mu.Unlock()
			defer fs.mu.Lock()
			fs.mu.onDiskFull()
		}()

		// Update the current generation again to an even number, indicating
		// that the callback has completed for the write generation `gen`.
		fs.generation.Store(gen + 2)
		fs.mu.Broadcast()
		return
	}

	// The current generation has already been incremented, so either the
	// callback is currently being run by another goroutine or it's already
	// completed. Wait for it to complete if it hasn't already.
	//
	// The current generation may be updated multiple times, including to an
	// odd number signifying a later write generation has already encountered
	// ENOSPC. In that case, the callback was not able to remedy the full disk
	// and waiting is unlikely to be helpful. Continuing to wait risks
	// blocking an unbounded number of generations. Retrying and bubbling the
	// ENOSPC up might be helpful if we can abort a large compaction that
	// started before we became more selective about compaction picking, so
	// this loop only waits for this write generation's callback and no
	// subsequent generations' callbacks.
	for currentGeneration == gen+1 {
		fs.mu.Wait()
		currentGeneration = fs.generation.Load()
	}
}
189 :
190 1 : func (fs *enospcFS) Create(name string) (File, error) {
191 1 : gen := fs.waitUntilReady()
192 1 :
193 1 : f, err := fs.inner.Create(name)
194 1 :
195 1 : if err != nil && isENOSPC(err) {
196 1 : fs.handleENOSPC(gen)
197 1 : f, err = fs.inner.Create(name)
198 1 : }
199 1 : if f != nil {
200 1 : f = &enospcFile{
201 1 : fs: fs,
202 1 : inner: f,
203 1 : }
204 1 : }
205 1 : return f, err
206 : }
207 :
208 1 : func (fs *enospcFS) Link(oldname, newname string) error {
209 1 : gen := fs.waitUntilReady()
210 1 :
211 1 : err := fs.inner.Link(oldname, newname)
212 1 :
213 1 : if err != nil && isENOSPC(err) {
214 1 : fs.handleENOSPC(gen)
215 1 : err = fs.inner.Link(oldname, newname)
216 1 : }
217 1 : return err
218 : }
219 :
220 0 : func (fs *enospcFS) Open(name string, opts ...OpenOption) (File, error) {
221 0 : f, err := fs.inner.Open(name, opts...)
222 0 : if f != nil {
223 0 : f = &enospcFile{
224 0 : fs: fs,
225 0 : inner: f,
226 0 : }
227 0 : }
228 0 : return f, err
229 : }
230 :
231 0 : func (fs *enospcFS) OpenReadWrite(name string, opts ...OpenOption) (File, error) {
232 0 : f, err := fs.inner.OpenReadWrite(name, opts...)
233 0 : if f != nil {
234 0 : f = &enospcFile{
235 0 : fs: fs,
236 0 : inner: f,
237 0 : }
238 0 : }
239 0 : return f, err
240 : }
241 :
242 0 : func (fs *enospcFS) OpenDir(name string) (File, error) {
243 0 : f, err := fs.inner.OpenDir(name)
244 0 : if f != nil {
245 0 : f = &enospcFile{
246 0 : fs: fs,
247 0 : inner: f,
248 0 : }
249 0 : }
250 0 : return f, err
251 : }
252 :
253 1 : func (fs *enospcFS) Remove(name string) error {
254 1 : gen := fs.waitUntilReady()
255 1 :
256 1 : err := fs.inner.Remove(name)
257 1 :
258 1 : if err != nil && isENOSPC(err) {
259 1 : fs.handleENOSPC(gen)
260 1 : err = fs.inner.Remove(name)
261 1 : }
262 1 : return err
263 : }
264 :
265 1 : func (fs *enospcFS) RemoveAll(name string) error {
266 1 : gen := fs.waitUntilReady()
267 1 :
268 1 : err := fs.inner.RemoveAll(name)
269 1 :
270 1 : if err != nil && isENOSPC(err) {
271 1 : fs.handleENOSPC(gen)
272 1 : err = fs.inner.RemoveAll(name)
273 1 : }
274 1 : return err
275 : }
276 :
277 1 : func (fs *enospcFS) Rename(oldname, newname string) error {
278 1 : gen := fs.waitUntilReady()
279 1 :
280 1 : err := fs.inner.Rename(oldname, newname)
281 1 :
282 1 : if err != nil && isENOSPC(err) {
283 1 : fs.handleENOSPC(gen)
284 1 : err = fs.inner.Rename(oldname, newname)
285 1 : }
286 1 : return err
287 : }
288 :
289 1 : func (fs *enospcFS) ReuseForWrite(oldname, newname string) (File, error) {
290 1 : gen := fs.waitUntilReady()
291 1 :
292 1 : f, err := fs.inner.ReuseForWrite(oldname, newname)
293 1 :
294 1 : if err != nil && isENOSPC(err) {
295 1 : fs.handleENOSPC(gen)
296 1 : f, err = fs.inner.ReuseForWrite(oldname, newname)
297 1 : }
298 :
299 1 : if f != nil {
300 1 : f = &enospcFile{
301 1 : fs: fs,
302 1 : inner: f,
303 1 : }
304 1 : }
305 1 : return f, err
306 : }
307 :
308 1 : func (fs *enospcFS) MkdirAll(dir string, perm os.FileMode) error {
309 1 : gen := fs.waitUntilReady()
310 1 :
311 1 : err := fs.inner.MkdirAll(dir, perm)
312 1 :
313 1 : if err != nil && isENOSPC(err) {
314 1 : fs.handleENOSPC(gen)
315 1 : err = fs.inner.MkdirAll(dir, perm)
316 1 : }
317 1 : return err
318 : }
319 :
320 1 : func (fs *enospcFS) Lock(name string) (io.Closer, error) {
321 1 : gen := fs.waitUntilReady()
322 1 :
323 1 : closer, err := fs.inner.Lock(name)
324 1 :
325 1 : if err != nil && isENOSPC(err) {
326 1 : fs.handleENOSPC(gen)
327 1 : closer, err = fs.inner.Lock(name)
328 1 : }
329 1 : return closer, err
330 : }
331 :
332 0 : func (fs *enospcFS) List(dir string) ([]string, error) {
333 0 : return fs.inner.List(dir)
334 0 : }
335 :
336 0 : func (fs *enospcFS) Stat(name string) (os.FileInfo, error) {
337 0 : return fs.inner.Stat(name)
338 0 : }
339 :
340 0 : func (fs *enospcFS) PathBase(path string) string {
341 0 : return fs.inner.PathBase(path)
342 0 : }
343 :
344 0 : func (fs *enospcFS) PathJoin(elem ...string) string {
345 0 : return fs.inner.PathJoin(elem...)
346 0 : }
347 :
348 0 : func (fs *enospcFS) PathDir(path string) string {
349 0 : return fs.inner.PathDir(path)
350 0 : }
351 :
352 0 : func (fs *enospcFS) GetDiskUsage(path string) (DiskUsage, error) {
353 0 : return fs.inner.GetDiskUsage(path)
354 0 : }
355 :
// enospcFile wraps a File, routing write-oriented operations (Write,
// WriteAt, Sync) through the parent enospcFS's generation machinery so that
// ENOSPC errors trigger the disk-full callback and (where safe) one retry.
type enospcFile struct {
	// fs is the parent filesystem; provides waitUntilReady/handleENOSPC.
	fs *enospcFS
	// inner is the wrapped file; all operations delegate to it.
	inner File
}

var _ File = (*enospcFile)(nil)
362 :
363 0 : func (f *enospcFile) Close() error {
364 0 : return f.inner.Close()
365 0 : }
366 :
367 0 : func (f *enospcFile) Read(p []byte) (n int, err error) {
368 0 : return f.inner.Read(p)
369 0 : }
370 :
371 0 : func (f *enospcFile) ReadAt(p []byte, off int64) (n int, err error) {
372 0 : return f.inner.ReadAt(p, off)
373 0 : }
374 :
375 1 : func (f *enospcFile) Write(p []byte) (n int, err error) {
376 1 : gen := f.fs.waitUntilReady()
377 1 :
378 1 : n, err = f.inner.Write(p)
379 1 :
380 1 : if err != nil && isENOSPC(err) {
381 1 : f.fs.handleENOSPC(gen)
382 1 : var n2 int
383 1 : n2, err = f.inner.Write(p[n:])
384 1 : n += n2
385 1 : }
386 1 : return n, err
387 : }
388 :
389 0 : func (f *enospcFile) WriteAt(p []byte, ofs int64) (n int, err error) {
390 0 : gen := f.fs.waitUntilReady()
391 0 :
392 0 : n, err = f.inner.WriteAt(p, ofs)
393 0 :
394 0 : if err != nil && isENOSPC(err) {
395 0 : f.fs.handleENOSPC(gen)
396 0 : var n2 int
397 0 : n2, err = f.inner.WriteAt(p[n:], ofs+int64(n))
398 0 : n += n2
399 0 : }
400 0 : return n, err
401 : }
402 :
403 0 : func (f *enospcFile) Prefetch(offset, length int64) error {
404 0 : return f.inner.Prefetch(offset, length)
405 0 : }
406 :
407 0 : func (f *enospcFile) Preallocate(offset, length int64) error {
408 0 : return f.inner.Preallocate(offset, length)
409 0 : }
410 :
411 0 : func (f *enospcFile) Stat() (os.FileInfo, error) {
412 0 : return f.inner.Stat()
413 0 : }
414 :
415 1 : func (f *enospcFile) Sync() error {
416 1 : gen := f.fs.waitUntilReady()
417 1 :
418 1 : err := f.inner.Sync()
419 1 :
420 1 : if err != nil && isENOSPC(err) {
421 1 : f.fs.handleENOSPC(gen)
422 1 :
423 1 : // NB: It is NOT safe to retry the Sync. See the PostgreSQL
424 1 : // 'fsyncgate' discussion. A successful Sync after a failed one does
425 1 : // not provide any guarantees and (always?) loses the unsynced writes.
426 1 : // We need to bubble the error up and hope we weren't syncing a WAL or
427 1 : // MANIFEST, because we'll have no choice but to crash. Errors while
428 1 : // syncing an sstable will result in a failed flush/compaction, and
429 1 : // the relevant sstable(s) will be marked as obsolete and deleted.
430 1 : // See: https://lwn.net/Articles/752063/
431 1 : }
432 1 : return err
433 : }
434 :
435 0 : func (f *enospcFile) SyncData() error {
436 0 : return f.inner.SyncData()
437 0 : }
438 :
439 0 : func (f *enospcFile) SyncTo(length int64) (fullSync bool, err error) {
440 0 : return f.inner.SyncTo(length)
441 0 : }
442 :
443 0 : func (f *enospcFile) Fd() uintptr {
444 0 : return f.inner.Fd()
445 0 : }
446 :
447 : var _ FS = (*enospcFS)(nil)
448 :
449 1 : func isENOSPC(err error) bool {
450 1 : err = errors.UnwrapAll(err)
451 1 : e, ok := err.(syscall.Errno)
452 1 : return ok && e == syscall.ENOSPC
453 1 : }
|