Line data Source code
1 : // Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package vfs
6 :
7 : import (
8 : "io"
9 : "os"
10 : "sync"
11 : "sync/atomic"
12 : "syscall"
13 :
14 : "github.com/cockroachdb/errors"
15 : )
16 :
17 : // OnDiskFull wraps the provided FS with an FS that examines returned errors,
18 : // looking for ENOSPC errors. It invokes the provided callback when the
19 : // underlying filesystem returns an error signifying the storage is out of
20 : // disk space.
21 : //
22 : // All new writes to the filesystem are blocked while the callback executes,
23 : // so care must be taken to avoid expensive work from within the callback.
24 : //
25 : // Once the callback completes, any write-oriented operations that encountered
26 : // ENOSPC are retried exactly once. Once the callback completes, it will not
27 : // be invoked again until a new operation that began after the callback
28 : // returned encounters an ENOSPC error.
29 : //
30 : // OnDiskFull may be used to automatically manage a ballast file, which is
31 : // removed from the filesystem from within the callback. Note that if managing
32 : // a ballast, the caller should maintain a reference to the inner FS and
33 : // remove the ballast on the unwrapped FS.
34 1 : func OnDiskFull(fs FS, fn func()) FS {
35 1 : newFS := &enospcFS{inner: fs}
36 1 : newFS.mu.Cond.L = &newFS.mu.Mutex
37 1 : newFS.mu.onDiskFull = fn
38 1 : return newFS
39 1 : }
40 :
type enospcFS struct {
	inner FS
	// generation is a monotonically increasing number that encodes the
	// current state of ENOSPC error handling. Incoming writes are
	// organized into generations to provide strong guarantees on when the
	// disk full callback is invoked. The callback is invoked once per
	// write generation.
	//
	// Special significance is given to the parity of this generation
	// field to optimize incoming writes in the normal state, which only
	// need to perform a single atomic load. If generation is odd, an
	// ENOSPC error is being actively handled. The generations associated
	// with writes are always even.
	//
	// The lifecycle of a write is:
	//
	// 1. Atomically load the current generation.
	//    a. If it's even, this is the write's generation number.
	//    b. If it's odd, an ENOSPC was recently encountered and the
	//       corresponding invocation of the disk full callback has not
	//       yet completed. The write must wait until the callback has
	//       completed and generation is updated to an even number, which
	//       becomes the write's generation number.
	// 2. Perform the write. If it encounters no error or an error other
	//    than ENOSPC, the write returns and proceeds no further in this
	//    lifecycle.
	// 3. Handle ENOSPC. If the write encounters ENOSPC, the callback must
	//    be invoked for the write's generation. The write's goroutine
	//    acquires the FS's mutex.
	//    a. If the FS's current generation is still equal to the write's
	//       generation, the write is the first write of its generation to
	//       encounter ENOSPC. It increments the FS's current generation
	//       to an odd number, signifying that an ENOSPC is being handled
	//       and invokes the callback.
	//    b. If the FS's current generation has changed, some other write
	//       from the same generation encountered an ENOSPC first. This
	//       write waits on the condition variable until the FS's current
	//       generation is updated indicating that the generation's
	//       callback invocation has completed.
	// 4. Retry the write once. The callback for the write's generation
	//    has completed, either by this write's goroutine or another's.
	//    The write may proceed with the expectation that the callback
	//    remedied the full disk by freeing up disk space and an ENOSPC
	//    should not be encountered again for at least a few minutes. If
	//    we do encounter another ENOSPC on the retry, the callback was
	//    unable to remedy the full disk and another retry won't be
	//    useful. Any error, including ENOSPC, during the retry is
	//    returned without further handling. None of the retries invoke
	//    the callback.
	//
	// This scheme has a few nice properties:
	// * Once the disk-full callback completes, it won't be invoked
	//   again unless a write that started strictly later encounters an
	//   ENOSPC. This is convenient if the callback strives to 'fix' the
	//   full disk, for example, by removing a ballast file. A new
	//   invocation of the callback guarantees a new problem.
	// * Incoming writes block if there's an unhandled ENOSPC. Some
	//   writes, like WAL or MANIFEST fsyncs, are fatal if they encounter
	//   an ENOSPC.
	generation atomic.Uint32
	// mu guards onDiskFull invocation and serializes generation
	// transitions; the Cond (whose L is the embedded Mutex) is used to
	// wake writes blocked on an in-progress callback.
	mu struct {
		sync.Mutex
		sync.Cond
		onDiskFull func()
	}
}
107 :
// Unwrap returns the underlying FS. This may be called by vfs.Root to access
// the underlying filesystem. Per the OnDiskFull contract, callers managing a
// ballast file should remove the ballast on this unwrapped FS.
func (fs *enospcFS) Unwrap() FS {
	return fs.inner
}
113 :
114 : // waitUntilReady is called before every FS or File operation that
115 : // might return ENOSPC. If an ENOSPC was encountered and the corresponding
116 : // invocation of the `onDiskFull` callback has not yet returned,
117 : // waitUntilReady blocks until the callback returns. The returned generation
118 : // is always even.
119 1 : func (fs *enospcFS) waitUntilReady() uint32 {
120 1 : gen := fs.generation.Load()
121 1 : if gen%2 == 0 {
122 1 : // An even generation indicates that we're not currently handling an
123 1 : // ENOSPC. Allow the write to proceed.
124 1 : return gen
125 1 : }
126 :
127 : // We're currently handling an ENOSPC error. Wait on the condition
128 : // variable until we're not handling an ENOSPC.
129 0 : fs.mu.Lock()
130 0 : defer fs.mu.Unlock()
131 0 :
132 0 : // Load the generation again with fs.mu locked.
133 0 : gen = fs.generation.Load()
134 0 : for gen%2 == 1 {
135 0 : fs.mu.Wait()
136 0 : gen = fs.generation.Load()
137 0 : }
138 0 : return gen
139 : }
140 :
// handleENOSPC is called when an operation performed under write generation
// `gen` encounters ENOSPC. It ensures the onDiskFull callback runs exactly
// once for generation `gen` — either by invoking it on this goroutine or by
// waiting for the goroutine that is already running it — and returns once
// that invocation has completed.
func (fs *enospcFS) handleENOSPC(gen uint32) {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	currentGeneration := fs.generation.Load()

	// If the current generation is still `gen`, this is the first goroutine
	// to hit an ENOSPC within this write generation, so this goroutine is
	// responsible for invoking the callback.
	if currentGeneration == gen {
		// Increment the generation to an odd number, indicating that the FS
		// is out-of-disk space and incoming writes should pause and wait for
		// the next generation before continuing.
		fs.generation.Store(gen + 1)

		func() {
			// Drop the mutex while we invoke the callback, re-acquiring
			// afterwards.
			fs.mu.Unlock()
			defer fs.mu.Lock()
			fs.mu.onDiskFull()
		}()

		// Update the current generation again to an even number, indicating
		// that the callback has completed for the write generation `gen`.
		fs.generation.Store(gen + 2)
		fs.mu.Broadcast()
		return
	}

	// The current generation has already been incremented, so either the
	// callback is currently being run by another goroutine or it's already
	// completed. Wait for it to complete if it hasn't already.
	//
	// The current generation may be updated multiple times, including to an
	// odd number signifying a later write generation has already encountered
	// ENOSPC. In that case, the callback was not able to remedy the full disk
	// and waiting is unlikely to be helpful. Continuing to wait risks
	// blocking an unbounded number of generations. Retrying and bubbling the
	// ENOSPC up might be helpful if we can abort a large compaction that
	// started before we became more selective about compaction picking, so
	// this loop only waits for this write generation's callback and no
	// subsequent generations' callbacks.
	for currentGeneration == gen+1 {
		fs.mu.Wait()
		currentGeneration = fs.generation.Load()
	}
}
189 :
190 1 : func (fs *enospcFS) Create(name string, category DiskWriteCategory) (File, error) {
191 1 : gen := fs.waitUntilReady()
192 1 :
193 1 : f, err := fs.inner.Create(name, category)
194 1 :
195 1 : if err != nil && isENOSPC(err) {
196 1 : fs.handleENOSPC(gen)
197 1 : f, err = fs.inner.Create(name, category)
198 1 : }
199 1 : if f != nil {
200 1 : f = &enospcFile{
201 1 : fs: fs,
202 1 : inner: f,
203 1 : }
204 1 : }
205 1 : return f, err
206 : }
207 :
208 1 : func (fs *enospcFS) Link(oldname, newname string) error {
209 1 : gen := fs.waitUntilReady()
210 1 :
211 1 : err := fs.inner.Link(oldname, newname)
212 1 :
213 1 : if err != nil && isENOSPC(err) {
214 1 : fs.handleENOSPC(gen)
215 1 : err = fs.inner.Link(oldname, newname)
216 1 : }
217 1 : return err
218 : }
219 :
220 0 : func (fs *enospcFS) Open(name string, opts ...OpenOption) (File, error) {
221 0 : f, err := fs.inner.Open(name, opts...)
222 0 : if f != nil {
223 0 : f = &enospcFile{
224 0 : fs: fs,
225 0 : inner: f,
226 0 : }
227 0 : }
228 0 : return f, err
229 : }
230 :
231 : func (fs *enospcFS) OpenReadWrite(
232 : name string, category DiskWriteCategory, opts ...OpenOption,
233 0 : ) (File, error) {
234 0 : f, err := fs.inner.OpenReadWrite(name, category, opts...)
235 0 : if f != nil {
236 0 : f = &enospcFile{
237 0 : fs: fs,
238 0 : inner: f,
239 0 : }
240 0 : }
241 0 : return f, err
242 : }
243 :
244 0 : func (fs *enospcFS) OpenDir(name string) (File, error) {
245 0 : f, err := fs.inner.OpenDir(name)
246 0 : if f != nil {
247 0 : f = &enospcFile{
248 0 : fs: fs,
249 0 : inner: f,
250 0 : }
251 0 : }
252 0 : return f, err
253 : }
254 :
255 1 : func (fs *enospcFS) Remove(name string) error {
256 1 : gen := fs.waitUntilReady()
257 1 :
258 1 : err := fs.inner.Remove(name)
259 1 :
260 1 : if err != nil && isENOSPC(err) {
261 1 : fs.handleENOSPC(gen)
262 1 : err = fs.inner.Remove(name)
263 1 : }
264 1 : return err
265 : }
266 :
267 1 : func (fs *enospcFS) RemoveAll(name string) error {
268 1 : gen := fs.waitUntilReady()
269 1 :
270 1 : err := fs.inner.RemoveAll(name)
271 1 :
272 1 : if err != nil && isENOSPC(err) {
273 1 : fs.handleENOSPC(gen)
274 1 : err = fs.inner.RemoveAll(name)
275 1 : }
276 1 : return err
277 : }
278 :
279 1 : func (fs *enospcFS) Rename(oldname, newname string) error {
280 1 : gen := fs.waitUntilReady()
281 1 :
282 1 : err := fs.inner.Rename(oldname, newname)
283 1 :
284 1 : if err != nil && isENOSPC(err) {
285 1 : fs.handleENOSPC(gen)
286 1 : err = fs.inner.Rename(oldname, newname)
287 1 : }
288 1 : return err
289 : }
290 :
291 : func (fs *enospcFS) ReuseForWrite(
292 : oldname, newname string, category DiskWriteCategory,
293 1 : ) (File, error) {
294 1 : gen := fs.waitUntilReady()
295 1 :
296 1 : f, err := fs.inner.ReuseForWrite(oldname, newname, category)
297 1 :
298 1 : if err != nil && isENOSPC(err) {
299 1 : fs.handleENOSPC(gen)
300 1 : f, err = fs.inner.ReuseForWrite(oldname, newname, category)
301 1 : }
302 :
303 1 : if f != nil {
304 1 : f = &enospcFile{
305 1 : fs: fs,
306 1 : inner: f,
307 1 : }
308 1 : }
309 1 : return f, err
310 : }
311 :
312 1 : func (fs *enospcFS) MkdirAll(dir string, perm os.FileMode) error {
313 1 : gen := fs.waitUntilReady()
314 1 :
315 1 : err := fs.inner.MkdirAll(dir, perm)
316 1 :
317 1 : if err != nil && isENOSPC(err) {
318 1 : fs.handleENOSPC(gen)
319 1 : err = fs.inner.MkdirAll(dir, perm)
320 1 : }
321 1 : return err
322 : }
323 :
324 1 : func (fs *enospcFS) Lock(name string) (io.Closer, error) {
325 1 : gen := fs.waitUntilReady()
326 1 :
327 1 : closer, err := fs.inner.Lock(name)
328 1 :
329 1 : if err != nil && isENOSPC(err) {
330 1 : fs.handleENOSPC(gen)
331 1 : closer, err = fs.inner.Lock(name)
332 1 : }
333 1 : return closer, err
334 : }
335 :
// List passes through to the inner FS; no ENOSPC handling is performed for
// this read-only operation.
func (fs *enospcFS) List(dir string) ([]string, error) {
	return fs.inner.List(dir)
}
339 :
// Stat passes through to the inner FS; no ENOSPC handling is performed for
// this read-only operation.
func (fs *enospcFS) Stat(name string) (FileInfo, error) {
	return fs.inner.Stat(name)
}
343 :
// PathBase delegates to the inner FS's path handling.
func (fs *enospcFS) PathBase(path string) string {
	return fs.inner.PathBase(path)
}
347 :
// PathJoin delegates to the inner FS's path handling.
func (fs *enospcFS) PathJoin(elem ...string) string {
	return fs.inner.PathJoin(elem...)
}
351 :
// PathDir delegates to the inner FS's path handling.
func (fs *enospcFS) PathDir(path string) string {
	return fs.inner.PathDir(path)
}
355 :
// GetDiskUsage passes through to the inner FS; no ENOSPC handling is
// performed for this read-only operation.
func (fs *enospcFS) GetDiskUsage(path string) (DiskUsage, error) {
	return fs.inner.GetDiskUsage(path)
}
359 :
// enospcFile wraps a File, routing its write-oriented operations (Write,
// WriteAt, Sync) through the parent enospcFS's ENOSPC handling.
type enospcFile struct {
	// fs is the enospcFS that created this file; it owns the generation
	// state and the disk-full callback.
	fs *enospcFS
	// inner is the wrapped File all operations are delegated to.
	inner File
}

