LCOV - code coverage report
Current view: top level - pebble/vfs - disk_full.go (source / functions) Hit Total Coverage
Test: 2024-02-26 08:16Z 0b946194 - meta test only.lcov Lines: 0 266 0.0 %
Date: 2024-02-26 08:17:02 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package vfs
       6             : 
       7             : import (
       8             :         "io"
       9             :         "os"
      10             :         "sync"
      11             :         "sync/atomic"
      12             :         "syscall"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             : )
      16             : 
      17             : // OnDiskFull wraps the provided FS with an FS that examines returned errors,
      18             : // looking for ENOSPC errors. It invokes the provided callback when the
      19             : // underlying filesystem returns an error signifying the storage is out of
      20             : // disk space.
      21             : //
      22             : // All new writes to the filesystem are blocked while the callback executes,
      23             : // so care must be taken to avoid expensive work from within the callback.
      24             : //
      25             : // Once the callback completes, any write-oriented operations that encountered
      26             : // ENOSPC are retried exactly once. Once the callback completes, it will not
      27             : // be invoked again until a new operation that began after the callback
      28             : // returned encounters an ENOSPC error.
      29             : //
      30             : // OnDiskFull may be used to automatically manage a ballast file, which is
      31             : // removed from the filesystem from within the callback. Note that if managing
      32             : // a ballast, the caller should maintain a reference to the inner FS and
      33             : // remove the ballast on the unwrapped FS.
      34           0 : func OnDiskFull(fs FS, fn func()) FS {
      35           0 :         newFS := &enospcFS{inner: fs}
      36           0 :         newFS.mu.Cond.L = &newFS.mu.Mutex
      37           0 :         newFS.mu.onDiskFull = fn
      38           0 :         return newFS
      39           0 : }
      40             : 
      41             : type enospcFS struct {
      42             :         inner FS
      43             :         // generation is a monotonically increasing number that encodes the
      44             :         // current state of ENOSPC error handling. Incoming writes are
      45             :         // organized into generations to provide strong guarantees on when the
      46             :         // disk full callback is invoked. The callback is invoked once per
      47             :         // write generation.
      48             :         //
      49             :         // Special significance is given to the parity of this generation
      50             :         // field to optimize incoming writes in the normal state, which only
      51             :         // need to perform a single atomic load. If generation is odd, an
      52             :         // ENOSPC error is being actively handled. The generations associated
      53             :         // with writes are always even.
      54             :         //
      55             :         // The lifecycle of a write is:
      56             :         //
      57             :         // 1. Atomically load the current generation.
      58             :         //    a. If it's even, this is the write's generation number.
      59             :         //    b. If it's odd, an ENOSPC was recently encountered and the
      60             :         //       corresponding invocation of the disk full callback has not
      61             :         //       yet completed. The write must wait until the callback has
      62             :         //       completed and generation is updated to an even number, which
      63             :         //       becomes the write's generation number.
      64             :         // 2. Perform the write. If it encounters no error or an error other
      65             :         //    than ENOSPC, the write returns and proceeds no further in this
      66             :         //    lifecycle.
      67             :         // 3. Handle ENOSPC. If the write encounters ENOSPC, the callback must
      68             :         //    be invoked for the write's generation. The write's goroutine
      69             :         //    acquires the FS's mutex.
      70             :         //    a. If the FS's current generation is still equal to the write's
      71             :         //       generation, the write is the first write of its generation to
      72             :         //       encounter ENOSPC. It increments the FS's current generation
      73             :         //       to an odd number, signifying that an ENOSPC is being handled
      74             :         //       and invokes the callback.
      75             :         //    b. If the FS's current generation has changed, some other write
      76             :         //       from the same generation encountered an ENOSPC first. This
      77             :         //       write waits on the condition variable until the FS's current
      78             :         //       generation is updated indicating that the generation's
      79             :         //       callback invocation has completed.
      80             :         // 3. Retry the write once. The callback for the write's generation
      81             :         //    has completed, either by this write's goroutine or another's.
      82             :         //    The write may proceed with the expectation that the callback
      83             :         //    remedied the full disk by freeing up disk space and an ENOSPC
      84             :         //    should not be encountered again for at least a few minutes. If
      85             :         //    we do encounter another ENOSPC on the retry, the callback was
      86             :         //    unable to remedy the full disk and another retry won't be
      87             :         //    useful. Any error, including ENOSPC, during the retry is
      88             :         //    returned without further handling.  None of the retries invoke
      89             :         //    the callback.
      90             :         //
      91             :         // This scheme has a few nice properties:
      92             :         // * Once the disk-full callback completes, it won't be invoked
      93             :         //   again unless a write that started strictly later encounters an
      94             :         //   ENOSPC. This is convenient if the callback strives to 'fix' the
      95             :         //   full disk, for example, by removing a ballast file. A new
      96             :         //   invocation of the callback guarantees a new problem.
      97             :         // * Incoming writes block if there's an unhandled ENOSPC. Some
      98             :         //   writes, like WAL or MANIFEST fsyncs, are fatal if they encounter
      99             :         //   an ENOSPC.
     100             :         generation atomic.Uint32
     101             :         mu         struct {
     102             :                 sync.Mutex
     103             :                 sync.Cond
     104             :                 onDiskFull func()
     105             :         }
     106             : }
     107             : 
     108             : // Unwrap returns the underlying FS. This may be called by vfs.Root to access
     109             : // the underlying filesystem.
     110           0 : func (fs *enospcFS) Unwrap() FS {
     111           0 :         return fs.inner
     112           0 : }
     113             : 
     114             : // waitUntilReady is called before every FS or File operation that
     115             : // might return ENOSPC. If an ENOSPC was encountered and the corresponding
     116             : // invocation of the `onDiskFull` callback has not yet returned,
     117             : // waitUntilReady blocks until the callback returns. The returned generation
     118             : // is always even.
     119           0 : func (fs *enospcFS) waitUntilReady() uint32 {
     120           0 :         gen := fs.generation.Load()
     121           0 :         if gen%2 == 0 {
     122           0 :                 // An even generation indicates that we're not currently handling an
     123           0 :                 // ENOSPC. Allow the write to proceed.
     124           0 :                 return gen
     125           0 :         }
     126             : 
     127             :         // We're currently handling an ENOSPC error. Wait on the condition
     128             :         // variable until we're not handling an ENOSPC.
     129           0 :         fs.mu.Lock()
     130           0 :         defer fs.mu.Unlock()
     131           0 : 
     132           0 :         // Load the generation again with fs.mu locked.
     133           0 :         gen = fs.generation.Load()
     134           0 :         for gen%2 == 1 {
     135           0 :                 fs.mu.Wait()
     136           0 :                 gen = fs.generation.Load()
     137           0 :         }
     138           0 :         return gen
     139             : }
     140             : 
     141           0 : func (fs *enospcFS) handleENOSPC(gen uint32) {
     142           0 :         fs.mu.Lock()
     143           0 :         defer fs.mu.Unlock()
     144           0 : 
     145           0 :         currentGeneration := fs.generation.Load()
     146           0 : 
     147           0 :         // If the current generation is still `gen`, this is the first goroutine
     148           0 :         // to hit an ENOSPC within this write generation, so this goroutine is
     149           0 :         // responsible for invoking the callback.
     150           0 :         if currentGeneration == gen {
     151           0 :                 // Increment the generation to an odd number, indicating that the FS
     152           0 :                 // is out-of-disk space and incoming writes should pause and wait for
     153           0 :                 // the next generation before continuing.
     154           0 :                 fs.generation.Store(gen + 1)
     155           0 : 
     156           0 :                 func() {
     157           0 :                         // Drop the mutex while we invoke the callback, re-acquiring
     158           0 :                         // afterwards.
     159           0 :                         fs.mu.Unlock()
     160           0 :                         defer fs.mu.Lock()
     161           0 :                         fs.mu.onDiskFull()
     162           0 :                 }()
     163             : 
     164             :                 // Update the current generation again to an even number, indicating
     165             :                 // that the callback has completed for the write generation `gen`.
     166           0 :                 fs.generation.Store(gen + 2)
     167           0 :                 fs.mu.Broadcast()
     168           0 :                 return
     169             :         }
     170             : 
     171             :         // The current generation has already been incremented, so either the
     172             :         // callback is currently being run by another goroutine or it's already
     173             :         // completed. Wait for it complete if it hasn't already.
     174             :         //
     175             :         // The current generation may be updated multiple times, including to an
     176             :         // odd number signifying a later write generation has already encountered
     177             :         // ENOSPC. In that case, the callback was not able to remedy the full disk
     178             :         // and waiting is unlikely to be helpful.  Continuing to wait risks
     179             :         // blocking an unbounded number of generations.  Retrying and bubbling the
     180             :         // ENOSPC up might be helpful if we can abort a large compaction that
     181             :         // started before we became more selective about compaction picking, so
     182             :         // this loop only waits for this write generation's callback and no
     183             :         // subsequent generations' callbacks.
     184           0 :         for currentGeneration == gen+1 {
     185           0 :                 fs.mu.Wait()
     186           0 :                 currentGeneration = fs.generation.Load()
     187           0 :         }
     188             : }
     189             : 
     190           0 : func (fs *enospcFS) Create(name string) (File, error) {
     191           0 :         gen := fs.waitUntilReady()
     192           0 : 
     193           0 :         f, err := fs.inner.Create(name)
     194           0 : 
     195           0 :         if err != nil && isENOSPC(err) {
     196           0 :                 fs.handleENOSPC(gen)
     197           0 :                 f, err = fs.inner.Create(name)
     198           0 :         }
     199           0 :         if f != nil {
     200           0 :                 f = &enospcFile{
     201           0 :                         fs:    fs,
     202           0 :                         inner: f,
     203           0 :                 }
     204           0 :         }
     205           0 :         return f, err
     206             : }
     207             : 
     208           0 : func (fs *enospcFS) Link(oldname, newname string) error {
     209           0 :         gen := fs.waitUntilReady()
     210           0 : 
     211           0 :         err := fs.inner.Link(oldname, newname)
     212           0 : 
     213           0 :         if err != nil && isENOSPC(err) {
     214           0 :                 fs.handleENOSPC(gen)
     215           0 :                 err = fs.inner.Link(oldname, newname)
     216           0 :         }
     217           0 :         return err
     218             : }
     219             : 
     220           0 : func (fs *enospcFS) Open(name string, opts ...OpenOption) (File, error) {
     221           0 :         f, err := fs.inner.Open(name, opts...)
     222           0 :         if f != nil {
     223           0 :                 f = &enospcFile{
     224           0 :                         fs:    fs,
     225           0 :                         inner: f,
     226           0 :                 }
     227           0 :         }
     228           0 :         return f, err
     229             : }
     230             : 
     231           0 : func (fs *enospcFS) OpenReadWrite(name string, opts ...OpenOption) (File, error) {
     232           0 :         f, err := fs.inner.OpenReadWrite(name, opts...)
     233           0 :         if f != nil {
     234           0 :                 f = &enospcFile{
     235           0 :                         fs:    fs,
     236           0 :                         inner: f,
     237           0 :                 }
     238           0 :         }
     239           0 :         return f, err
     240             : }
     241             : 
     242           0 : func (fs *enospcFS) OpenDir(name string) (File, error) {
     243           0 :         f, err := fs.inner.OpenDir(name)
     244           0 :         if f != nil {
     245           0 :                 f = &enospcFile{
     246           0 :                         fs:    fs,
     247           0 :                         inner: f,
     248           0 :                 }
     249           0 :         }
     250           0 :         return f, err
     251             : }
     252             : 
     253           0 : func (fs *enospcFS) Remove(name string) error {
     254           0 :         gen := fs.waitUntilReady()
     255           0 : 
     256           0 :         err := fs.inner.Remove(name)
     257           0 : 
     258           0 :         if err != nil && isENOSPC(err) {
     259           0 :                 fs.handleENOSPC(gen)
     260           0 :                 err = fs.inner.Remove(name)
     261           0 :         }
     262           0 :         return err
     263             : }
     264             : 
     265           0 : func (fs *enospcFS) RemoveAll(name string) error {
     266           0 :         gen := fs.waitUntilReady()
     267           0 : 
     268           0 :         err := fs.inner.RemoveAll(name)
     269           0 : 
     270           0 :         if err != nil && isENOSPC(err) {
     271           0 :                 fs.handleENOSPC(gen)
     272           0 :                 err = fs.inner.RemoveAll(name)
     273           0 :         }
     274           0 :         return err
     275             : }
     276             : 
     277           0 : func (fs *enospcFS) Rename(oldname, newname string) error {
     278           0 :         gen := fs.waitUntilReady()
     279           0 : 
     280           0 :         err := fs.inner.Rename(oldname, newname)
     281           0 : 
     282           0 :         if err != nil && isENOSPC(err) {
     283           0 :                 fs.handleENOSPC(gen)
     284           0 :                 err = fs.inner.Rename(oldname, newname)
     285           0 :         }
     286           0 :         return err
     287             : }
     288             : 
     289           0 : func (fs *enospcFS) ReuseForWrite(oldname, newname string) (File, error) {
     290           0 :         gen := fs.waitUntilReady()
     291           0 : 
     292           0 :         f, err := fs.inner.ReuseForWrite(oldname, newname)
     293           0 : 
     294           0 :         if err != nil && isENOSPC(err) {
     295           0 :                 fs.handleENOSPC(gen)
     296           0 :                 f, err = fs.inner.ReuseForWrite(oldname, newname)
     297           0 :         }
     298             : 
     299           0 :         if f != nil {
     300           0 :                 f = &enospcFile{
     301           0 :                         fs:    fs,
     302           0 :                         inner: f,
     303           0 :                 }
     304           0 :         }
     305           0 :         return f, err
     306             : }
     307             : 
     308           0 : func (fs *enospcFS) MkdirAll(dir string, perm os.FileMode) error {
     309           0 :         gen := fs.waitUntilReady()
     310           0 : 
     311           0 :         err := fs.inner.MkdirAll(dir, perm)
     312           0 : 
     313           0 :         if err != nil && isENOSPC(err) {
     314           0 :                 fs.handleENOSPC(gen)
     315           0 :                 err = fs.inner.MkdirAll(dir, perm)
     316           0 :         }
     317           0 :         return err
     318             : }
     319             : 
     320           0 : func (fs *enospcFS) Lock(name string) (io.Closer, error) {
     321           0 :         gen := fs.waitUntilReady()
     322           0 : 
     323           0 :         closer, err := fs.inner.Lock(name)
     324           0 : 
     325           0 :         if err != nil && isENOSPC(err) {
     326           0 :                 fs.handleENOSPC(gen)
     327           0 :                 closer, err = fs.inner.Lock(name)
     328           0 :         }
     329           0 :         return closer, err
     330             : }
     331             : 
     332           0 : func (fs *enospcFS) List(dir string) ([]string, error) {
     333           0 :         return fs.inner.List(dir)
     334           0 : }
     335             : 
     336           0 : func (fs *enospcFS) Stat(name string) (os.FileInfo, error) {
     337           0 :         return fs.inner.Stat(name)
     338           0 : }
     339             : 
     340           0 : func (fs *enospcFS) PathBase(path string) string {
     341           0 :         return fs.inner.PathBase(path)
     342           0 : }
     343             : 
     344           0 : func (fs *enospcFS) PathJoin(elem ...string) string {
     345           0 :         return fs.inner.PathJoin(elem...)
     346           0 : }
     347             : 
     348           0 : func (fs *enospcFS) PathDir(path string) string {
     349           0 :         return fs.inner.PathDir(path)
     350           0 : }
     351             : 
     352           0 : func (fs *enospcFS) GetDiskUsage(path string) (DiskUsage, error) {
     353           0 :         return fs.inner.GetDiskUsage(path)
     354           0 : }
     355             : 
     356             : type enospcFile struct {
     357             :         fs    *enospcFS
     358             :         inner File
     359             : }
     360             : 
     361             : var _ File = (*enospcFile)(nil)
     362             : 
     363           0 : func (f *enospcFile) Close() error {
     364           0 :         return f.inner.Close()
     365           0 : }
     366             : 
     367           0 : func (f *enospcFile) Read(p []byte) (n int, err error) {
     368           0 :         return f.inner.Read(p)
     369           0 : }
     370             : 
     371           0 : func (f *enospcFile) ReadAt(p []byte, off int64) (n int, err error) {
     372           0 :         return f.inner.ReadAt(p, off)
     373           0 : }
     374             : 
     375           0 : func (f *enospcFile) Write(p []byte) (n int, err error) {
     376           0 :         gen := f.fs.waitUntilReady()
     377           0 : 
     378           0 :         n, err = f.inner.Write(p)
     379           0 : 
     380           0 :         if err != nil && isENOSPC(err) {
     381           0 :                 f.fs.handleENOSPC(gen)
     382           0 :                 var n2 int
     383           0 :                 n2, err = f.inner.Write(p[n:])
     384           0 :                 n += n2
     385           0 :         }
     386           0 :         return n, err
     387             : }
     388             : 
     389           0 : func (f *enospcFile) WriteAt(p []byte, ofs int64) (n int, err error) {
     390           0 :         gen := f.fs.waitUntilReady()
     391           0 : 
     392           0 :         n, err = f.inner.WriteAt(p, ofs)
     393           0 : 
     394           0 :         if err != nil && isENOSPC(err) {
     395           0 :                 f.fs.handleENOSPC(gen)
     396           0 :                 var n2 int
     397           0 :                 n2, err = f.inner.WriteAt(p[n:], ofs+int64(n))
     398           0 :                 n += n2
     399           0 :         }
     400           0 :         return n, err
     401             : }
     402             : 
     403           0 : func (f *enospcFile) Prefetch(offset, length int64) error {
     404           0 :         return f.inner.Prefetch(offset, length)
     405           0 : }
     406             : 
     407           0 : func (f *enospcFile) Preallocate(offset, length int64) error {
     408           0 :         return f.inner.Preallocate(offset, length)
     409           0 : }
     410             : 
     411           0 : func (f *enospcFile) Stat() (os.FileInfo, error) {
     412           0 :         return f.inner.Stat()
     413           0 : }
     414             : 
     415           0 : func (f *enospcFile) Sync() error {
     416           0 :         gen := f.fs.waitUntilReady()
     417           0 : 
     418           0 :         err := f.inner.Sync()
     419           0 : 
     420           0 :         if err != nil && isENOSPC(err) {
     421           0 :                 f.fs.handleENOSPC(gen)
     422           0 : 
     423           0 :                 // NB: It is NOT safe to retry the Sync. See the PostgreSQL
     424           0 :                 // 'fsyncgate' discussion. A successful Sync after a failed one does
     425           0 :                 // not provide any guarantees and (always?) loses the unsynced writes.
     426           0 :                 // We need to bubble the error up and hope we weren't syncing a WAL or
     427           0 :                 // MANIFEST, because we'll have no choice but to crash. Errors while
     428           0 :                 // syncing an sstable will result in a failed flush/compaction, and
     429           0 :                 // the relevant sstable(s) will be marked as obsolete and deleted.
     430           0 :                 // See: https://lwn.net/Articles/752063/
     431           0 :         }
     432           0 :         return err
     433             : }
     434             : 
     435           0 : func (f *enospcFile) SyncData() error {
     436           0 :         return f.inner.SyncData()
     437           0 : }
     438             : 
     439           0 : func (f *enospcFile) SyncTo(length int64) (fullSync bool, err error) {
     440           0 :         return f.inner.SyncTo(length)
     441           0 : }
     442             : 
     443           0 : func (f *enospcFile) Fd() uintptr {
     444           0 :         return f.inner.Fd()
     445           0 : }
     446             : 
     447             : var _ FS = (*enospcFS)(nil)
     448             : 
     449           0 : func isENOSPC(err error) bool {
     450           0 :         err = errors.UnwrapAll(err)
     451           0 :         e, ok := err.(syscall.Errno)
     452           0 :         return ok && e == syscall.ENOSPC
     453           0 : }

Generated by: LCOV version 1.14