LCOV - code coverage report
Current view: top level - pebble/vfs/vfs - disk_full.go (source / functions) Coverage Total Hit
Test: 2025-09-15 08:18Z 77ef79ed - tests + meta.lcov Lines: 62.4 % 266 166
Test Date: 2025-09-15 08:21:14 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package vfs
       6              : 
       7              : import (
       8              :         "io"
       9              :         "os"
      10              :         "sync"
      11              :         "sync/atomic"
      12              :         "syscall"
      13              : 
      14              :         "github.com/cockroachdb/errors"
      15              : )
      16              : 
      17              : // OnDiskFull wraps the provided FS with an FS that examines returned errors,
      18              : // looking for ENOSPC errors. It invokes the provided callback when the
      19              : // underlying filesystem returns an error signifying the storage is out of
      20              : // disk space.
      21              : //
      22              : // All new writes to the filesystem are blocked while the callback executes,
      23              : // so care must be taken to avoid expensive work from within the callback.
      24              : //
      25              : // Once the callback completes, any write-oriented operations that encountered
      26              : // ENOSPC are retried exactly once. Once the callback completes, it will not
      27              : // be invoked again until a new operation that began after the callback
      28              : // returned encounters an ENOSPC error.
      29              : //
      30              : // OnDiskFull may be used to automatically manage a ballast file, which is
      31              : // removed from the filesystem from within the callback. Note that if managing
      32              : // a ballast, the caller should maintain a reference to the inner FS and
      33              : // remove the ballast on the unwrapped FS.
      34            1 : func OnDiskFull(fs FS, fn func()) FS {
      35            1 :         newFS := &enospcFS{inner: fs}
      36            1 :         newFS.mu.Cond.L = &newFS.mu.Mutex
      37            1 :         newFS.mu.onDiskFull = fn
      38            1 :         return newFS
      39            1 : }
      40              : 
