LCOV - code coverage report
Current view: top level - pebble/vfs - syncing_file.go (source / functions) Hit Total Coverage
Test: 2024-02-06 08:16Z e49380ba - tests only.lcov Lines: 111 124 89.5 %
Date: 2024-02-06 08:16:59 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package vfs
       6             : 
       7             : import (
       8             :         "sync/atomic"
       9             : 
      10             :         "github.com/cockroachdb/errors"
      11             : )
      12             : 
      13             : // SyncingFileOptions holds the options for a syncingFile.
      14             : type SyncingFileOptions struct {
      15             :         // NoSyncOnClose elides the automatic Sync during Close if it's not possible
      16             :         // to sync the remainder of the file in a non-blocking way.
      17             :         NoSyncOnClose   bool
      18             :         BytesPerSync    int // amount of unsynced data that triggers a periodic sync from Write; values <= 0 disable periodic syncing
      19             :         PreallocateSize int // granularity, in bytes, at which space is preallocated ahead of writes; 0 disables preallocation
      20             : }
      21             : 
      22             : type syncingFile struct { // wraps a File, tracking written and synced offsets to drive periodic syncs and preallocation
      23             :         File
      24             :         // fd can be InvalidFd if the underlying File does not support it.
      25             :         fd              uintptr
      26             :         noSyncOnClose   bool // when set, Close issues a SyncTo instead of a full Sync for remaining dirty data
      27             :         bytesPerSync    int64 // periodic sync threshold used by maybeSync; <= 0 disables it
      28             :         preallocateSize int64 // preallocation granularity in bytes used by preallocate; 0 disables it
      29             :         // The offset up to which data has been written (and may still be dirty). Advanced by Write.
      30             :         offset atomic.Int64
      31             :         // The offset at which data has been synced. Note that if SyncFileRange is
      32             :         // being used, the periodic syncing of data during writing will only ever
      33             :         // sync up to offset-1MB. This is done to avoid rewriting the tail of the
      34             :         // file multiple times, but has the side effect of ensuring that Close will
      35             :         // sync the file's metadata.
      36             :         syncOffset         atomic.Int64
      37             :         preallocatedBlocks int64 // number of preallocateSize-sized blocks already requested via Preallocate
      38             : }
      39             : 
      40             : // NewSyncingFile wraps a writable file and ensures that data is synced
      41             : // periodically as it is written. The syncing does not provide persistency
      42             : // guarantees for these periodic syncs, but is used to avoid latency spikes if
      43             : // the OS automatically decides to write out a large chunk of dirty filesystem
      44             : // buffers. The underlying file is fully synced upon close.
      45           1 : func NewSyncingFile(f File, opts SyncingFileOptions) File {
      46           1 :         s := &syncingFile{
      47           1 :                 File:            f,
      48           1 :                 fd:              f.Fd(),
      49           1 :                 noSyncOnClose:   bool(opts.NoSyncOnClose),
      50           1 :                 bytesPerSync:    int64(opts.BytesPerSync),
      51           1 :                 preallocateSize: int64(opts.PreallocateSize),
      52           1 :         }
      53           1 :         // Ensure a file that is opened and then closed will be synced, even if no
      54           1 :         // data has been written to it.
      55           1 :         s.syncOffset.Store(-1)
      56           1 :         return s
      57           1 : }
      58             : 
      59             : // NB: syncingFile.Write is unsafe for concurrent use!
      60           1 : func (f *syncingFile) Write(p []byte) (n int, err error) {
      61           1 :         _ = f.preallocate(f.offset.Load())
      62           1 : 
      63           1 :         n, err = f.File.Write(p)
      64           1 :         if err != nil {
      65           1 :                 return n, errors.WithStack(err)
      66           1 :         }
      67             :         // The offset is updated atomically so that it can be accessed safely from
      68             :         // Sync.
      69           1 :         f.offset.Add(int64(n))
      70           1 :         if err := f.maybeSync(); err != nil {
      71           0 :                 return 0, err
      72           0 :         }
      73           1 :         return n, nil
      74             : }
      75             : 
      76           1 : func (f *syncingFile) preallocate(offset int64) error {
      77           1 :         if f.fd == InvalidFd || f.preallocateSize == 0 {
      78           1 :                 return nil
      79           1 :         }
      80             : 
      81           1 :         newPreallocatedBlocks := (offset + f.preallocateSize - 1) / f.preallocateSize
      82           1 :         if newPreallocatedBlocks <= f.preallocatedBlocks {
      83           1 :                 return nil
      84           1 :         }
      85             : 
      86           1 :         length := f.preallocateSize * (newPreallocatedBlocks - f.preallocatedBlocks)
      87           1 :         offset = f.preallocateSize * f.preallocatedBlocks
      88           1 :         f.preallocatedBlocks = newPreallocatedBlocks
      89           1 :         return f.Preallocate(offset, length)
      90             : }
      91             : 
      92           1 : func (f *syncingFile) ratchetSyncOffset(offset int64) {
      93           1 :         for {
      94           1 :                 syncOffset := f.syncOffset.Load()
      95           1 :                 if syncOffset >= offset {
      96           1 :                         return
      97           1 :                 }
      98           1 :                 if f.syncOffset.CompareAndSwap(syncOffset, offset) {
      99           1 :                         return
     100           1 :                 }
     101             :         }
     102             : }
     103             : 
     104           1 : func (f *syncingFile) Sync() error {
     105           1 :         // We update syncOffset (atomically) in order to avoid spurious syncs in
     106           1 :         // maybeSync. Note that even if syncOffset is larger than the current file
     107           1 :         // offset, we still need to call the underlying file's sync for persistence
     108           1 :         // guarantees which are not provided by SyncTo (or by sync_file_range on
     109           1 :         // Linux).
     110           1 :         f.ratchetSyncOffset(f.offset.Load())
     111           1 :         return f.SyncData()
     112           1 : }
     113             : 
     114           1 : func (f *syncingFile) maybeSync() error {
     115           1 :         if f.bytesPerSync <= 0 {
     116           1 :                 return nil
     117           1 :         }
     118             : 
     119             :         // From the RocksDB source:
     120             :         //
     121             :         //   We try to avoid sync to the last 1MB of data. For two reasons:
     122             :         //   (1) avoid rewrite the same page that is modified later.
     123             :         //   (2) for older version of OS, write can block while writing out
     124             :         //       the page.
     125             :         //   Xfs does neighbor page flushing outside of the specified ranges. We
     126             :         //   need to make sure sync range is far from the write offset.
     127           1 :         const syncRangeBuffer = 1 << 20 // 1 MB
     128           1 :         offset := f.offset.Load()
     129           1 :         if offset <= syncRangeBuffer {
     130           1 :                 return nil
     131           1 :         }
     132             : 
     133           1 :         const syncRangeAlignment = 4 << 10 // 4 KB
     134           1 :         syncToOffset := offset - syncRangeBuffer
     135           1 :         syncToOffset -= syncToOffset % syncRangeAlignment
     136           1 :         syncOffset := f.syncOffset.Load()
     137           1 :         if syncToOffset < 0 || (syncToOffset-syncOffset) < f.bytesPerSync {
     138           1 :                 return nil
     139           1 :         }
     140             : 
     141           1 :         if f.fd == InvalidFd {
     142           1 :                 return errors.WithStack(f.Sync())
     143           1 :         }
     144             : 
     145             :         // Note that SyncTo will always be called with an offset < atomic.offset.
     146             :         // The SyncTo implementation may choose to sync the entire file (i.e. on
     147             :         // OSes which do not support syncing a portion of the file).
     148           1 :         fullSync, err := f.SyncTo(syncToOffset)
     149           1 :         if err != nil {
     150           0 :                 return errors.WithStack(err)
     151           0 :         }
     152           1 :         if fullSync {
     153           1 :                 f.ratchetSyncOffset(offset)
     154           1 :         } else {
     155           1 :                 f.ratchetSyncOffset(syncToOffset)
     156           1 :         }
     157           1 :         return nil
     158             : }
     159             : 
     160           1 : func (f *syncingFile) Close() error {
     161           1 :         // Sync any data that has been written but not yet synced unless the file
     162           1 :         // has noSyncOnClose option explicitly set.
     163           1 :         //
     164           1 :         // NB: If the file is capable of non-durability-guarantee SyncTos, and the
     165           1 :         // caller has not called Sync since the last write, syncOffset is guaranteed
     166           1 :         // to be less than atomic.offset. This ensures we fall into the below
     167           1 :         // conditional and perform a full sync to durably persist the file.
     168           1 :         if off := f.offset.Load(); off > f.syncOffset.Load() {
     169           1 :                 // There's still remaining dirty data.
     170           1 : 
     171           1 :                 if f.noSyncOnClose {
     172           1 :                         // If NoSyncOnClose is set, only perform a SyncTo. On linux, SyncTo
     173           1 :                         // translates to a non-blocking `sync_file_range` call which
     174           1 :                         // provides no persistence guarantee. Since it's non-blocking,
     175           1 :                         // there's no latency hit of a blocking sync call, but we still
     176           1 :                         // ensure we're not allowing significant dirty data to accumulate.
     177           1 :                         if _, err := f.File.SyncTo(off); err != nil {
     178           0 :                                 return err
     179           0 :                         }
     180           1 :                         f.ratchetSyncOffset(off)
     181           1 :                 } else if err := f.Sync(); err != nil {
     182           0 :                         return errors.WithStack(err)
     183           0 :                 }
     184             :         }
     185           1 :         return errors.WithStack(f.File.Close())
     186             : }
     187             : 
     188             : // NewSyncingFS wraps a vfs.FS with one that wraps newly created files with
     189             : // vfs.NewSyncingFile.
     190           1 : func NewSyncingFS(fs FS, syncOpts SyncingFileOptions) FS {
     191           1 :         return &syncingFS{
     192           1 :                 FS:       fs,
     193           1 :                 syncOpts: syncOpts,
     194           1 :         }
     195           1 : }
     196             : 
     197             : type syncingFS struct { // an FS whose Create returns files wrapped by NewSyncingFile
     198             :         FS
     199             :         syncOpts SyncingFileOptions // options passed to NewSyncingFile for every file returned by Create
     200             : }
     201             : 
     202             : var _ FS = (*syncingFS)(nil) // compile-time check that *syncingFS implements FS
     203             : 
     204           1 : func (fs *syncingFS) Create(name string) (File, error) {
     205           1 :         f, err := fs.FS.Create(name)
     206           1 :         if err != nil {
     207           0 :                 return nil, err
     208           0 :         }
     209           1 :         return NewSyncingFile(f, fs.syncOpts), nil
     210             : }
     211             : 
     212           0 : func (fs *syncingFS) ReuseForWrite(oldname, newname string) (File, error) {
     213           0 :         // ReuseForWrite is not supported by syncingFS: it always panics instead of delegating to the embedded FS. TODO(radu): implement this if needed.
     214           0 :         panic("unimplemented")
     215             : }

Generated by: LCOV version 1.14