// Compile-time check that enospcFile satisfies the File interface.
var _ File = (*enospcFile)(nil)
366 :
// Close passes through to the inner file; no ENOSPC handling is performed.
func (f *enospcFile) Close() error {
	return f.inner.Close()
}
370 :
// Read passes through to the inner file; reads need no ENOSPC handling.
func (f *enospcFile) Read(p []byte) (n int, err error) {
	return f.inner.Read(p)
}
374 :
// ReadAt passes through to the inner file; reads need no ENOSPC handling.
func (f *enospcFile) ReadAt(p []byte, off int64) (n int, err error) {
	return f.inner.ReadAt(p, off)
}
378 :
379 1 : func (f *enospcFile) Write(p []byte) (n int, err error) {
380 1 : gen := f.fs.waitUntilReady()
381 1 :
382 1 : n, err = f.inner.Write(p)
383 1 :
384 1 : if err != nil && isENOSPC(err) {
385 1 : f.fs.handleENOSPC(gen)
386 1 : var n2 int
387 1 : n2, err = f.inner.Write(p[n:])
388 1 : n += n2
389 1 : }
390 1 : return n, err
391 : }
392 :
393 0 : func (f *enospcFile) WriteAt(p []byte, ofs int64) (n int, err error) {
394 0 : gen := f.fs.waitUntilReady()
395 0 :
396 0 : n, err = f.inner.WriteAt(p, ofs)
397 0 :
398 0 : if err != nil && isENOSPC(err) {
399 0 : f.fs.handleENOSPC(gen)
400 0 : var n2 int
401 0 : n2, err = f.inner.WriteAt(p[n:], ofs+int64(n))
402 0 : n += n2
403 0 : }
404 0 : return n, err
405 : }
406 :
// Prefetch passes through to the inner file; no ENOSPC handling is
// performed.
func (f *enospcFile) Prefetch(offset, length int64) error {
	return f.inner.Prefetch(offset, length)
}
410 :
// Preallocate passes through to the inner file.
// NOTE(review): preallocation presumably can fail with ENOSPC, yet unlike
// Write/WriteAt it is not routed through the retry machinery — confirm this
// is intentional.
func (f *enospcFile) Preallocate(offset, length int64) error {
	return f.inner.Preallocate(offset, length)
}
414 :
// Stat passes through to the inner file; no ENOSPC handling is performed
// for this read-only operation.
func (f *enospcFile) Stat() (FileInfo, error) {
	return f.inner.Stat()
}
418 :
419 1 : func (f *enospcFile) Sync() error {
420 1 : gen := f.fs.waitUntilReady()
421 1 :
422 1 : err := f.inner.Sync()
423 1 :
424 1 : if err != nil && isENOSPC(err) {
425 1 : f.fs.handleENOSPC(gen)
426 1 :
427 1 : // NB: It is NOT safe to retry the Sync. See the PostgreSQL
428 1 : // 'fsyncgate' discussion. A successful Sync after a failed one does
429 1 : // not provide any guarantees and (always?) loses the unsynced writes.
430 1 : // We need to bubble the error up and hope we weren't syncing a WAL or
431 1 : // MANIFEST, because we'll have no choice but to crash. Errors while
432 1 : // syncing an sstable will result in a failed flush/compaction, and
433 1 : // the relevant sstable(s) will be marked as obsolete and deleted.
434 1 : // See: https://lwn.net/Articles/752063/
435 1 : }
436 1 : return err
437 : }
438 :
// SyncData passes through to the inner file.
// NOTE(review): unlike Sync, an ENOSPC here does not trigger the disk-full
// callback — confirm this asymmetry is intentional.
func (f *enospcFile) SyncData() error {
	return f.inner.SyncData()
}
442 :
// SyncTo passes through to the inner file.
// NOTE(review): unlike Sync, an ENOSPC here does not trigger the disk-full
// callback — confirm this asymmetry is intentional.
func (f *enospcFile) SyncTo(length int64) (fullSync bool, err error) {
	return f.inner.SyncTo(length)
}
446 :
// Fd returns the inner file's descriptor.
func (f *enospcFile) Fd() uintptr {
	return f.inner.Fd()
}
450 :
// Compile-time check that enospcFS satisfies the FS interface.
var _ FS = (*enospcFS)(nil)
452 :
453 1 : func isENOSPC(err error) bool {
454 1 : err = errors.UnwrapAll(err)
455 1 : e, ok := err.(syscall.Errno)
456 1 : return ok && e == syscall.ENOSPC
457 1 : }
|