// enospcFS is the FS implementation returned by OnDiskFull. It forwards every
// operation to inner and adds ENOSPC detection, callback invocation and a
// single retry around the write-oriented operations.
type enospcFS struct {
	// inner is the wrapped FS that all operations are forwarded to.
	inner FS
	// generation is a monotonically increasing number that encodes the
	// current state of ENOSPC error handling. Incoming writes are
	// organized into generations to provide strong guarantees on when the
	// disk full callback is invoked. The callback is invoked once per
	// write generation.
	//
	// Special significance is given to the parity of this generation
	// field to optimize incoming writes in the normal state, which only
	// need to perform a single atomic load. If generation is odd, an
	// ENOSPC error is being actively handled. The generations associated
	// with writes are always even.
	//
	// The lifecycle of a write is:
	//
	// 1. Atomically load the current generation.
	//    a. If it's even, this is the write's generation number.
	//    b. If it's odd, an ENOSPC was recently encountered and the
	//       corresponding invocation of the disk full callback has not
	//       yet completed. The write must wait until the callback has
	//       completed and generation is updated to an even number, which
	//       becomes the write's generation number.
	// 2. Perform the write. If it encounters no error or an error other
	//    than ENOSPC, the write returns and proceeds no further in this
	//    lifecycle.
	// 3. Handle ENOSPC. If the write encounters ENOSPC, the callback must
	//    be invoked for the write's generation. The write's goroutine
	//    acquires the FS's mutex.
	//    a. If the FS's current generation is still equal to the write's
	//       generation, the write is the first write of its generation to
	//       encounter ENOSPC. It increments the FS's current generation
	//       to an odd number, signifying that an ENOSPC is being handled
	//       and invokes the callback.
	//    b. If the FS's current generation has changed, some other write
	//       from the same generation encountered an ENOSPC first. This
	//       write waits on the condition variable until the FS's current
	//       generation is updated indicating that the generation's
	//       callback invocation has completed.
	// 4. Retry the write once. The callback for the write's generation
	//    has completed, either by this write's goroutine or another's.
	//    The write may proceed with the expectation that the callback
	//    remedied the full disk by freeing up disk space and an ENOSPC
	//    should not be encountered again for at least a few minutes. If
	//    we do encounter another ENOSPC on the retry, the callback was
	//    unable to remedy the full disk and another retry won't be
	//    useful. Any error, including ENOSPC, during the retry is
	//    returned without further handling.  None of the retries invoke
	//    the callback.
	//
	// This scheme has a few nice properties:
	// * Once the disk-full callback completes, it won't be invoked
	//   again unless a write that started strictly later encounters an
	//   ENOSPC. This is convenient if the callback strives to 'fix' the
	//   full disk, for example, by removing a ballast file. A new
	//   invocation of the callback guarantees a new problem.
	// * Incoming writes block if there's an unhandled ENOSPC. Some
	//   writes, like WAL or MANIFEST fsyncs, are fatal if they encounter
	//   an ENOSPC.
	generation atomic.Uint32
	// mu bundles the mutex held while generation is advanced, the
	// condition variable (its L is pointed at the embedded Mutex by
	// OnDiskFull) on which writers wait for a generation change, and the
	// user-supplied disk-full callback.
	mu struct {
		sync.Mutex
		sync.Cond
		// onDiskFull is the callback passed to OnDiskFull.
		onDiskFull func()
	}
}
     107              : 
     108              : // Unwrap returns the underlying FS. This may be called by vfs.Root to access
     109              : // the underlying filesystem.
     110            0 : func (fs *enospcFS) Unwrap() FS {
     111            0 :         return fs.inner
     112            0 : }
     113              : 
     114              : // waitUntilReady is called before every FS or File operation that
     115              : // might return ENOSPC. If an ENOSPC was encountered and the corresponding
     116              : // invocation of the `onDiskFull` callback has not yet returned,
     117              : // waitUntilReady blocks until the callback returns. The returned generation
     118              : // is always even.
     119            1 : func (fs *enospcFS) waitUntilReady() uint32 {
     120            1 :         gen := fs.generation.Load()
     121            1 :         if gen%2 == 0 {
     122            1 :                 // An even generation indicates that we're not currently handling an
     123            1 :                 // ENOSPC. Allow the write to proceed.
     124            1 :                 return gen
     125            1 :         }
     126              : 
     127              :         // We're currently handling an ENOSPC error. Wait on the condition
     128              :         // variable until we're not handling an ENOSPC.
     129            0 :         fs.mu.Lock()
     130            0 :         defer fs.mu.Unlock()
     131            0 : 
     132            0 :         // Load the generation again with fs.mu locked.
     133            0 :         gen = fs.generation.Load()
     134            0 :         for gen%2 == 1 {
     135            0 :                 fs.mu.Wait()
     136            0 :                 gen = fs.generation.Load()
     137            0 :         }
     138            0 :         return gen
     139              : }
     140              : 
     141            1 : func (fs *enospcFS) handleENOSPC(gen uint32) {
     142            1 :         fs.mu.Lock()
     143            1 :         defer fs.mu.Unlock()
     144            1 : 
     145            1 :         currentGeneration := fs.generation.Load()
     146            1 : 
     147            1 :         // If the current generation is still `gen`, this is the first goroutine
     148            1 :         // to hit an ENOSPC within this write generation, so this goroutine is
     149            1 :         // responsible for invoking the callback.
     150            1 :         if currentGeneration == gen {
     151            1 :                 // Increment the generation to an odd number, indicating that the FS
     152            1 :                 // is out-of-disk space and incoming writes should pause and wait for
     153            1 :                 // the next generation before continuing.
     154            1 :                 fs.generation.Store(gen + 1)
     155            1 : 
     156            1 :                 func() {
     157            1 :                         // Drop the mutex while we invoke the callback, re-acquiring
     158            1 :                         // afterwards.
     159            1 :                         fs.mu.Unlock()
     160            1 :                         defer fs.mu.Lock()
     161            1 :                         fs.mu.onDiskFull()
     162            1 :                 }()
     163              : 
     164              :                 // Update the current generation again to an even number, indicating
     165              :                 // that the callback has completed for the write generation `gen`.
     166            1 :                 fs.generation.Store(gen + 2)
     167            1 :                 fs.mu.Broadcast()
     168            1 :                 return
     169              :         }
     170              : 
     171              :         // The current generation has already been incremented, so either the
     172              :         // callback is currently being run by another goroutine or it's already
     173              :         // completed. Wait for it complete if it hasn't already.
     174              :         //
     175              :         // The current generation may be updated multiple times, including to an
     176              :         // odd number signifying a later write generation has already encountered
     177              :         // ENOSPC. In that case, the callback was not able to remedy the full disk
     178              :         // and waiting is unlikely to be helpful.  Continuing to wait risks
     179              :         // blocking an unbounded number of generations.  Retrying and bubbling the
     180              :         // ENOSPC up might be helpful if we can abort a large compaction that
     181              :         // started before we became more selective about compaction picking, so
     182              :         // this loop only waits for this write generation's callback and no
     183              :         // subsequent generations' callbacks.
     184            1 :         for currentGeneration == gen+1 {
     185            0 :                 fs.mu.Wait()
     186            0 :                 currentGeneration = fs.generation.Load()
     187            0 :         }
     188              : }
     189              : 
     190            1 : func (fs *enospcFS) Create(name string, category DiskWriteCategory) (File, error) {
     191            1 :         gen := fs.waitUntilReady()
     192            1 : 
     193            1 :         f, err := fs.inner.Create(name, category)
     194            1 : 
     195            1 :         if err != nil && isENOSPC(err) {
     196            1 :                 fs.handleENOSPC(gen)
     197            1 :                 f, err = fs.inner.Create(name, category)
     198            1 :         }
     199            1 :         if f != nil {
     200            1 :                 f = &enospcFile{
     201            1 :                         fs:    fs,
     202            1 :                         inner: f,
     203            1 :                 }
     204            1 :         }
     205            1 :         return f, err
     206              : }
     207              : 
     208            1 : func (fs *enospcFS) Link(oldname, newname string) error {
     209            1 :         gen := fs.waitUntilReady()
     210            1 : 
     211            1 :         err := fs.inner.Link(oldname, newname)
     212            1 : 
     213            1 :         if err != nil && isENOSPC(err) {
     214            1 :                 fs.handleENOSPC(gen)
     215            1 :                 err = fs.inner.Link(oldname, newname)
     216            1 :         }
     217            1 :         return err
     218              : }
     219              : 
     220            0 : func (fs *enospcFS) Open(name string, opts ...OpenOption) (File, error) {
     221            0 :         f, err := fs.inner.Open(name, opts...)
     222            0 :         if f != nil {
     223            0 :                 f = &enospcFile{
     224            0 :                         fs:    fs,
     225            0 :                         inner: f,
     226            0 :                 }
     227            0 :         }
     228            0 :         return f, err
     229              : }
     230              : 
     231              : func (fs *enospcFS) OpenReadWrite(
     232              :         name string, category DiskWriteCategory, opts ...OpenOption,
     233            0 : ) (File, error) {
     234            0 :         f, err := fs.inner.OpenReadWrite(name, category, opts...)
     235            0 :         if f != nil {
     236            0 :                 f = &enospcFile{
     237            0 :                         fs:    fs,
     238            0 :                         inner: f,
     239            0 :                 }
     240            0 :         }
     241            0 :         return f, err
     242              : }
     243              : 
     244            0 : func (fs *enospcFS) OpenDir(name string) (File, error) {
     245            0 :         f, err := fs.inner.OpenDir(name)
     246            0 :         if f != nil {
     247            0 :                 f = &enospcFile{
     248            0 :                         fs:    fs,
     249            0 :                         inner: f,
     250            0 :                 }
     251            0 :         }
     252            0 :         return f, err
     253              : }
     254              : 
     255            1 : func (fs *enospcFS) Remove(name string) error {
     256            1 :         gen := fs.waitUntilReady()
     257            1 : 
     258            1 :         err := fs.inner.Remove(name)
     259            1 : 
     260            1 :         if err != nil && isENOSPC(err) {
     261            1 :                 fs.handleENOSPC(gen)
     262            1 :                 err = fs.inner.Remove(name)
     263            1 :         }
     264            1 :         return err
     265              : }
     266              : 
     267            1 : func (fs *enospcFS) RemoveAll(name string) error {
     268            1 :         gen := fs.waitUntilReady()
     269            1 : 
     270            1 :         err := fs.inner.RemoveAll(name)
     271            1 : 
     272            1 :         if err != nil && isENOSPC(err) {
     273            1 :                 fs.handleENOSPC(gen)
     274            1 :                 err = fs.inner.RemoveAll(name)
     275            1 :         }
     276            1 :         return err
     277              : }
     278              : 
     279            1 : func (fs *enospcFS) Rename(oldname, newname string) error {
     280            1 :         gen := fs.waitUntilReady()
     281            1 : 
     282            1 :         err := fs.inner.Rename(oldname, newname)
     283            1 : 
     284            1 :         if err != nil && isENOSPC(err) {
     285            1 :                 fs.handleENOSPC(gen)
     286            1 :                 err = fs.inner.Rename(oldname, newname)
     287            1 :         }
     288            1 :         return err
     289              : }
     290              : 
     291              : func (fs *enospcFS) ReuseForWrite(
     292              :         oldname, newname string, category DiskWriteCategory,
     293            1 : ) (File, error) {
     294            1 :         gen := fs.waitUntilReady()
     295            1 : 
     296            1 :         f, err := fs.inner.ReuseForWrite(oldname, newname, category)
     297            1 : 
     298            1 :         if err != nil && isENOSPC(err) {
     299            1 :                 fs.handleENOSPC(gen)
     300            1 :                 f, err = fs.inner.ReuseForWrite(oldname, newname, category)
     301            1 :         }
     302              : 
     303            1 :         if f != nil {
     304            1 :                 f = &enospcFile{
     305            1 :                         fs:    fs,
     306            1 :                         inner: f,
     307            1 :                 }
     308            1 :         }
     309            1 :         return f, err
     310              : }
     311              : 
     312            1 : func (fs *enospcFS) MkdirAll(dir string, perm os.FileMode) error {
     313            1 :         gen := fs.waitUntilReady()
     314            1 : 
     315            1 :         err := fs.inner.MkdirAll(dir, perm)
     316            1 : 
     317            1 :         if err != nil && isENOSPC(err) {
     318            1 :                 fs.handleENOSPC(gen)
     319            1 :                 err = fs.inner.MkdirAll(dir, perm)
     320            1 :         }
     321            1 :         return err
     322              : }
     323              : 
     324            1 : func (fs *enospcFS) Lock(name string) (io.Closer, error) {
     325            1 :         gen := fs.waitUntilReady()
     326            1 : 
     327            1 :         closer, err := fs.inner.Lock(name)
     328            1 : 
     329            1 :         if err != nil && isENOSPC(err) {
     330            1 :                 fs.handleENOSPC(gen)
     331            1 :                 closer, err = fs.inner.Lock(name)
     332            1 :         }
     333            1 :         return closer, err
     334              : }
     335              : 
     336            0 : func (fs *enospcFS) List(dir string) ([]string, error) {
     337            0 :         return fs.inner.List(dir)
     338            0 : }
     339              : 
     340            0 : func (fs *enospcFS) Stat(name string) (FileInfo, error) {
     341            0 :         return fs.inner.Stat(name)
     342            0 : }
     343              : 
     344            0 : func (fs *enospcFS) PathBase(path string) string {
     345            0 :         return fs.inner.PathBase(path)
     346            0 : }
     347              : 
     348            0 : func (fs *enospcFS) PathJoin(elem ...string) string {
     349            0 :         return fs.inner.PathJoin(elem...)
     350            0 : }
     351              : 
     352            0 : func (fs *enospcFS) PathDir(path string) string {
     353            0 :         return fs.inner.PathDir(path)
     354            0 : }
     355              : 
     356            0 : func (fs *enospcFS) GetDiskUsage(path string) (DiskUsage, error) {
     357            0 :         return fs.inner.GetDiskUsage(path)
     358            0 : }
     359              : 
