LCOV - code coverage report
Current view: top level - pebble/vfs - disk_full.go (source / functions) Hit Total Coverage
Test: 2024-03-24 08:15Z 86b5f54c - tests only.lcov Lines: 166 266 62.4 %
Date: 2024-03-24 08:16:28 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package vfs
       6             : 
       7             : import (
       8             :         "io"
       9             :         "os"
      10             :         "sync"
      11             :         "sync/atomic"
      12             :         "syscall"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             : )
      16             : 
      17             : // OnDiskFull wraps the provided FS with an FS that examines returned errors,
      18             : // looking for ENOSPC errors. It invokes the provided callback when the
      19             : // underlying filesystem returns an error signifying the storage is out of
      20             : // disk space.
      21             : //
      22             : // All new writes to the filesystem are blocked while the callback executes,
      23             : // so care must be taken to avoid expensive work from within the callback.
      24             : //
      25             : // Once the callback completes, any write-oriented operations that encountered
      26             : // ENOSPC are retried exactly once, except Sync, whose error is returned
      27             : // without a retry. The callback will not be invoked again until a new
      28             : // operation that began after the callback returned encounters an ENOSPC error.
      29             : //
      30             : // OnDiskFull may be used to automatically manage a ballast file, which is
      31             : // removed from within the callback. If managing a ballast, the caller should
      32             : // keep a reference to the inner FS and remove the ballast on that unwrapped
      33             : // FS: the wrapped FS's Remove waits for the running callback to return.
      34           1 : func OnDiskFull(fs FS, fn func()) FS {
      35           1 :         newFS := &enospcFS{inner: fs}
      36           1 :         newFS.mu.Cond.L = &newFS.mu.Mutex // the condition variable uses the embedded mutex as its locker
      37           1 :         newFS.mu.onDiskFull = fn
      38           1 :         return newFS
      39           1 : }
      40             : // enospcFS is the FS returned by OnDiskFull. It forwards to inner and coordinates ENOSPC handling across goroutines.
      41             : type enospcFS struct {
      42             :         inner FS // wrapped filesystem that every operation is forwarded to
      43             :         // generation is a monotonically increasing number that encodes the
      44             :         // current state of ENOSPC error handling. Incoming writes are
      45             :         // organized into generations to provide strong guarantees on when the
      46             :         // disk full callback is invoked. The callback is invoked once per
      47             :         // write generation.
      48             :         //
      49             :         // Special significance is given to the parity of this generation
      50             :         // field to optimize incoming writes in the normal state, which only
      51             :         // need to perform a single atomic load. If generation is odd, an
      52             :         // ENOSPC error is being actively handled. The generations associated
      53             :         // with writes are always even.
      54             :         //
      55             :         // The lifecycle of a write is:
      56             :         //
      57             :         // 1. Atomically load the current generation.
      58             :         //    a. If it's even, this is the write's generation number.
      59             :         //    b. If it's odd, an ENOSPC was recently encountered and the
      60             :         //       corresponding invocation of the disk full callback has not
      61             :         //       yet completed. The write must wait until the callback has
      62             :         //       completed and generation is updated to an even number, which
      63             :         //       becomes the write's generation number.
      64             :         // 2. Perform the write. If it encounters no error or an error other
      65             :         //    than ENOSPC, the write returns and proceeds no further in this
      66             :         //    lifecycle.
      67             :         // 3. Handle ENOSPC. If the write encounters ENOSPC, the callback must
      68             :         //    be invoked for the write's generation. The write's goroutine
      69             :         //    acquires the FS's mutex.
      70             :         //    a. If the FS's current generation is still equal to the write's
      71             :         //       generation, the write is the first write of its generation to
      72             :         //       encounter ENOSPC. It increments the FS's current generation
      73             :         //       to an odd number, signifying that an ENOSPC is being handled
      74             :         //       and invokes the callback.
      75             :         //    b. If the FS's current generation has changed, some other write
      76             :         //       from the same generation encountered an ENOSPC first. This
      77             :         //       write waits on the condition variable until the FS's current
      78             :         //       generation is updated indicating that the generation's
      79             :         //       callback invocation has completed.
      80             :         // 4. Retry the write once. The callback for the write's generation
      81             :         //    has completed, either by this write's goroutine or another's.
      82             :         //    The write may proceed with the expectation that the callback
      83             :         //    remedied the full disk by freeing up disk space and an ENOSPC
      84             :         //    should not be encountered again for at least a few minutes. If
      85             :         //    we do encounter another ENOSPC on the retry, the callback was
      86             :         //    unable to remedy the full disk and another retry won't be
      87             :         //    useful. Any error, including ENOSPC, during the retry is
      88             :         //    returned without further handling.  None of the retries invoke
      89             :         //    the callback.
      90             :         //
      91             :         // This scheme has a few nice properties:
      92             :         // * Once the disk-full callback completes, it won't be invoked
      93             :         //   again unless a write that started strictly later encounters an
      94             :         //   ENOSPC. This is convenient if the callback strives to 'fix' the
      95             :         //   full disk, for example, by removing a ballast file. A new
      96             :         //   invocation of the callback guarantees a new problem.
      97             :         // * Incoming writes block if there's an unhandled ENOSPC. Some
      98             :         //   writes, like WAL or MANIFEST fsyncs, are fatal if they encounter
      99             :         //   an ENOSPC.
     100             :         generation atomic.Uint32
     101             :         mu         struct { // held while generation is advanced; the callback itself runs with it released
     102             :                 sync.Mutex
     103             :                 sync.Cond // waited on in waitUntilReady/handleENOSPC; its locker is set to the Mutex in OnDiskFull
     104             :                 onDiskFull func() // user callback, invoked by the first write of a generation to hit ENOSPC
     105             :         }
     106             : }
     107             : 
     108             : // Unwrap returns the wrapped (inner) FS, on which operations are not subject
     109             : // to ENOSPC handling. This may be called by vfs.Root to access the underlying filesystem.
     110           0 : func (fs *enospcFS) Unwrap() FS {
     111           0 :         return fs.inner
     112           0 : }
     113             : 
     114             : // waitUntilReady is called at the start of each FS or File operation that
     115             : // this wrapper handles ENOSPC for. If an ENOSPC was encountered and the
     116             : // corresponding invocation of the `onDiskFull` callback has not yet returned,
     117             : // waitUntilReady blocks until the callback returns. The returned generation
     118             : // is always even.
     119           1 : func (fs *enospcFS) waitUntilReady() uint32 {
     120           1 :         gen := fs.generation.Load() // fast path: a single atomic load, no mutex
     121           1 :         if gen%2 == 0 {
     122           1 :                 // An even generation indicates that we're not currently handling an
     123           1 :                 // ENOSPC. Allow the write to proceed.
     124           1 :                 return gen
     125           1 :         }
     126             : 
     127             :         // We're currently handling an ENOSPC error. Wait on the condition
     128             :         // variable until we're not handling an ENOSPC.
     129           0 :         fs.mu.Lock()
     130           0 :         defer fs.mu.Unlock()
     131           0 : 
     132           0 :         // Load the generation again with fs.mu locked.
     133           0 :         gen = fs.generation.Load()
     134           0 :         for gen%2 == 1 {
     135           0 :                 fs.mu.Wait() // woken by the Broadcast in handleENOSPC once the callback has returned
     136           0 :                 gen = fs.generation.Load()
     137           0 :         }
     138           0 :         return gen
     139             : }
     140             : // handleENOSPC runs the onDiskFull callback for write generation gen, or waits for the goroutine already running it.
     141           1 : func (fs *enospcFS) handleENOSPC(gen uint32) {
     142           1 :         fs.mu.Lock()
     143           1 :         defer fs.mu.Unlock()
     144           1 : 
     145           1 :         currentGeneration := fs.generation.Load()
     146           1 : 
     147           1 :         // If the current generation is still `gen`, this is the first goroutine
     148           1 :         // to hit an ENOSPC within this write generation, so this goroutine is
     149           1 :         // responsible for invoking the callback.
     150           1 :         if currentGeneration == gen {
     151           1 :                 // Increment the generation to an odd number, indicating that the FS
     152           1 :                 // is out-of-disk space and incoming writes should pause and wait for
     153           1 :                 // the next generation before continuing.
     154           1 :                 fs.generation.Store(gen + 1)
     155           1 : 
     156           1 :                 func() {
     157           1 :                         // Drop the mutex while we invoke the callback, re-acquiring
     158           1 :                         // afterwards.
     159           1 :                         fs.mu.Unlock()
     160           1 :                         defer fs.mu.Lock() // deferred so the outer deferred Unlock stays balanced however the callback exits
     161           1 :                         fs.mu.onDiskFull()
     162           1 :                 }()
     163             : 
     164             :                 // Update the current generation again to an even number, indicating
     165             :                 // that the callback has completed for the write generation `gen`.
     166           1 :                 fs.generation.Store(gen + 2)
     167           1 :                 fs.mu.Broadcast() // wake every goroutine blocked in waitUntilReady or in the loop below
     168           1 :                 return
     169             :         }
     170             : 
     171             :         // The current generation has already been incremented, so either the
     172             :         // callback is currently being run by another goroutine or it's already
     173             :         // completed. Wait for it to complete if it hasn't already.
     174             :         //
     175             :         // The current generation may be updated multiple times, including to an
     176             :         // odd number signifying a later write generation has already encountered
     177             :         // ENOSPC. In that case, the callback was not able to remedy the full disk
     178             :         // and waiting is unlikely to be helpful.  Continuing to wait risks
     179             :         // blocking an unbounded number of generations.  Retrying and bubbling the
     180             :         // ENOSPC up might be helpful if we can abort a large compaction that
     181             :         // started before we became more selective about compaction picking, so
     182             :         // this loop only waits for this write generation's callback and no
     183             :         // subsequent generations' callbacks.
     184           1 :         for currentGeneration == gen+1 {
     185           0 :                 fs.mu.Wait()
     186           0 :                 currentGeneration = fs.generation.Load()
     187           0 :         }
     188             : }
     189             : // Create forwards to inner.Create, retrying once after handleENOSPC on ENOSPC; a non-nil File is wrapped in an enospcFile.
     190           1 : func (fs *enospcFS) Create(name string) (File, error) {
     191           1 :         gen := fs.waitUntilReady()
     192           1 : 
     193           1 :         f, err := fs.inner.Create(name)
     194           1 :         // On ENOSPC, run (or wait for) the disk-full callback, then retry exactly once.
     195           1 :         if err != nil && isENOSPC(err) {
     196           1 :                 fs.handleENOSPC(gen)
     197           1 :                 f, err = fs.inner.Create(name)
     198           1 :         }
     199           1 :         if f != nil {
     200           1 :                 f = &enospcFile{
     201           1 :                         fs:    fs,
     202           1 :                         inner: f,
     203           1 :                 }
     204           1 :         }
     205           1 :         return f, err
     206             : }
     207             : // Link forwards to inner.Link, retrying once after handleENOSPC if the first attempt returns ENOSPC.
     208           1 : func (fs *enospcFS) Link(oldname, newname string) error {
     209           1 :         gen := fs.waitUntilReady()
     210           1 : 
     211           1 :         err := fs.inner.Link(oldname, newname)
     212           1 :         // On ENOSPC, run (or wait for) the disk-full callback, then retry exactly once.
     213           1 :         if err != nil && isENOSPC(err) {
     214           1 :                 fs.handleENOSPC(gen)
     215           1 :                 err = fs.inner.Link(oldname, newname)
     216           1 :         }
     217           1 :         return err
     218             : }
     219             : // Open forwards to inner.Open with no ENOSPC wait or retry; a non-nil File is wrapped in an enospcFile.
     220           0 : func (fs *enospcFS) Open(name string, opts ...OpenOption) (File, error) {
     221           0 :         f, err := fs.inner.Open(name, opts...)
     222           0 :         if f != nil {
     223           0 :                 f = &enospcFile{
     224           0 :                         fs:    fs,
     225           0 :                         inner: f,
     226           0 :                 }
     227           0 :         }
     228           0 :         return f, err
     229             : }
     230             : // OpenReadWrite forwards to inner.OpenReadWrite with no ENOSPC wait or retry; a non-nil File is wrapped in an enospcFile.
     231           0 : func (fs *enospcFS) OpenReadWrite(name string, opts ...OpenOption) (File, error) {
     232           0 :         f, err := fs.inner.OpenReadWrite(name, opts...)
     233           0 :         if f != nil {
     234           0 :                 f = &enospcFile{ // wrapping routes later Write/WriteAt/Sync calls through ENOSPC handling
     235           0 :                         fs:    fs,
     236           0 :                         inner: f,
     237           0 :                 }
     238           0 :         }
     239           0 :         return f, err
     240             : }
     241             : // OpenDir forwards to inner.OpenDir with no ENOSPC wait or retry; a non-nil File is wrapped in an enospcFile.
     242           0 : func (fs *enospcFS) OpenDir(name string) (File, error) {
     243           0 :         f, err := fs.inner.OpenDir(name)
     244           0 :         if f != nil {
     245           0 :                 f = &enospcFile{ // wrapping routes a later Sync on the directory handle through ENOSPC handling
     246           0 :                         fs:    fs,
     247           0 :                         inner: f,
     248           0 :                 }
     249           0 :         }
     250           0 :         return f, err
     251             : }
     252             : // Remove forwards to inner.Remove, retrying once after handleENOSPC on ENOSPC. It blocks while the callback is running.
     253           1 : func (fs *enospcFS) Remove(name string) error {
     254           1 :         gen := fs.waitUntilReady()
     255           1 : 
     256           1 :         err := fs.inner.Remove(name)
     257           1 :         // On ENOSPC, run (or wait for) the disk-full callback, then retry exactly once.
     258           1 :         if err != nil && isENOSPC(err) {
     259           1 :                 fs.handleENOSPC(gen)
     260           1 :                 err = fs.inner.Remove(name)
     261           1 :         }
     262           1 :         return err
     263             : }
     264             : // RemoveAll forwards to inner.RemoveAll, retrying once after handleENOSPC if the first attempt returns ENOSPC.
     265           1 : func (fs *enospcFS) RemoveAll(name string) error {
     266           1 :         gen := fs.waitUntilReady()
     267           1 : 
     268           1 :         err := fs.inner.RemoveAll(name)
     269           1 :         // On ENOSPC, run (or wait for) the disk-full callback, then retry exactly once.
     270           1 :         if err != nil && isENOSPC(err) {
     271           1 :                 fs.handleENOSPC(gen)
     272           1 :                 err = fs.inner.RemoveAll(name)
     273           1 :         }
     274           1 :         return err
     275             : }
     276             : // Rename forwards to inner.Rename, retrying once after handleENOSPC if the first attempt returns ENOSPC.
     277           1 : func (fs *enospcFS) Rename(oldname, newname string) error {
     278           1 :         gen := fs.waitUntilReady()
     279           1 : 
     280           1 :         err := fs.inner.Rename(oldname, newname)
     281           1 :         // On ENOSPC, run (or wait for) the disk-full callback, then retry exactly once.
     282           1 :         if err != nil && isENOSPC(err) {
     283           1 :                 fs.handleENOSPC(gen)
     284           1 :                 err = fs.inner.Rename(oldname, newname)
     285           1 :         }
     286           1 :         return err
     287             : }
     288             : // ReuseForWrite forwards to inner.ReuseForWrite, retrying once after handleENOSPC on ENOSPC; a non-nil File is wrapped in an enospcFile.
     289           1 : func (fs *enospcFS) ReuseForWrite(oldname, newname string) (File, error) {
     290           1 :         gen := fs.waitUntilReady()
     291           1 : 
     292           1 :         f, err := fs.inner.ReuseForWrite(oldname, newname)
     293           1 :         // On ENOSPC, run (or wait for) the disk-full callback, then retry exactly once.
     294           1 :         if err != nil && isENOSPC(err) {
     295           1 :                 fs.handleENOSPC(gen)
     296           1 :                 f, err = fs.inner.ReuseForWrite(oldname, newname)
     297           1 :         }
     298             : 
     299           1 :         if f != nil {
     300           1 :                 f = &enospcFile{
     301           1 :                         fs:    fs,
     302           1 :                         inner: f,
     303           1 :                 }
     304           1 :         }
     305           1 :         return f, err
     306             : }
     307             : // MkdirAll forwards to inner.MkdirAll, retrying once after handleENOSPC if the first attempt returns ENOSPC.
     308           1 : func (fs *enospcFS) MkdirAll(dir string, perm os.FileMode) error {
     309           1 :         gen := fs.waitUntilReady()
     310           1 : 
     311           1 :         err := fs.inner.MkdirAll(dir, perm)
     312           1 :         // On ENOSPC, run (or wait for) the disk-full callback, then retry exactly once.
     313           1 :         if err != nil && isENOSPC(err) {
     314           1 :                 fs.handleENOSPC(gen)
     315           1 :                 err = fs.inner.MkdirAll(dir, perm)
     316           1 :         }
     317           1 :         return err
     318             : }
     319             : // Lock forwards to inner.Lock, retrying once after handleENOSPC on ENOSPC; the io.Closer is returned unwrapped.
     320           1 : func (fs *enospcFS) Lock(name string) (io.Closer, error) {
     321           1 :         gen := fs.waitUntilReady()
     322           1 : 
     323           1 :         closer, err := fs.inner.Lock(name)
     324           1 :         // On ENOSPC, run (or wait for) the disk-full callback, then retry exactly once.
     325           1 :         if err != nil && isENOSPC(err) {
     326           1 :                 fs.handleENOSPC(gen)
     327           1 :                 closer, err = fs.inner.Lock(name)
     328           1 :         }
     329           1 :         return closer, err
     330             : }
     331             : // List forwards directly to inner.List; it neither waits on nor handles ENOSPC.
     332           0 : func (fs *enospcFS) List(dir string) ([]string, error) {
     333           0 :         return fs.inner.List(dir)
     334           0 : }
     335             : // Stat forwards directly to inner.Stat; it neither waits on nor handles ENOSPC.
     336           0 : func (fs *enospcFS) Stat(name string) (os.FileInfo, error) {
     337           0 :         return fs.inner.Stat(name)
     338           0 : }
     339             : // PathBase forwards directly to inner.PathBase.
     340           0 : func (fs *enospcFS) PathBase(path string) string {
     341           0 :         return fs.inner.PathBase(path)
     342           0 : }
     343             : // PathJoin forwards directly to inner.PathJoin.
     344           0 : func (fs *enospcFS) PathJoin(elem ...string) string {
     345           0 :         return fs.inner.PathJoin(elem...)
     346           0 : }
     347             : // PathDir forwards directly to inner.PathDir.
     348           0 : func (fs *enospcFS) PathDir(path string) string {
     349           0 :         return fs.inner.PathDir(path)
     350           0 : }
     351             : // GetDiskUsage forwards directly to inner.GetDiskUsage; it neither waits on nor handles ENOSPC.
     352           0 : func (fs *enospcFS) GetDiskUsage(path string) (DiskUsage, error) {
     353           0 :         return fs.inner.GetDiskUsage(path)
     354           0 : }
     355             : // enospcFile wraps a File handed out by enospcFS so that its Write, WriteAt and Sync go through the FS's ENOSPC handling.
     356             : type enospcFile struct {
     357             :         fs    *enospcFS // owning wrapper; supplies waitUntilReady and handleENOSPC
     358             :         inner File // underlying file that every method forwards to
     359             : }
     360             : // Compile-time assertion that *enospcFile implements File.
     361             : var _ File = (*enospcFile)(nil)
     362             : // Close forwards directly to the inner file's Close.
     363           0 : func (f *enospcFile) Close() error {
     364           0 :         return f.inner.Close()
     365           0 : }
     366             : // Read forwards directly to the inner file's Read.
     367           0 : func (f *enospcFile) Read(p []byte) (n int, err error) {
     368           0 :         return f.inner.Read(p)
     369           0 : }
     370             : // ReadAt forwards directly to the inner file's ReadAt.
     371           0 : func (f *enospcFile) ReadAt(p []byte, off int64) (n int, err error) {
     372           0 :         return f.inner.ReadAt(p, off)
     373           0 : }
     374             : // Write writes p to the inner file; on ENOSPC it calls handleENOSPC and retries once with the unwritten tail p[n:].
     375           1 : func (f *enospcFile) Write(p []byte) (n int, err error) {
     376           1 :         gen := f.fs.waitUntilReady()
     377           1 : 
     378           1 :         n, err = f.inner.Write(p)
     379           1 :         // The first attempt may have written a prefix of p, so the retry resumes at p[n:] and the counts are summed.
     380           1 :         if err != nil && isENOSPC(err) {
     381           1 :                 f.fs.handleENOSPC(gen)
     382           1 :                 var n2 int
     383           1 :                 n2, err = f.inner.Write(p[n:])
     384           1 :                 n += n2
     385           1 :         }
     386           1 :         return n, err
     387             : }
     388             : // WriteAt writes p at offset ofs; on ENOSPC it calls handleENOSPC and retries once with p[n:] at ofs+n.
     389           0 : func (f *enospcFile) WriteAt(p []byte, ofs int64) (n int, err error) {
     390           0 :         gen := f.fs.waitUntilReady()
     391           0 : 
     392           0 :         n, err = f.inner.WriteAt(p, ofs)
     393           0 :         // The first attempt may have written a prefix of p, so the retry resumes past it and the counts are summed.
     394           0 :         if err != nil && isENOSPC(err) {
     395           0 :                 f.fs.handleENOSPC(gen)
     396           0 :                 var n2 int
     397           0 :                 n2, err = f.inner.WriteAt(p[n:], ofs+int64(n))
     398           0 :                 n += n2
     399           0 :         }
     400           0 :         return n, err
     401             : }
     402             : // Prefetch forwards directly to the inner file's Prefetch.
     403           0 : func (f *enospcFile) Prefetch(offset, length int64) error {
     404           0 :         return f.inner.Prefetch(offset, length)
     405           0 : }
     406             : // Preallocate forwards directly to the inner file's Preallocate. NOTE(review): no waitUntilReady/handleENOSPC here; confirm preallocation cannot surface ENOSPC or that skipping it is intentional.
     407           0 : func (f *enospcFile) Preallocate(offset, length int64) error {
     408           0 :         return f.inner.Preallocate(offset, length)
     409           0 : }
     410             : // Stat forwards directly to the inner file's Stat.
     411           0 : func (f *enospcFile) Stat() (os.FileInfo, error) {
     412           0 :         return f.inner.Stat()
     413           0 : }
     414             : // Sync syncs the inner file. On ENOSPC it calls handleENOSPC but never retries; the original error is returned.
     415           1 : func (f *enospcFile) Sync() error {
     416           1 :         gen := f.fs.waitUntilReady()
     417           1 : 
     418           1 :         err := f.inner.Sync()
     419           1 : 
     420           1 :         if err != nil && isENOSPC(err) {
     421           1 :                 f.fs.handleENOSPC(gen)
     422           1 : 
     423           1 :                 // NB: It is NOT safe to retry the Sync. See the PostgreSQL
     424           1 :                 // 'fsyncgate' discussion. A successful Sync after a failed one does
     425           1 :                 // not provide any guarantees and (always?) loses the unsynced writes.
     426           1 :                 // We need to bubble the error up and hope we weren't syncing a WAL or
     427           1 :                 // MANIFEST, because we'll have no choice but to crash. Errors while
     428           1 :                 // syncing an sstable will result in a failed flush/compaction, and
     429           1 :                 // the relevant sstable(s) will be marked as obsolete and deleted.
     430           1 :                 // See: https://lwn.net/Articles/752063/
     431           1 :         }
     432           1 :         return err
     433             : }
     434             : // SyncData forwards directly to the inner file's SyncData. NOTE(review): unlike Sync, an ENOSPC here does not invoke handleENOSPC; confirm intentional.
     435           0 : func (f *enospcFile) SyncData() error {
     436           0 :         return f.inner.SyncData()
     437           0 : }
     438             : // SyncTo forwards directly to the inner file's SyncTo. NOTE(review): unlike Sync, an ENOSPC here does not invoke handleENOSPC; confirm intentional.
     439           0 : func (f *enospcFile) SyncTo(length int64) (fullSync bool, err error) {
     440           0 :         return f.inner.SyncTo(length)
     441           0 : }
     442             : // Fd forwards directly to the inner file's Fd.
     443           0 : func (f *enospcFile) Fd() uintptr {
     444           0 :         return f.inner.Fd()
     445           0 : }
     446             : // Compile-time assertion that *enospcFS implements FS.
     447             : var _ FS = (*enospcFS)(nil)
     448             : // isENOSPC reports whether err, once fully unwrapped with errors.UnwrapAll, is the syscall.Errno value ENOSPC.
     449           1 : func isENOSPC(err error) bool {
     450           1 :         err = errors.UnwrapAll(err) // strip wrapping layers down to the innermost error
     451           1 :         e, ok := err.(syscall.Errno) // ok is false for any other error type
     452           1 :         return ok && e == syscall.ENOSPC
     453           1 : }

Generated by: LCOV version 1.14