// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package vfs

import (
	"sync/atomic"

	"github.com/cockroachdb/errors"
)

// SyncingFileOptions holds the options for a syncingFile.
type SyncingFileOptions struct {
	// NoSyncOnClose elides the automatic Sync during Close if it's not possible
	// to sync the remainder of the file in a non-blocking way.
	NoSyncOnClose   bool
	BytesPerSync    int
	PreallocateSize int
}

type syncingFile struct {
	File
	// fd can be InvalidFd if the underlying File does not support it.
	fd              uintptr
	noSyncOnClose   bool
	bytesPerSync    int64
	preallocateSize int64
	// The offset at which dirty data has been written.
	offset atomic.Int64
	// The offset at which data has been synced. Note that if SyncFileRange is
	// being used, the periodic syncing of data during writing will only ever
	// sync up to offset-1MB. This is done to avoid rewriting the tail of the
	// file multiple times, but has the side effect of ensuring that Close will
	// sync the file's metadata.
	syncOffset         atomic.Int64
	preallocatedBlocks int64
}

// NewSyncingFile wraps a writable file and ensures that data is synced
// periodically as it is written. The syncing does not provide persistency
// guarantees for these periodic syncs, but is used to avoid latency spikes if
// the OS automatically decides to write out a large chunk of dirty filesystem
// buffers. The underlying file is fully synced upon close.
func NewSyncingFile(f File, opts SyncingFileOptions) File {
	s := &syncingFile{
		File:            f,
		fd:              f.Fd(),
		noSyncOnClose:   opts.NoSyncOnClose,
		bytesPerSync:    int64(opts.BytesPerSync),
		preallocateSize: int64(opts.PreallocateSize),
	}
	// Ensure a file that is opened and then closed will be synced, even if no
	// data has been written to it.
	s.syncOffset.Store(-1)
	return s
}
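
// exampleSyncingFileUsage is an illustrative sketch added for this edit, not
// part of the original file. It shows how a caller might wrap a newly created
// file with NewSyncingFile; the file name and the option values are
// hypothetical and chosen only for the example.
func exampleSyncingFileUsage(fs FS) error {
	f, err := fs.Create("example.log") // hypothetical file name
	if err != nil {
		return err
	}
	// Flush dirty data roughly every 512 KB of new writes and preallocate
	// disk space in 4 MB chunks.
	f = NewSyncingFile(f, SyncingFileOptions{
		BytesPerSync:    512 << 10,
		PreallocateSize: 4 << 20,
	})
	if _, err := f.Write([]byte("hello")); err != nil {
		_ = f.Close()
		return err
	}
	// Close performs a final full sync (unless NoSyncOnClose is set).
	return f.Close()
}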

// NB: syncingFile.Write is unsafe for concurrent use!
func (f *syncingFile) Write(p []byte) (n int, err error) {
	_ = f.preallocate(f.offset.Load())

	n, err = f.File.Write(p)
	if err != nil {
		return n, errors.WithStack(err)
	}
	// The offset is updated atomically so that it can be accessed safely from
	// Sync.
	f.offset.Add(int64(n))
	if err := f.maybeSync(); err != nil {
		return 0, err
	}
	return n, nil
}

func (f *syncingFile) preallocate(offset int64) error {
	if f.fd == InvalidFd || f.preallocateSize == 0 {
		return nil
	}

	newPreallocatedBlocks := (offset + f.preallocateSize - 1) / f.preallocateSize
	if newPreallocatedBlocks <= f.preallocatedBlocks {
		return nil
	}

	length := f.preallocateSize * (newPreallocatedBlocks - f.preallocatedBlocks)
	offset = f.preallocateSize * f.preallocatedBlocks
	f.preallocatedBlocks = newPreallocatedBlocks
	return f.Preallocate(offset, length)
}

func (f *syncingFile) ratchetSyncOffset(offset int64) {
	for {
		syncOffset := f.syncOffset.Load()
		if syncOffset >= offset {
			return
		}
		if f.syncOffset.CompareAndSwap(syncOffset, offset) {
			return
		}
	}
}

func (f *syncingFile) Sync() error {
	// We update syncOffset (atomically) in order to avoid spurious syncs in
	// maybeSync. Note that even if syncOffset is larger than the current file
	// offset, we still need to call the underlying file's sync for persistence
	// guarantees which are not provided by SyncTo (or by sync_file_range on
	// Linux).
	f.ratchetSyncOffset(f.offset.Load())
	return f.SyncData()
}

func (f *syncingFile) maybeSync() error {
	if f.bytesPerSync <= 0 {
		return nil
	}

	// From the RocksDB source:
	//
	//   We try to avoid sync to the last 1MB of data. For two reasons:
	//   (1) avoid rewrite the same page that is modified later.
	//   (2) for older version of OS, write can block while writing out
	//       the page.
	//   Xfs does neighbor page flushing outside of the specified ranges. We
	//   need to make sure sync range is far from the write offset.
	const syncRangeBuffer = 1 << 20 // 1 MB
	offset := f.offset.Load()
	if offset <= syncRangeBuffer {
		return nil
	}

	const syncRangeAlignment = 4 << 10 // 4 KB
	syncToOffset := offset - syncRangeBuffer
	syncToOffset -= syncToOffset % syncRangeAlignment
	syncOffset := f.syncOffset.Load()
	if syncToOffset < 0 || (syncToOffset-syncOffset) < f.bytesPerSync {
		return nil
	}

	if f.fd == InvalidFd {
		return errors.WithStack(f.Sync())
	}

	// Note that SyncTo will always be called with an offset < f.offset.
	// The SyncTo implementation may choose to sync the entire file (i.e. on
	// OSes which do not support syncing a portion of the file).
	fullSync, err := f.SyncTo(syncToOffset)
	if err != nil {
		return errors.WithStack(err)
	}
	if fullSync {
		f.ratchetSyncOffset(offset)
	} else {
		f.ratchetSyncOffset(syncToOffset)
	}
	return nil
}
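
// syncTargetForExample is an illustrative sketch added for this edit, not part
// of the original file. It mirrors the arithmetic in maybeSync: the sync
// target stays 1 MB behind the current write offset and is rounded down to a
// 4 KB boundary. For example, with offset = 3<<20 the target is
// 3<<20 - 1<<20 = 2<<20, which is already 4 KB-aligned.
func syncTargetForExample(offset int64) int64 {
	const syncRangeBuffer = 1 << 20    // stay 1 MB behind the write offset
	const syncRangeAlignment = 4 << 10 // round down to a 4 KB boundary
	if offset <= syncRangeBuffer {
		return 0 // too little data written; maybeSync would not sync yet
	}
	target := offset - syncRangeBuffer
	return target - target%syncRangeAlignment
}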

func (f *syncingFile) Close() error {
	// Sync any data that has been written but not yet synced, unless the file
	// has the noSyncOnClose option explicitly set.
	//
	// NB: If the file is capable of non-durability-guarantee SyncTos, and the
	// caller has not called Sync since the last write, syncOffset is guaranteed
	// to be less than f.offset. This ensures we fall into the below conditional
	// and perform a full sync to durably persist the file.
	if off := f.offset.Load(); off > f.syncOffset.Load() {
		// There's still remaining dirty data.

		if f.noSyncOnClose {
			// If NoSyncOnClose is set, only perform a SyncTo. On Linux, SyncTo
			// translates to a non-blocking `sync_file_range` call which
			// provides no persistence guarantee. Since it's non-blocking,
			// there's no latency hit of a blocking sync call, but we still
			// ensure we're not allowing significant dirty data to accumulate.
			if _, err := f.File.SyncTo(off); err != nil {
				return err
			}
			f.ratchetSyncOffset(off)
		} else if err := f.Sync(); err != nil {
			return errors.WithStack(err)
		}
	}
	return errors.WithStack(f.File.Close())
}

// NewSyncingFS wraps a vfs.FS with one that wraps newly created files with
// vfs.NewSyncingFile.
func NewSyncingFS(fs FS, syncOpts SyncingFileOptions) FS {
	return &syncingFS{
		FS:       fs,
		syncOpts: syncOpts,
	}
}

type syncingFS struct {
	FS
	syncOpts SyncingFileOptions
}

var _ FS = (*syncingFS)(nil)

func (fs *syncingFS) Create(name string) (File, error) {
	f, err := fs.FS.Create(name)
	if err != nil {
		return nil, err
	}
	return NewSyncingFile(f, fs.syncOpts), nil
}

func (fs *syncingFS) ReuseForWrite(oldname, newname string) (File, error) {
	// TODO(radu): implement this if needed.
	panic("unimplemented")
}
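
// exampleSyncingFSUsage is an illustrative sketch added for this edit, not
// part of the original file. It shows how wrapping an FS with NewSyncingFS
// makes every file created through it a syncing file with the given options;
// the file name and option values are hypothetical.
func exampleSyncingFSUsage(base FS) (File, error) {
	fs := NewSyncingFS(base, SyncingFileOptions{
		BytesPerSync:    512 << 10, // flush dirty data roughly every 512 KB
		PreallocateSize: 4 << 20,   // preallocate disk space in 4 MB chunks
	})
	// Files created through the wrapped FS are synced periodically as they
	// are written and fully synced on Close.
	return fs.Create("example.log") // hypothetical file name
}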