// enospcFile wraps a File handed out by enospcFS. Its Write, WriteAt and Sync
// methods go through the owning enospcFS's ENOSPC handling; every other
// method is forwarded to inner unchanged.
type enospcFile struct {
	// fs is the enospcFS that created this file; it supplies
	// waitUntilReady and handleENOSPC.
	fs *enospcFS
	// inner is the wrapped File that operations are forwarded to.
	inner File
}
     364              : 
// Compile-time check that *enospcFile implements File.
var _ File = (*enospcFile)(nil)
     366              : 
     367            0 : func (f *enospcFile) Close() error {
     368            0 :         return f.inner.Close()
     369            0 : }
     370              : 
     371            0 : func (f *enospcFile) Read(p []byte) (n int, err error) {
     372            0 :         return f.inner.Read(p)
     373            0 : }
     374              : 
     375            0 : func (f *enospcFile) ReadAt(p []byte, off int64) (n int, err error) {
     376            0 :         return f.inner.ReadAt(p, off)
     377            0 : }
     378              : 
     379            1 : func (f *enospcFile) Write(p []byte) (n int, err error) {
     380            1 :         gen := f.fs.waitUntilReady()
     381            1 : 
     382            1 :         n, err = f.inner.Write(p)
     383            1 : 
     384            1 :         if err != nil && isENOSPC(err) {
     385            1 :                 f.fs.handleENOSPC(gen)
     386            1 :                 var n2 int
     387            1 :                 n2, err = f.inner.Write(p[n:])
     388            1 :                 n += n2
     389            1 :         }
     390            1 :         return n, err
     391              : }
     392              : 
     393            0 : func (f *enospcFile) WriteAt(p []byte, ofs int64) (n int, err error) {
     394            0 :         gen := f.fs.waitUntilReady()
     395            0 : 
     396            0 :         n, err = f.inner.WriteAt(p, ofs)
     397            0 : 
     398            0 :         if err != nil && isENOSPC(err) {
     399            0 :                 f.fs.handleENOSPC(gen)
     400            0 :                 var n2 int
     401            0 :                 n2, err = f.inner.WriteAt(p[n:], ofs+int64(n))
     402            0 :                 n += n2
     403            0 :         }
     404            0 :         return n, err
     405              : }
     406              : 
     407            0 : func (f *enospcFile) Prefetch(offset, length int64) error {
     408            0 :         return f.inner.Prefetch(offset, length)
     409            0 : }
     410              : 
     411            0 : func (f *enospcFile) Preallocate(offset, length int64) error {
     412            0 :         return f.inner.Preallocate(offset, length)
     413            0 : }
     414              : 
     415            0 : func (f *enospcFile) Stat() (FileInfo, error) {
     416            0 :         return f.inner.Stat()
     417            0 : }
     418              : 
     419            1 : func (f *enospcFile) Sync() error {
     420            1 :         gen := f.fs.waitUntilReady()
     421            1 : 
     422            1 :         err := f.inner.Sync()
     423            1 : 
     424            1 :         if err != nil && isENOSPC(err) {
     425            1 :                 f.fs.handleENOSPC(gen)
     426            1 : 
     427            1 :                 // NB: It is NOT safe to retry the Sync. See the PostgreSQL
     428            1 :                 // 'fsyncgate' discussion. A successful Sync after a failed one does
     429            1 :                 // not provide any guarantees and (always?) loses the unsynced writes.
     430            1 :                 // We need to bubble the error up and hope we weren't syncing a WAL or
     431            1 :                 // MANIFEST, because we'll have no choice but to crash. Errors while
     432            1 :                 // syncing an sstable will result in a failed flush/compaction, and
     433            1 :                 // the relevant sstable(s) will be marked as obsolete and deleted.
     434            1 :                 // See: https://lwn.net/Articles/752063/
     435            1 :         }
     436            1 :         return err
     437              : }
     438              : 
     439            0 : func (f *enospcFile) SyncData() error {
     440            0 :         return f.inner.SyncData()
     441            0 : }
     442              : 
     443            0 : func (f *enospcFile) SyncTo(length int64) (fullSync bool, err error) {
     444            0 :         return f.inner.SyncTo(length)
     445            0 : }
     446              : 
     447            0 : func (f *enospcFile) Fd() uintptr {
     448            0 :         return f.inner.Fd()
     449            0 : }
     450              : 
// Compile-time check that *enospcFS implements FS.
var _ FS = (*enospcFS)(nil)
     452              : 
     453            1 : func isENOSPC(err error) bool {
     454            1 :         err = errors.UnwrapAll(err)
     455            1 :         e, ok := err.(syscall.Errno)
     456            1 :         return ok && e == syscall.ENOSPC
     457            1 : }
        

Generated by: LCOV version 2.0-1