// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package vfs

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	"github.com/cockroachdb/redact"
)

const (
	// preallocatedSlotCount is the default number of slots available for
	// concurrent filesystem operations. The slot count may be exceeded, but
	// each additional slot will incur an additional allocation. We choose 16
	// here with the expectation that it is significantly more than required in
	// practice. See the comment above the diskHealthCheckingFS type definition.
	preallocatedSlotCount = 16
	// deltaBits is the number of bits in the packed 64-bit integer used for
	// identifying a delta from the file creation time in milliseconds.
	deltaBits = 40
	// writeSizeBits is the number of bits in the packed 64-bit integer used for
	// identifying the size of the write operation, if the operation is sized. See
	// writeSizePrecision below for precision of size.
	writeSizeBits = 20
	// Track size of writes at kilobyte precision. See comment above lastWritePacked for more.
	writeSizePrecision = 1024
)

// Variables to enable testing.
var (
	// defaultTickInterval is the default interval between two ticks of each
	// diskHealthCheckingFile's monitoring loop.
	defaultTickInterval = 2 * time.Second
)

// OpType is the type of IO operation being monitored by a
// diskHealthCheckingFile.
type OpType uint8

// The following OpTypes are limited to the subset of filesystem operations
// monitored by disk-health checking: file writes and syncs (via
// diskHealthCheckingFile) and filesystem metadata operations (via
// diskHealthCheckingFS).
const (
	OpTypeUnknown OpType = iota
	OpTypeWrite
	OpTypeSync
	OpTypeSyncData
	OpTypeSyncTo
	OpTypeCreate
	OpTypeLink
	OpTypeMkdirAll
	OpTypePreallocate
	OpTypeRemove
	OpTypeRemoveAll
	OpTypeRename
	OpTypeReuseForWrite
	// Note: opTypeMax is just used in tests. It must appear last in the list
	// of OpTypes.
	opTypeMax
)

// String implements fmt.Stringer.
func (o OpType) String() string {
	switch o {
	case OpTypeWrite:
		return "write"
	case OpTypeSync:
		return "sync"
	case OpTypeSyncData:
		return "syncdata"
	case OpTypeSyncTo:
		return "syncto"
	case OpTypeCreate:
		return "create"
	case OpTypeLink:
		return "link"
	case OpTypeMkdirAll:
		return "mkdirall"
	case OpTypePreallocate:
		return "preallocate"
	case OpTypeRemove:
		return "remove"
	case OpTypeRemoveAll:
		return "removeall"
	case OpTypeRename:
		return "rename"
	case OpTypeReuseForWrite:
		return "reuseforwrite"
	case OpTypeUnknown:
		return "unknown"
	default:
		panic(fmt.Sprintf("vfs: unknown op type: %d", o))
	}
}

// diskHealthCheckingFile is a File wrapper to detect slow disk operations, and
// call onSlowDisk if a disk operation is seen to exceed diskSlowThreshold.
//
// This struct creates a goroutine (in startTicker()) that, at every tick
// interval, sees if there's a disk operation taking longer than the specified
// duration. This setup is preferable to creating a new timer at every disk
// operation, as it reduces overhead per disk operation.
type diskHealthCheckingFile struct {
	file              File
	onSlowDisk        func(opType OpType, writeSizeInBytes int, duration time.Duration)
	diskSlowThreshold time.Duration
	tickInterval      time.Duration

	stopper chan struct{}
	// lastWritePacked is a 64-bit unsigned int. The most significant
	// 40 bits represent a delta (in milliseconds) from the creation
	// time of the diskHealthCheckingFile. The next most significant 20 bits
	// represent the size of the write in KBs, if the write has a size. (If
	// it doesn't, the 20 bits are zeroed). The least significant four bits
	// contain the OpType.
	//
	// The use of 40 bits for a delta provides ~34 years of effective
	// monitoring time before the uint wraps around, at millisecond precision.
	// ~34 years of process uptime "ought to be enough for anybody". Millisecond
	// precision is sufficient, given that we are monitoring for writes that take
	// longer than one millisecond.
	//
	// The use of 20 bits for the size in KBs allows representing sizes up
	// to nearly one GB. If the write is larger than that, we round down to ~one GB.
	//
	// The use of four bits for OpType allows for 16 operation types.
	//
	// NB: this packing scheme is not persisted, and is therefore safe to adjust
	// across process boundaries.
	lastWritePacked atomic.Uint64
	createTimeNanos int64
}
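
// For illustration (a sketch derived from the comment above, not in the
// original source), the packed layout is:
//
//	bit 63                      bit 0
//	+-----------+-----------+-------+
//	| delta ms  | size (KB) |  op   |
//	|  40 bits  |  20 bits  | 4 bits|
//	+-----------+-----------+-------+
//
// e.g. a 100 KiB write issued 2,500ms after file creation packs to
// 2500<<24 | 100<<4 | uint64(OpTypeWrite).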

// newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the
// specified time threshold and event listener.
func newDiskHealthCheckingFile(
	file File,
	diskSlowThreshold time.Duration,
	onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration),
) *diskHealthCheckingFile {
	return &diskHealthCheckingFile{
		file:              file,
		onSlowDisk:        onSlowDisk,
		diskSlowThreshold: diskSlowThreshold,
		tickInterval:      defaultTickInterval,

		stopper:         make(chan struct{}),
		createTimeNanos: time.Now().UnixNano(),
	}
}

// startTicker starts a new goroutine with a ticker to monitor disk operations.
// Can only be called if the ticker goroutine isn't running already.
func (d *diskHealthCheckingFile) startTicker() {
	if d.diskSlowThreshold == 0 {
		return
	}

	go func() {
		ticker := time.NewTicker(d.tickInterval)
		defer ticker.Stop()

		for {
			select {
			case <-d.stopper:
				return

			case <-ticker.C:
				packed := d.lastWritePacked.Load()
				if packed == 0 {
					continue
				}
				delta, writeSize, op := unpack(packed)
				lastWrite := time.Unix(0, d.createTimeNanos+delta.Nanoseconds())
				now := time.Now()
				if lastWrite.Add(d.diskSlowThreshold).Before(now) {
					// diskSlowThreshold was exceeded. Call the passed-in
					// listener.
					d.onSlowDisk(op, writeSize, now.Sub(lastWrite))
				}
			}
		}
	}()
}

// stopTicker stops the goroutine started in startTicker.
func (d *diskHealthCheckingFile) stopTicker() {
	close(d.stopper)
}

// Fd implements (vfs.File).Fd.
func (d *diskHealthCheckingFile) Fd() uintptr {
	return d.file.Fd()
}

// Read implements (vfs.File).Read.
func (d *diskHealthCheckingFile) Read(p []byte) (int, error) {
	return d.file.Read(p)
}

// ReadAt implements (vfs.File).ReadAt.
func (d *diskHealthCheckingFile) ReadAt(p []byte, off int64) (int, error) {
	return d.file.ReadAt(p, off)
}

// Write implements the io.Writer interface.
func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) {
	d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
		n, err = d.file.Write(p)
	}, time.Now().UnixNano())
	return n, err
}

// WriteAt implements the io.WriterAt interface.
func (d *diskHealthCheckingFile) WriteAt(p []byte, ofs int64) (n int, err error) {
	d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
		n, err = d.file.WriteAt(p, ofs)
	}, time.Now().UnixNano())
	return n, err
}

// Close implements the io.Closer interface.
func (d *diskHealthCheckingFile) Close() error {
	d.stopTicker()
	return d.file.Close()
}

// Prefetch implements (vfs.File).Prefetch.
func (d *diskHealthCheckingFile) Prefetch(offset, length int64) error {
	return d.file.Prefetch(offset, length)
}

// Preallocate implements (vfs.File).Preallocate.
func (d *diskHealthCheckingFile) Preallocate(off, n int64) (err error) {
	d.timeDiskOp(OpTypePreallocate, n, func() {
		err = d.file.Preallocate(off, n)
	}, time.Now().UnixNano())
	return err
}

// Stat implements (vfs.File).Stat.
func (d *diskHealthCheckingFile) Stat() (os.FileInfo, error) {
	return d.file.Stat()
}

// Sync implements (vfs.File).Sync.
func (d *diskHealthCheckingFile) Sync() (err error) {
	d.timeDiskOp(OpTypeSync, 0, func() {
		err = d.file.Sync()
	}, time.Now().UnixNano())
	return err
}

// SyncData implements (vfs.File).SyncData.
func (d *diskHealthCheckingFile) SyncData() (err error) {
	d.timeDiskOp(OpTypeSyncData, 0, func() {
		err = d.file.SyncData()
	}, time.Now().UnixNano())
	return err
}

// SyncTo implements (vfs.File).SyncTo.
func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) {
	d.timeDiskOp(OpTypeSyncTo, length, func() {
		fullSync, err = d.file.SyncTo(length)
	}, time.Now().UnixNano())
	return fullSync, err
}

// timeDiskOp runs the specified closure and makes its timing visible to the
// monitoring goroutine, in case it exceeds one of the slow disk durations.
// opType should always be set. writeSizeInBytes should be set if the write
// operation is sized. If not, it should be set to zero.
//
// The start time is taken as a parameter in the form of nanoseconds since the
// unix epoch so that it appears in stack traces during crashes (if GOTRACEBACK
// is set appropriately), aiding postmortem debugging.
func (d *diskHealthCheckingFile) timeDiskOp(
	opType OpType, writeSizeInBytes int64, op func(), startNanos int64,
) {
	if d == nil {
		op()
		return
	}

	delta := time.Duration(startNanos - d.createTimeNanos)
	packed := pack(delta, writeSizeInBytes, opType)
	if d.lastWritePacked.Swap(packed) != 0 {
		panic("concurrent write operations detected on file")
	}
	defer func() {
		if d.lastWritePacked.Swap(0) != packed {
			panic("concurrent write operations detected on file")
		}
	}()
	op()
}
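
// Note (a descriptive aside, not in the original source): the pair of Swap
// calls in timeDiskOp doubles as a cheap concurrency check. A well-behaved
// file sees the sequence
//
//	Swap(packed) -> 0       (no other operation was in flight)
//	Swap(0)      -> packed  (our own operation was still the one recorded)
//
// If two goroutines issue writes on the same file concurrently, one of them
// observes a non-zero value from the first Swap, or a foreign value from the
// second, and panics.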

// Note the slight lack of symmetry between pack & unpack. pack takes an int64 for writeSizeInBytes, since
// callers of pack use an int64. This is dictated by the vfs interface. unpack OTOH returns an int. This is
// safe because the packing scheme implies we only actually need 32 bits.
func pack(delta time.Duration, writeSizeInBytes int64, opType OpType) uint64 {
	// We have no guarantee of clock monotonicity. If we have a small regression
	// in the clock, we set deltaMillis to zero, so we can still catch the operation
	// if it happens to be slow.
	deltaMillis := delta.Milliseconds()
	if deltaMillis < 0 {
		deltaMillis = 0
	}
	// As of 3/7/2023, the use of 40 bits for a delta provides ~34 years
	// of effective monitoring time before the uint wraps around, at millisecond
	// precision.
	if deltaMillis > 1<<deltaBits-1 {
		panic("vfs: last write delta would result in integer wraparound")
	}

	// See writeSizePrecision to get the unit of writeSize. As of 1/26/2023, the unit is KBs.
	writeSize := writeSizeInBytes / writeSizePrecision
	// If the size of the write is larger than we can store in the packed int, store the max
	// value we can store in the packed int.
	const writeSizeCeiling = 1<<writeSizeBits - 1
	if writeSize > writeSizeCeiling {
		writeSize = writeSizeCeiling
	}

	return uint64(deltaMillis)<<(64-deltaBits) | uint64(writeSize)<<(64-deltaBits-writeSizeBits) | uint64(opType)
}

func unpack(packed uint64) (delta time.Duration, writeSizeInBytes int, opType OpType) {
	delta = time.Duration(packed>>(64-deltaBits)) * time.Millisecond
	wz := (int64(packed>>(64-deltaBits-writeSizeBits)) & (1<<writeSizeBits - 1)) * writeSizePrecision
	// Given the packing scheme, converting wz to an int will not truncate anything.
	writeSizeInBytes = int(wz)
	opType = OpType(packed & 0xf)
	return delta, writeSizeInBytes, opType
}
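
// examplePackRoundTrip is a minimal sketch (not part of the original source)
// demonstrating the packing scheme above by round tripping a 100 KiB write
// issued 2.5s after file creation.
func examplePackRoundTrip() {
	packed := pack(2500*time.Millisecond, 100<<10, OpTypeWrite)
	// packed == 2500<<24 | 100<<4 | uint64(OpTypeWrite): 40 bits of delta in
	// milliseconds, 20 bits of size in writeSizePrecision (KB) units, and 4
	// bits of op type.
	delta, writeSizeInBytes, op := unpack(packed)
	fmt.Printf("delta=%s size=%dB op=%s\n", delta, writeSizeInBytes, op)
	// Prints: delta=2.5s size=102400B op=write
}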

// diskHealthCheckingDir implements disk-health checking for directories. Unlike
// other files, we allow directories to receive concurrent write operations
// (Syncs are the only write operations supported by a directory.) Since the
// diskHealthCheckingFile's timeDiskOp can only track a single in-flight
// operation at a time, we time the operation using the filesystem-level
// timeFilesystemOp function instead.
type diskHealthCheckingDir struct {
	File
	name string
	fs   *diskHealthCheckingFS
}

// Sync implements (vfs.File).Sync.
func (d *diskHealthCheckingDir) Sync() (err error) {
	d.fs.timeFilesystemOp(d.name, OpTypeSync, func() {
		err = d.File.Sync()
	}, time.Now().UnixNano())
	return err
}

// DiskSlowInfo captures info about detected slow operations on the vfs.
type DiskSlowInfo struct {
	// Path of file being written to.
	Path string
	// Operation being performed on the file.
	OpType OpType
	// Size of write in bytes, if the write is sized.
	WriteSize int
	// Duration that has elapsed since this disk operation started.
	Duration time.Duration
}

func (i DiskSlowInfo) String() string {
	return redact.StringWithoutMarkers(i)
}

// SafeFormat implements redact.SafeFormatter.
func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
	switch i.OpType {
	// Operations for which i.WriteSize is meaningful.
	case OpTypeWrite, OpTypeSyncTo, OpTypePreallocate:
		w.Printf("disk slowness detected: %s on file %s (%d bytes) has been ongoing for %0.1fs",
			redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
			redact.Safe(i.WriteSize), redact.Safe(i.Duration.Seconds()))
	default:
		w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs",
			redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
			redact.Safe(i.Duration.Seconds()))
	}
}

// diskHealthCheckingFS adds disk-health checking facilities to a VFS.
// It times disk write operations in two ways:
//
// 1. Wrapping vfs.Files.
//
// The bulk of write I/O activity is file writing and syncing, invoked through
// the `vfs.File` interface. This VFS wraps all files open for writing with a
// special diskHealthCheckingFile implementation of the vfs.File interface. See
// above for the implementation.
//
// 2. Monitoring filesystem metadata operations.
//
// Filesystem metadata operations (create, link, remove, rename, etc) are also
// sources of disk writes. Unlike a vfs.File which requires Write and Sync calls
// to be sequential, a vfs.FS may receive these filesystem metadata operations
// in parallel. To accommodate this parallelism, the diskHealthCheckingFS's
// write-oriented filesystem operations record their start times into a 'slot'
// on the filesystem. A single long-running goroutine periodically scans the
// slots looking for slow operations.
//
// The number of slots on a diskHealthCheckingFS grows to match the maximum
// number of concurrent filesystem operations observed. This is expected to be
// very few for these reasons:
//  1. Pebble has limited write concurrency. Flushes, compactions and WAL
//     rotations are the primary sources of filesystem metadata operations. With
//     the default max-compaction concurrency, these operations require at most 5
//     concurrent slots if all 5 perform a filesystem metadata operation
//     simultaneously.
//  2. Pebble's limited concurrent I/O writers spend most of their time
//     performing file I/O, not performing the filesystem metadata operations that
//     require recording a slot on the diskHealthCheckingFS.
//  3. In CockroachDB, each additional store/Pebble instance has its own vfs.FS
//     which provides a separate goroutine and set of slots.
//  4. In CockroachDB, many of the additional sources of filesystem metadata
//     operations (like encryption-at-rest) are sequential with respect to Pebble's
//     threads.
type diskHealthCheckingFS struct {
	tickInterval      time.Duration
	diskSlowThreshold time.Duration
	onSlowDisk        func(DiskSlowInfo)
	fs                FS
	mu                struct {
		sync.Mutex
		tickerRunning bool
		stopper       chan struct{}
		inflight      []*slot
	}
	// prealloc preallocates the memory for mu.inflight slots and the slice
	// itself. The contained fields are not accessed directly except by
	// WithDiskHealthChecks when initializing mu.inflight. The number of slots
	// in d.mu.inflight will grow to the maximum number of concurrent file
	// metadata operations (create, remove, link, etc). If the number of
	// concurrent operations never exceeds preallocatedSlotCount, we'll never
	// incur an additional allocation.
	prealloc struct {
		slots        [preallocatedSlotCount]slot
		slotPtrSlice [preallocatedSlotCount]*slot
	}
}

type slot struct {
	name       string
	opType     OpType
	startNanos atomic.Int64
}

// diskHealthCheckingFS implements FS.
var _ FS = (*diskHealthCheckingFS)(nil)

// WithDiskHealthChecks wraps an FS and ensures that all write-oriented
// operations on the FS are wrapped with disk health detection checks. Disk
// operations that are observed to take longer than diskSlowThreshold trigger an
// onSlowDisk call.
//
// A threshold of zero disables disk-health checking.
func WithDiskHealthChecks(
	innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(info DiskSlowInfo),
) (FS, io.Closer) {
	if diskSlowThreshold == 0 {
		return innerFS, noopCloser{}
	}

	fs := &diskHealthCheckingFS{
		fs:                innerFS,
		tickInterval:      defaultTickInterval,
		diskSlowThreshold: diskSlowThreshold,
		onSlowDisk:        onSlowDisk,
	}
	fs.mu.stopper = make(chan struct{})
	// The fs holds preallocated slots and a preallocated array of slot pointers
	// with equal length. Initialize the inflight slice to use a slice backed by
	// the preallocated array with each slot initialized to a preallocated slot.
	fs.mu.inflight = fs.prealloc.slotPtrSlice[:]
	for i := range fs.mu.inflight {
		fs.mu.inflight[i] = &fs.prealloc.slots[i]
	}
	return fs, fs
}
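
// exampleWithDiskHealthChecks is a minimal usage sketch (not part of the
// original source). It wraps this package's Default filesystem so that any
// write-oriented operation stalled for more than five seconds invokes the
// callback; the file name "example-file" is hypothetical.
func exampleWithDiskHealthChecks() error {
	fs, closer := WithDiskHealthChecks(Default, 5*time.Second,
		func(info DiskSlowInfo) {
			fmt.Println(info.String())
		})
	defer closer.Close()

	// Files created through the wrapped FS are transparently wrapped in a
	// diskHealthCheckingFile with its own monitoring ticker.
	f, err := fs.Create("example-file")
	if err != nil {
		return err
	}
	defer f.Close()
	_, err = f.Write([]byte("hello"))
	return err
}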

// timeFilesystemOp executes the provided closure, which should perform a
// singular filesystem operation of a type matching opType on the named file. It
// records the provided start time such that the long-lived disk-health checking
// goroutine can observe if the operation is blocked for an inordinate time.
//
// The start time is taken as a parameter in the form of nanoseconds since the
// unix epoch so that it appears in stack traces during crashes (if GOTRACEBACK
// is set appropriately), aiding postmortem debugging.
func (d *diskHealthCheckingFS) timeFilesystemOp(
	name string, opType OpType, op func(), startNanos int64,
) {
	if d == nil {
		op()
		return
	}

	// Record this operation's start time on the FS, so that the long-running
	// goroutine can monitor the filesystem operation.
	//
	// The diskHealthCheckingFile implementation uses a single field that is
	// atomically updated, taking advantage of the fact that writes to a single
	// vfs.File handle are not performed in parallel. The vfs.FS however may
	// receive write filesystem operations in parallel. To accommodate this
	// parallelism, writing goroutines append their start time to a
	// mutex-protected vector. On ticks, the long-running goroutine scans the
	// vector searching for start times older than the slow-disk threshold. When
	// a writing goroutine completes its operation, it atomically overwrites its
	// slot to signal completion.
	var s *slot
	func() {
		d.mu.Lock()
		defer d.mu.Unlock()

		// If there's no long-running goroutine to monitor this filesystem
		// operation, start one.
		if !d.mu.tickerRunning {
			d.startTickerLocked()
		}

		for i := 0; i < len(d.mu.inflight); i++ {
			if d.mu.inflight[i].startNanos.Load() == 0 {
				// This slot is not in use. Claim it.
				s = d.mu.inflight[i]
				s.name = name
				s.opType = opType
				s.startNanos.Store(startNanos)
				break
			}
		}
		// If we didn't find any unused slots, create a new slot and append it.
		// This slot will exist forever. The number of slots will grow to the
		// maximum number of concurrent filesystem operations over the lifetime
		// of the process. Only operations that grow the number of slots must
		// incur an allocation.
		if s == nil {
			s = &slot{
				name:   name,
				opType: opType,
			}
			s.startNanos.Store(startNanos)
			d.mu.inflight = append(d.mu.inflight, s)
		}
	}()

	op()

	// Signal completion by zeroing the start time.
	s.startNanos.Store(0)
}
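
// For illustration (a sketch of the slot lifecycle above, not in the original
// source), one possible interleaving of two concurrent metadata operations on
// an initially idle slot table:
//
//	G1: Remove("a") claims slot 0 (startNanos != 0) and calls op()
//	G2: Rename("b", "c") finds slot 0 busy, claims slot 1
//	tick: the monitor goroutine scans both slots under d.mu; either is
//	      reported if its start time is older than diskSlowThreshold
//	G1: op() returns; G1 stores 0 into slot 0, freeing it for reuse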

// startTickerLocked starts a new goroutine with a ticker to monitor disk
// filesystem operations. Requires d.mu and !d.mu.tickerRunning.
func (d *diskHealthCheckingFS) startTickerLocked() {
	d.mu.tickerRunning = true
	stopper := d.mu.stopper
	go func() {
		ticker := time.NewTicker(d.tickInterval)
		defer ticker.Stop()
		type exceededSlot struct {
			name       string
			opType     OpType
			startNanos int64
		}
		var exceededSlots []exceededSlot

		for {
			select {
			case <-ticker.C:
				// Scan the inflight slots for any slots recording a start
				// time older than the diskSlowThreshold.
				exceededSlots = exceededSlots[:0]
				d.mu.Lock()
				now := time.Now()
				for i := range d.mu.inflight {
					nanos := d.mu.inflight[i].startNanos.Load()
					if nanos != 0 && time.Unix(0, nanos).Add(d.diskSlowThreshold).Before(now) {
						// diskSlowThreshold was exceeded. Copy this inflightOp into
						// exceededSlots and call d.onSlowDisk after dropping the mutex.
						inflightOp := exceededSlot{
							name:       d.mu.inflight[i].name,
							opType:     d.mu.inflight[i].opType,
							startNanos: nanos,
						}
						exceededSlots = append(exceededSlots, inflightOp)
					}
				}
				d.mu.Unlock()
				for i := range exceededSlots {
					d.onSlowDisk(
						DiskSlowInfo{
							Path:      exceededSlots[i].name,
							OpType:    exceededSlots[i].opType,
							WriteSize: 0, // writes at the fs level are not sized
							Duration:  now.Sub(time.Unix(0, exceededSlots[i].startNanos)),
						})
				}
			case <-stopper:
				return
			}
		}
	}()
}

// Close implements io.Closer. Close stops the long-running goroutine that
// monitors for slow filesystem metadata operations. Close may be called
// multiple times. If the filesystem is used after Close has been called, a new
// long-running goroutine will be created.
func (d *diskHealthCheckingFS) Close() error {
	d.mu.Lock()
	if !d.mu.tickerRunning {
		// Nothing to stop.
		d.mu.Unlock()
		return nil
	}

	// Grab the stopper so we can request the long-running goroutine to stop.
	// Replace the stopper in case this FS is reused. It's possible to Close and
	// reuse a disk-health checking FS. This is to accommodate the on-by-default
	// behavior in Pebble, and the possibility that users may continue to use
	// the Pebble default FS beyond the lifetime of a single DB.
	stopper := d.mu.stopper
	d.mu.stopper = make(chan struct{})
	d.mu.tickerRunning = false
	d.mu.Unlock()

	// Ask the long-running goroutine to stop. This is a synchronous channel
	// send.
	stopper <- struct{}{}
	close(stopper)
	return nil
}

// Create implements the FS interface.
func (d *diskHealthCheckingFS) Create(name string) (File, error) {
	var f File
	var err error
	d.timeFilesystemOp(name, OpTypeCreate, func() {
		f, err = d.fs.Create(name)
	}, time.Now().UnixNano())
	if err != nil {
		return f, err
	}
	if d.diskSlowThreshold == 0 {
		return f, nil
	}
	checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) {
		d.onSlowDisk(
			DiskSlowInfo{
				Path:      name,
				OpType:    opType,
				WriteSize: writeSizeInBytes,
				Duration:  duration,
			})
	})
	checkingFile.startTicker()
	return checkingFile, nil
}

// GetDiskUsage implements the FS interface.
func (d *diskHealthCheckingFS) GetDiskUsage(path string) (DiskUsage, error) {
	return d.fs.GetDiskUsage(path)
}

// Link implements the FS interface.
func (d *diskHealthCheckingFS) Link(oldname, newname string) error {
	var err error
	d.timeFilesystemOp(newname, OpTypeLink, func() {
		err = d.fs.Link(oldname, newname)
	}, time.Now().UnixNano())
	return err
}

// List implements the FS interface.
func (d *diskHealthCheckingFS) List(dir string) ([]string, error) {
	return d.fs.List(dir)
}

// Lock implements the FS interface.
func (d *diskHealthCheckingFS) Lock(name string) (io.Closer, error) {
	return d.fs.Lock(name)
}

// MkdirAll implements the FS interface.
func (d *diskHealthCheckingFS) MkdirAll(dir string, perm os.FileMode) error {
	var err error
	d.timeFilesystemOp(dir, OpTypeMkdirAll, func() {
		err = d.fs.MkdirAll(dir, perm)
	}, time.Now().UnixNano())
	return err
}

// Open implements the FS interface.
func (d *diskHealthCheckingFS) Open(name string, opts ...OpenOption) (File, error) {
	return d.fs.Open(name, opts...)
}

// OpenReadWrite implements the FS interface.
func (d *diskHealthCheckingFS) OpenReadWrite(name string, opts ...OpenOption) (File, error) {
	return d.fs.OpenReadWrite(name, opts...)
}

// OpenDir implements the FS interface.
func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) {
	f, err := d.fs.OpenDir(name)
	if err != nil {
		return f, err
	}
	// Directories opened with OpenDir must be opened with health checking,
	// because they may be explicitly synced.
	return &diskHealthCheckingDir{
		File: f,
		name: name,
		fs:   d,
	}, nil
}

// PathBase implements the FS interface.
func (d *diskHealthCheckingFS) PathBase(path string) string {
	return d.fs.PathBase(path)
}

// PathJoin implements the FS interface.
func (d *diskHealthCheckingFS) PathJoin(elem ...string) string {
	return d.fs.PathJoin(elem...)
}

// PathDir implements the FS interface.
func (d *diskHealthCheckingFS) PathDir(path string) string {
	return d.fs.PathDir(path)
}

// Remove implements the FS interface.
func (d *diskHealthCheckingFS) Remove(name string) error {
	var err error
	d.timeFilesystemOp(name, OpTypeRemove, func() {
		err = d.fs.Remove(name)
	}, time.Now().UnixNano())
	return err
}

// RemoveAll implements the FS interface.
func (d *diskHealthCheckingFS) RemoveAll(name string) error {
	var err error
	d.timeFilesystemOp(name, OpTypeRemoveAll, func() {
		err = d.fs.RemoveAll(name)
	}, time.Now().UnixNano())
	return err
}

// Rename implements the FS interface.
func (d *diskHealthCheckingFS) Rename(oldname, newname string) error {
	var err error
	d.timeFilesystemOp(newname, OpTypeRename, func() {
		err = d.fs.Rename(oldname, newname)
	}, time.Now().UnixNano())
	return err
}

// ReuseForWrite implements the FS interface.
func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, error) {
	var f File
	var err error
	d.timeFilesystemOp(newname, OpTypeReuseForWrite, func() {
		f, err = d.fs.ReuseForWrite(oldname, newname)
	}, time.Now().UnixNano())
	if err != nil {
		return f, err
	}
	if d.diskSlowThreshold == 0 {
		return f, nil
	}
	checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) {
		d.onSlowDisk(
			DiskSlowInfo{
				Path:      newname,
				OpType:    opType,
				WriteSize: writeSizeInBytes,
				Duration:  duration,
			})
	})
	checkingFile.startTicker()
	return checkingFile, nil
}

// Stat implements the FS interface.
func (d *diskHealthCheckingFS) Stat(name string) (os.FileInfo, error) {
	return d.fs.Stat(name)
}

type noopCloser struct{}

func (noopCloser) Close() error { return nil }