LCOV - code coverage report
Current view: top level - pebble/vfs - disk_health.go (source / functions) Hit Total Coverage
Test: 2024-11-22 08:17Z 3ec779d3 - tests only.lcov Lines: 373 452 82.5 %
Date: 2024-11-22 08:17:34 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package vfs
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "fmt"
      10             :         "io"
      11             :         "os"
      12             :         "path/filepath"
      13             :         "slices"
      14             :         "sync"
      15             :         "sync/atomic"
      16             :         "time"
      17             : 
      18             :         "github.com/cockroachdb/crlib/crtime"
      19             :         "github.com/cockroachdb/redact"
      20             : )
      21             : 
      22             : const (
      23             :         // preallocatedSlotCount is the default number of slots available for
      24             :         // concurrent filesystem operations. The slot count may be exceeded, but
      25             :         // each additional slot will incur an additional allocation. We choose 16
      26             :         // here with the expectation that it is significantly more than required in
      27             :         // practice. See the comment above the diskHealthCheckingFS type definition.
      28             :         preallocatedSlotCount = 16
      29             :         // deltaBits is the number of bits in the packed 64-bit integer used for
      30             :         // identifying a delta from the file creation time in milliseconds.
      31             :         deltaBits = 40
      32             :         // writeSizeBits is the number of bits in the packed 64-bit integer used for
      33             :         // identifying the size of the write operation, if the operation is sized. See
      34             :         // writeSizePrecision below for precision of size.
      35             :         writeSizeBits = 20
      36             :         // Track size of writes at kilobyte precision. See comment above lastWritePacked for more.
      37             :         writeSizePrecision = 1024
      38             : )
      39             : 
      40             : // Variables to enable testing.
      41             : var (
      42             :         // defaultTickInterval is the default interval between two ticks of each
      43             :         // diskHealthCheckingFile loop iteration.
      44             :         defaultTickInterval = 2 * time.Second
      45             : )
      46             : 
      47             : // OpType is the type of IO operation being monitored by a
      48             : // diskHealthCheckingFile.
      49             : type OpType uint8
      50             : 
      51             : // The following OpTypes is limited to the subset of file system operations that
      52             : // a diskHealthCheckingFile supports (namely writes and syncs).
      53             : const (
      54             :         OpTypeUnknown OpType = iota
      55             :         OpTypeWrite
      56             :         OpTypeSync
      57             :         OpTypeSyncData
      58             :         OpTypeSyncTo
      59             :         OpTypeCreate
      60             :         OpTypeLink
      61             :         OpTypeMkdirAll
      62             :         OpTypePreallocate
      63             :         OpTypeRemove
      64             :         OpTypeRemoveAll
      65             :         OpTypeRename
      66             :         OpTypeReuseForWrite
      67             :         // Note: opTypeMax is just used in tests. It must appear last in the list
      68             :         // of OpTypes.
      69             :         opTypeMax
      70             : )
      71             : 
      72             : // String implements fmt.Stringer.
      73           1 : func (o OpType) String() string {
      74           1 :         switch o {
      75           1 :         case OpTypeWrite:
      76           1 :                 return "write"
      77           1 :         case OpTypeSync:
      78           1 :                 return "sync"
      79           1 :         case OpTypeSyncData:
      80           1 :                 return "syncdata"
      81           1 :         case OpTypeSyncTo:
      82           1 :                 return "syncto"
      83           1 :         case OpTypeCreate:
      84           1 :                 return "create"
      85           1 :         case OpTypeLink:
      86           1 :                 return "link"
      87           1 :         case OpTypeMkdirAll:
      88           1 :                 return "mkdirall"
      89           1 :         case OpTypePreallocate:
      90           1 :                 return "preallocate"
      91           1 :         case OpTypeRemove:
      92           1 :                 return "remove"
      93           1 :         case OpTypeRemoveAll:
      94           1 :                 return "removall"
      95           1 :         case OpTypeRename:
      96           1 :                 return "rename"
      97           1 :         case OpTypeReuseForWrite:
      98           1 :                 return "reuseforwrite"
      99           1 :         case OpTypeUnknown:
     100           1 :                 return "unknown"
     101           0 :         default:
     102           0 :                 panic(fmt.Sprintf("vfs: unknown op type: %d", o))
     103             :         }
     104             : }
     105             : 
     106             : // DiskWriteCategory is a user-understandable string used to identify and aggregate
     107             : // stats for disk writes. The prefix "pebble-" is reserved for internal Pebble categories.
     108             : //
     109             : // Some examples include, pebble-wal, pebble-memtable-flush, pebble-manifest and in the
     110             : // Cockroach context includes, sql-row-spill, range-snapshot, crdb-log.
     111             : type DiskWriteCategory string
     112             : 
     113             : // WriteCategoryUnspecified denotes a disk write without a significant category.
     114             : const WriteCategoryUnspecified DiskWriteCategory = "unspecified"
     115             : 
     116             : // DiskWriteStatsAggregate is an aggregate of the bytes written to disk for a given category.
     117             : type DiskWriteStatsAggregate struct {
     118             :         Category     DiskWriteCategory
     119             :         BytesWritten uint64
     120             : }
     121             : 
     122             : // DiskWriteStatsCollector collects and aggregates disk write metrics per category.
     123             : type DiskWriteStatsCollector struct {
     124             :         mu       sync.Mutex
     125             :         statsMap map[DiskWriteCategory]*atomic.Uint64
     126             : }
     127             : 
     128             : // NewDiskWriteStatsCollector instantiates a new DiskWriteStatsCollector.
     129           1 : func NewDiskWriteStatsCollector() *DiskWriteStatsCollector {
     130           1 :         return &DiskWriteStatsCollector{
     131           1 :                 statsMap: make(map[DiskWriteCategory]*atomic.Uint64),
     132           1 :         }
     133           1 : }
     134             : 
     135             : // CreateStat inserts a new category to the statsMap if it doesn't already exist, otherwise
     136             : // it returns a pointer to the current stats.
     137           1 : func (d *DiskWriteStatsCollector) CreateStat(category DiskWriteCategory) *atomic.Uint64 {
     138           1 :         var bytesWritten *atomic.Uint64
     139           1 :         d.mu.Lock()
     140           1 :         defer d.mu.Unlock()
     141           1 :         if aggStats, ok := d.statsMap[category]; !ok {
     142           1 :                 bytesWritten = new(atomic.Uint64)
     143           1 :                 d.statsMap[category] = bytesWritten
     144           1 :         } else {
     145           0 :                 bytesWritten = aggStats
     146           0 :         }
     147           1 :         return bytesWritten
     148             : }
     149             : 
     150             : // GetStats returns the aggregated metrics for all categories.
     151           1 : func (d *DiskWriteStatsCollector) GetStats() []DiskWriteStatsAggregate {
     152           1 :         var stats []DiskWriteStatsAggregate
     153           1 :         d.mu.Lock()
     154           1 :         for category, numBytes := range d.statsMap {
     155           1 :                 stats = append(stats, DiskWriteStatsAggregate{
     156           1 :                         Category:     category,
     157           1 :                         BytesWritten: numBytes.Load(),
     158           1 :                 })
     159           1 :         }
     160           1 :         d.mu.Unlock()
     161           1 :         slices.SortFunc(stats, func(a, b DiskWriteStatsAggregate) int { return cmp.Compare(a.Category, b.Category) })
     162           1 :         return stats
     163             : }
     164             : 
     165             : // diskHealthCheckingFile is a File wrapper to collect disk write stats,
     166             : // detect slow disk operations, and call onSlowDisk if a disk operation
     167             : // is seen to exceed diskSlowThreshold.
     168             : //
     169             : // This struct creates a goroutine (in startTicker()) that, at every tick
     170             : // interval, sees if there's a disk operation taking longer than the specified
     171             : // duration. This setup is preferable to creating a new timer at every disk
     172             : // operation, as it reduces overhead per disk operation.
     173             : type diskHealthCheckingFile struct {
     174             :         file              File
     175             :         onSlowDisk        func(opType OpType, writeSizeInBytes int, duration time.Duration)
     176             :         diskSlowThreshold time.Duration
     177             :         tickInterval      time.Duration
     178             : 
     179             :         stopper chan struct{}
     180             :         // lastWritePacked is a 64-bit unsigned int. The most significant
     181             :         // 40 bits represent an delta (in milliseconds) from the creation
     182             :         // time of the diskHealthCheckingFile. The next most significant 20 bits
     183             :         // represent the size of the write in KBs, if the write has a size. (If
     184             :         // it doesn't, the 20 bits are zeroed). The least significant four bits
     185             :         // contains the OpType.
     186             :         //
     187             :         // The use of 40 bits for an delta provides ~34 years of effective
     188             :         // monitoring time before the uint wraps around, at millisecond precision.
     189             :         // ~34 years of process uptime "ought to be enough for anybody". Millisecond
     190             :         // writeSizePrecision is sufficient, given that we are monitoring for writes that take
     191             :         // longer than one millisecond.
     192             :         //
     193             :         // The use of 20 bits for the size in KBs allows representing sizes up
     194             :         // to nearly one GB. If the write is larger than that, we round down to ~one GB.
     195             :         //
     196             :         // The use of four bits for OpType allows for 16 operation types.
     197             :         //
     198             :         // NB: this packing scheme is not persisted, and is therefore safe to adjust
     199             :         // across process boundaries.
     200             :         lastWritePacked atomic.Uint64
     201             :         createTime      crtime.Mono
     202             : 
     203             :         // aggBytesWritten points to an atomic that aggregates the bytes written for files
     204             :         // that belong to a specific DiskWriteCategory. This pointer is also stored in the
     205             :         // DiskWriteStatsCollector for metric collection.
     206             :         aggBytesWritten *atomic.Uint64
     207             :         category        DiskWriteCategory
     208             : }
     209             : 
     210             : // diskHealthCheckingFile implements File.
     211             : var _ File = (*diskHealthCheckingFile)(nil)
     212             : 
     213             : // newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the
     214             : // specified time threshold and event listener.
     215             : func newDiskHealthCheckingFile(
     216             :         file File,
     217             :         diskSlowThreshold time.Duration,
     218             :         category DiskWriteCategory,
     219             :         statsCollector *DiskWriteStatsCollector,
     220             :         onSlowDisk func(OpType OpType, writeSizeInBytes int, duration time.Duration),
     221           1 : ) *diskHealthCheckingFile {
     222           1 :         var bytesWritten *atomic.Uint64
     223           1 :         if statsCollector != nil {
     224           1 :                 bytesWritten = statsCollector.CreateStat(category)
     225           1 :         } else {
     226           1 :                 bytesWritten = new(atomic.Uint64)
     227           1 :         }
     228           1 :         return &diskHealthCheckingFile{
     229           1 :                 file:              file,
     230           1 :                 onSlowDisk:        onSlowDisk,
     231           1 :                 diskSlowThreshold: diskSlowThreshold,
     232           1 :                 tickInterval:      defaultTickInterval,
     233           1 : 
     234           1 :                 stopper:    make(chan struct{}),
     235           1 :                 createTime: crtime.NowMono(),
     236           1 : 
     237           1 :                 aggBytesWritten: bytesWritten,
     238           1 :                 category:        category,
     239           1 :         }
     240             : }
     241             : 
     242             : // startTicker starts a new goroutine with a ticker to monitor disk operations.
     243             : // Can only be called if the ticker goroutine isn't running already.
     244           1 : func (d *diskHealthCheckingFile) startTicker() {
     245           1 :         if d.diskSlowThreshold == 0 {
     246           0 :                 return
     247           0 :         }
     248             : 
     249           1 :         go func() {
     250           1 :                 ticker := time.NewTicker(d.tickInterval)
     251           1 :                 defer ticker.Stop()
     252           1 : 
     253           1 :                 for {
     254           1 :                         select {
     255           1 :                         case <-d.stopper:
     256           1 :                                 return
     257             : 
     258           1 :                         case <-ticker.C:
     259           1 :                                 packed := d.lastWritePacked.Load()
     260           1 :                                 if packed == 0 {
     261           1 :                                         continue
     262             :                                 }
     263           1 :                                 delta, writeSize, op := unpack(packed)
     264           1 :                                 now := crtime.NowMono()
     265           1 :                                 // The last write started at d.createTime + delta and ended now.
     266           1 :                                 lastWriteDuration := now.Sub(d.createTime) - delta
     267           1 :                                 if lastWriteDuration > d.diskSlowThreshold {
     268           1 :                                         // diskSlowThreshold was exceeded. Call the passed-in
     269           1 :                                         // listener.
     270           1 :                                         d.onSlowDisk(op, writeSize, lastWriteDuration)
     271           1 :                                 }
     272             :                         }
     273             :                 }
     274             :         }()
     275             : }
     276             : 
     277             : // stopTicker stops the goroutine started in startTicker.
     278           1 : func (d *diskHealthCheckingFile) stopTicker() {
     279           1 :         close(d.stopper)
     280           1 : }
     281             : 
     282             : // Fd implements (vfs.File).Fd.
     283           1 : func (d *diskHealthCheckingFile) Fd() uintptr {
     284           1 :         return d.file.Fd()
     285           1 : }
     286             : 
     287             : // Read implements (vfs.File).Read
     288           0 : func (d *diskHealthCheckingFile) Read(p []byte) (int, error) {
     289           0 :         return d.file.Read(p)
     290           0 : }
     291             : 
     292             : // ReadAt implements (vfs.File).ReadAt
     293           0 : func (d *diskHealthCheckingFile) ReadAt(p []byte, off int64) (int, error) {
     294           0 :         return d.file.ReadAt(p, off)
     295           0 : }
     296             : 
     297             : // Write implements the io.Writer interface.
     298           1 : func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) {
     299           1 :         d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
     300           1 :                 n, err = d.file.Write(p)
     301           1 :         }, crtime.NowMono())
     302           1 :         d.aggBytesWritten.Add(uint64(n))
     303           1 :         return n, err
     304             : }
     305             : 
     306             : // WriteAt implements the io.WriterAt interface.
     307           0 : func (d *diskHealthCheckingFile) WriteAt(p []byte, ofs int64) (n int, err error) {
     308           0 :         d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
     309           0 :                 n, err = d.file.WriteAt(p, ofs)
     310           0 :         }, crtime.NowMono())
     311           0 :         d.aggBytesWritten.Add(uint64(n))
     312           0 :         return n, err
     313             : }
     314             : 
     315             : // Close implements the io.Closer interface.
     316           1 : func (d *diskHealthCheckingFile) Close() error {
     317           1 :         d.stopTicker()
     318           1 :         return d.file.Close()
     319           1 : }
     320             : 
     321             : // Prefetch implements (vfs.File).Prefetch.
     322           0 : func (d *diskHealthCheckingFile) Prefetch(offset, length int64) error {
     323           0 :         return d.file.Prefetch(offset, length)
     324           0 : }
     325             : 
     326             : // Preallocate implements (vfs.File).Preallocate.
     327           1 : func (d *diskHealthCheckingFile) Preallocate(off, n int64) (err error) {
     328           1 :         d.timeDiskOp(OpTypePreallocate, n, func() {
     329           1 :                 err = d.file.Preallocate(off, n)
     330           1 :         }, crtime.NowMono())
     331           1 :         return err
     332             : }
     333             : 
     334             : // Stat implements (vfs.File).Stat.
     335           1 : func (d *diskHealthCheckingFile) Stat() (FileInfo, error) {
     336           1 :         return d.file.Stat()
     337           1 : }
     338             : 
     339             : // Sync implements the io.Syncer interface.
     340           1 : func (d *diskHealthCheckingFile) Sync() (err error) {
     341           1 :         d.timeDiskOp(OpTypeSync, 0, func() {
     342           1 :                 err = d.file.Sync()
     343           1 :         }, crtime.NowMono())
     344           1 :         return err
     345             : }
     346             : 
     347             : // SyncData implements (vfs.File).SyncData.
     348           1 : func (d *diskHealthCheckingFile) SyncData() (err error) {
     349           1 :         d.timeDiskOp(OpTypeSyncData, 0, func() {
     350           1 :                 err = d.file.SyncData()
     351           1 :         }, crtime.NowMono())
     352           1 :         return err
     353             : }
     354             : 
     355             : // SyncTo implements (vfs.File).SyncTo.
     356           0 : func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) {
     357           0 :         d.timeDiskOp(OpTypeSyncTo, length, func() {
     358           0 :                 fullSync, err = d.file.SyncTo(length)
     359           0 :         }, crtime.NowMono())
     360           0 :         return fullSync, err
     361             : }
     362             : 
     363             : // timeDiskOp runs the specified closure and makes its timing visible to the
     364             : // monitoring goroutine, in case it exceeds one of the slow disk durations.
     365             : // opType should always be set. writeSizeInBytes should be set if the write
     366             : // operation is sized. If not, it should be set to zero.
     367             : //
     368             : // The start time is taken as a parameter in the form of nanoseconds since the
     369             : // unix epoch so that it appears in stack traces during crashes (if GOTRACEBACK
     370             : // is set appropriately), aiding postmortem debugging.
     371             : func (d *diskHealthCheckingFile) timeDiskOp(
     372             :         opType OpType, writeSizeInBytes int64, op func(), start crtime.Mono,
     373           1 : ) {
     374           1 :         if d == nil || d.diskSlowThreshold == 0 {
     375           0 :                 op()
     376           0 :                 return
     377           0 :         }
     378             : 
     379           1 :         delta := start.Sub(d.createTime)
     380           1 :         packed := pack(delta, writeSizeInBytes, opType)
     381           1 :         if d.lastWritePacked.Swap(packed) != 0 {
     382           0 :                 panic("concurrent write operations detected on file")
     383             :         }
     384           1 :         defer func() {
     385           1 :                 if d.lastWritePacked.Swap(0) != packed {
     386           0 :                         panic("concurrent write operations detected on file")
     387             :                 }
     388             :         }()
     389           1 :         op()
     390             : }
     391             : 
     392             : // Note the slight lack of symmetry between pack & unpack. pack takes an int64 for writeSizeInBytes, since
     393             : // callers of pack use an int64. This is dictated by the vfs interface. unpack OTOH returns an int. This is
     394             : // safe because the packing scheme implies we only actually need 32 bits.
     395           1 : func pack(delta time.Duration, writeSizeInBytes int64, opType OpType) uint64 {
     396           1 :         // We have no guarantee of clock monotonicity. If we have a small regression
     397           1 :         // in the clock, we set deltaMillis to zero, so we can still catch the operation
     398           1 :         // if happens to be slow.
     399           1 :         deltaMillis := delta.Milliseconds()
     400           1 :         if deltaMillis < 0 {
     401           0 :                 deltaMillis = 0
     402           0 :         }
     403             :         // As of 3/7/2023, the use of 40 bits for an delta provides ~34 years
     404             :         // of effective monitoring time before the uint wraps around, at millisecond
     405             :         // precision.
     406           1 :         if deltaMillis > 1<<deltaBits-1 {
     407           1 :                 panic("vfs: last write delta would result in integer wraparound")
     408             :         }
     409             : 
     410             :         // See writeSizePrecision to get the unit of writeSize. As of 1/26/2023, the unit is KBs.
     411           1 :         writeSize := writeSizeInBytes / writeSizePrecision
     412           1 :         // If the size of the write is larger than we can store in the packed int, store the max
     413           1 :         // value we can store in the packed int.
     414           1 :         const writeSizeCeiling = 1<<writeSizeBits - 1
     415           1 :         if writeSize > writeSizeCeiling {
     416           1 :                 writeSize = writeSizeCeiling
     417           1 :         }
     418             : 
     419           1 :         return uint64(deltaMillis)<<(64-deltaBits) | uint64(writeSize)<<(64-deltaBits-writeSizeBits) | uint64(opType)
     420             : }
     421             : 
     422           1 : func unpack(packed uint64) (delta time.Duration, writeSizeInBytes int, opType OpType) {
     423           1 :         delta = time.Duration(packed>>(64-deltaBits)) * time.Millisecond
     424           1 :         wz := int64(packed>>(64-deltaBits-writeSizeBits)) & ((1 << writeSizeBits) - 1) * writeSizePrecision
     425           1 :         // Given the packing scheme, converting wz to an int will not truncate anything.
     426           1 :         writeSizeInBytes = int(wz)
     427           1 :         opType = OpType(packed & 0xf)
     428           1 :         return delta, writeSizeInBytes, opType
     429           1 : }
     430             : 
     431             : // diskHealthCheckingDir implements disk-health checking for directories. Unlike
     432             : // other files, we allow directories to receive concurrent write operations
     433             : // (Syncs are the only write operations supported by a directory.) Since the
     434             : // diskHealthCheckingFile's timeDiskOp can only track a single in-flight
     435             : // operation at a time, we time the operation using the filesystem-level
     436             : // timeFilesystemOp function instead.
     437             : type diskHealthCheckingDir struct {
     438             :         File
     439             :         name string
     440             :         fs   *diskHealthCheckingFS
     441             : }
     442             : 
     443             : // Sync implements the io.Syncer interface.
     444           1 : func (d *diskHealthCheckingDir) Sync() (err error) {
     445           1 :         d.fs.timeFilesystemOp(d.name, OpTypeSync, func() {
     446           1 :                 err = d.File.Sync()
     447           1 :         }, crtime.NowMono())
     448           1 :         return err
     449             : }
     450             : 
     451             : // DiskSlowInfo captures info about detected slow operations on the vfs.
     452             : type DiskSlowInfo struct {
     453             :         // Path of file being written to.
     454             :         Path string
     455             :         // Operation being performed on the file.
     456             :         OpType OpType
     457             :         // Size of write in bytes, if the write is sized.
     458             :         WriteSize int
     459             :         // Duration that has elapsed since this disk operation started.
     460             :         Duration time.Duration
     461             : }
     462             : 
     463           0 : func (i DiskSlowInfo) String() string {
     464           0 :         return redact.StringWithoutMarkers(i)
     465           0 : }
     466             : 
     467             : // SafeFormat implements redact.SafeFormatter.
     468           0 : func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     469           0 :         switch i.OpType {
     470             :         // Operations for which i.WriteSize is meaningful.
     471           0 :         case OpTypeWrite, OpTypeSyncTo, OpTypePreallocate:
     472           0 :                 w.Printf("disk slowness detected: %s on file %s (%d bytes) has been ongoing for %0.1fs",
     473           0 :                         redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
     474           0 :                         redact.Safe(i.WriteSize), redact.Safe(i.Duration.Seconds()))
     475           0 :         default:
     476           0 :                 w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs",
     477           0 :                         redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
     478           0 :                         redact.Safe(i.Duration.Seconds()))
     479             :         }
     480             : }
     481             : 
     482             : // diskHealthCheckingFS adds disk-health checking facilities to a VFS.
     483             : // It times disk write operations in two ways:
     484             : //
     485             : // 1. Wrapping vfs.Files.
     486             : //
     487             : // The bulk of write I/O activity is file writing and syncing, invoked through
     488             : // the `vfs.File` interface. This VFS wraps all files open for writing with a
     489             : // special diskHealthCheckingFile implementation of the vfs.File interface. See
     490             : // above for the implementation.
     491             : //
     492             : // 2. Monitoring filesystem metadata operations.
     493             : //
     494             : // Filesystem metadata operations (create, link, remove, rename, etc) are also
     495             : // sources of disk writes. Unlike a vfs.File which requires Write and Sync calls
     496             : // to be sequential, a vfs.FS may receive these filesystem metadata operations
     497             : // in parallel. To accommodate this parallelism, the diskHealthCheckingFS's
     498             : // write-oriented filesystem operations record their start times into a 'slot'
     499             : // on the filesystem. A single long-running goroutine periodically scans the
     500             : // slots looking for slow operations.
     501             : //
     502             : // The number of slots on a diskHealthCheckingFS grows to a working set of the
     503             : // maximum concurrent filesystem operations. This is expected to be very few
     504             : // for these reasons:
     505             : //  1. Pebble has limited write concurrency. Flushes, compactions and WAL
     506             : //     rotations are the primary sources of filesystem metadata operations. With
     507             : //     the default max-compaction concurrency, these operations require at most 5
     508             : //     concurrent slots if all 5 perform a filesystem metadata operation
     509             : //     simultaneously.
     510             : //  2. Pebble's limited concurrent I/O writers spend most of their time
     511             : //     performing file I/O, not performing the filesystem metadata operations that
     512             : //     require recording a slot on the diskHealthCheckingFS.
     513             : //  3. In CockroachDB, each additional store/Pebble instance has its own vfs.FS
     514             : //     which provides a separate goroutine and set of slots.
     515             : //  4. In CockroachDB, many of the additional sources of filesystem metadata
     516             : //     operations (like encryption-at-rest) are sequential with respect to Pebble's
     517             : //     threads.
     518             : type diskHealthCheckingFS struct {
     519             :         tickInterval      time.Duration
     520             :         diskSlowThreshold time.Duration
     521             :         statsCollector    *DiskWriteStatsCollector
     522             :         onSlowDisk        func(DiskSlowInfo)
     523             :         fs                FS
     524             :         mu                struct {
     525             :                 sync.Mutex
     526             :                 tickerRunning bool
     527             :                 stopper       chan struct{}
     528             :                 inflight      []*slot
     529             :         }
     530             :         // prealloc preallocates the memory for mu.inflight slots and the slice
     531             :         // itself. The contained fields are not accessed directly except by
     532             :         // WithDiskHealthChecks when initializing mu.inflight. The number of slots
     533             :         // in d.mu.inflight will grow to the maximum number of concurrent file
     534             :         // metadata operations (create, remove, link, etc). If the number of
     535             :         // concurrent operations never exceeds preallocatedSlotCount, we'll never
     536             :         // incur an additional allocation.
     537             :         prealloc struct {
     538             :                 slots        [preallocatedSlotCount]slot
     539             :                 slotPtrSlice [preallocatedSlotCount]*slot
     540             :         }
     541             : }
     542             : 
     543             : type slot struct {
     544             :         name      string
     545             :         opType    OpType
     546             :         startTime crtime.AtomicMono
     547             : }
     548             : 
     549             : // diskHealthCheckingFS implements FS.
     550             : var _ FS = (*diskHealthCheckingFS)(nil)
     551             : 
     552             : // WithDiskHealthChecks wraps an FS and ensures that all write-oriented
     553             : // operations on the FS are wrapped with disk health detection checks and
     554             : // aggregated. Disk operations that are observed to take longer than
     555             : // diskSlowThreshold trigger an onSlowDisk call.
     556             : //
     557             : // A threshold of zero disables disk-health checking.
     558             : func WithDiskHealthChecks(
     559             :         innerFS FS,
     560             :         diskSlowThreshold time.Duration,
     561             :         statsCollector *DiskWriteStatsCollector,
     562             :         onSlowDisk func(info DiskSlowInfo),
     563           1 : ) (FS, io.Closer) {
     564           1 :         if diskSlowThreshold == 0 {
     565           0 :                 return innerFS, noopCloser{}
     566           0 :         }
     567             : 
     568           1 :         fs := &diskHealthCheckingFS{
     569           1 :                 fs:                innerFS,
     570           1 :                 tickInterval:      defaultTickInterval,
     571           1 :                 diskSlowThreshold: diskSlowThreshold,
     572           1 :                 statsCollector:    statsCollector,
     573           1 :                 onSlowDisk:        onSlowDisk,
     574           1 :         }
     575           1 :         fs.mu.stopper = make(chan struct{})
     576           1 :         // The fs holds preallocated slots and a preallocated array of slot pointers
     577           1 :         // with equal length. Initialize the inflight slice to use a slice backed by
     578           1 :         // the preallocated array with each slot initialized to a preallocated slot.
     579           1 :         fs.mu.inflight = fs.prealloc.slotPtrSlice[:]
     580           1 :         for i := range fs.mu.inflight {
     581           1 :                 fs.mu.inflight[i] = &fs.prealloc.slots[i]
     582           1 :         }
     583           1 :         return fs, fs
     584             : }
     585             : 
     586             : // Unwrap is part of the FS interface.
     587           1 : func (d *diskHealthCheckingFS) Unwrap() FS {
     588           1 :         return d.fs
     589           1 : }
     590             : 
     591             : // timeFilesystemOp executes the provided closure, which should perform a
     592             : // singular filesystem operation of a type matching opType on the named file. It
     593             : // records the provided start time such that the long-lived disk-health checking
     594             : // goroutine can observe if the operation is blocked for an inordinate time.
     595             : //
     596             : // The start time is taken as a parameter in the form of nanoseconds since the
     597             : // unix epoch so that it appears in stack traces during crashes (if GOTRACEBACK
     598             : // is set appropriately), aiding postmortem debugging.
     599             : func (d *diskHealthCheckingFS) timeFilesystemOp(
     600             :         name string, opType OpType, op func(), start crtime.Mono,
     601           1 : ) {
     602           1 :         if d == nil {
     603           0 :                 op()
     604           0 :                 return
     605           0 :         }
     606             : 
     607             :         // Record this operation's start time on the FS, so that the long-running
     608             :         // goroutine can monitor the filesystem operation.
     609             :         //
     610             :         // The diskHealthCheckingFile implementation uses a single field that is
     611             :         // atomically updated, taking advantage of the fact that writes to a single
     612             :         // vfs.File handle are not performed in parallel. The vfs.FS however may
     613             :         // receive write filesystem operations in parallel. To accommodate this
     614             :         // parallelism, writing goroutines append their start time to a
     615             :         // mutex-protected vector. On ticks, the long-running goroutine scans the
     616             :         // vector searching for start times older than the slow-disk threshold. When
     617             :         // a writing goroutine completes its operation, it atomically overwrites its
     618             :         // slot to signal completion.
     619           1 :         var s *slot
     620           1 :         func() {
     621           1 :                 d.mu.Lock()
     622           1 :                 defer d.mu.Unlock()
     623           1 : 
     624           1 :                 // If there's no long-running goroutine to monitor this filesystem
     625           1 :                 // operation, start one.
     626           1 :                 if !d.mu.tickerRunning {
     627           1 :                         d.startTickerLocked()
     628           1 :                 }
     629             : 
     630           1 :                 for i := 0; i < len(d.mu.inflight); i++ {
     631           1 :                         if d.mu.inflight[i].startTime.Load() == 0 {
     632           1 :                                 // This slot is not in use. Claim it.
     633           1 :                                 s = d.mu.inflight[i]
     634           1 :                                 s.name = name
     635           1 :                                 s.opType = opType
     636           1 :                                 s.startTime.Store(start)
     637           1 :                                 break
     638             :                         }
     639             :                 }
     640             :                 // If we didn't find any unused slots, create a new slot and append it.
     641             :                 // This slot will exist forever. The number of slots will grow to the
     642             :                 // maximum number of concurrent filesystem operations over the lifetime
     643             :                 // of the process. Only operations that grow the number of slots must
     644             :                 // incur an allocation.
     645           1 :                 if s == nil {
     646           0 :                         s = &slot{
     647           0 :                                 name:   name,
     648           0 :                                 opType: opType,
     649           0 :                         }
     650           0 :                         s.startTime.Store(start)
     651           0 :                         d.mu.inflight = append(d.mu.inflight, s)
     652           0 :                 }
     653             :         }()
     654             : 
     655           1 :         op()
     656           1 : 
     657           1 :         // Signal completion by zeroing the start time.
     658           1 :         s.startTime.Store(0)
     659             : }
     660             : 
     661             : // startTickerLocked starts a new goroutine with a ticker to monitor disk
     662             : // filesystem operations. Requires d.mu and !d.mu.tickerRunning.
     663           1 : func (d *diskHealthCheckingFS) startTickerLocked() {
     664           1 :         d.mu.tickerRunning = true
     665           1 :         stopper := d.mu.stopper
     666           1 :         go func() {
     667           1 :                 ticker := time.NewTicker(d.tickInterval)
     668           1 :                 defer ticker.Stop()
     669           1 :                 type exceededSlot struct {
     670           1 :                         name      string
     671           1 :                         opType    OpType
     672           1 :                         startTime crtime.Mono
     673           1 :                 }
     674           1 :                 var exceededSlots []exceededSlot
     675           1 : 
     676           1 :                 for {
     677           1 :                         select {
     678           1 :                         case <-ticker.C:
     679           1 :                                 // Scan the inflight slots for any slots recording a start
     680           1 :                                 // time older than the diskSlowThreshold.
     681           1 :                                 exceededSlots = exceededSlots[:0]
     682           1 :                                 d.mu.Lock()
     683           1 :                                 now := crtime.NowMono()
     684           1 :                                 for i := range d.mu.inflight {
     685           1 :                                         startTime := d.mu.inflight[i].startTime.Load()
     686           1 :                                         if startTime != 0 && now.Sub(startTime) > d.diskSlowThreshold {
     687           1 :                                                 // diskSlowThreshold was exceeded. Copy this inflightOp into
     688           1 :                                                 // exceededSlots and call d.onSlowDisk after dropping the mutex.
     689           1 :                                                 inflightOp := exceededSlot{
     690           1 :                                                         name:      d.mu.inflight[i].name,
     691           1 :                                                         opType:    d.mu.inflight[i].opType,
     692           1 :                                                         startTime: startTime,
     693           1 :                                                 }
     694           1 :                                                 exceededSlots = append(exceededSlots, inflightOp)
     695           1 :                                         }
     696             :                                 }
     697           1 :                                 d.mu.Unlock()
     698           1 :                                 for i := range exceededSlots {
     699           1 :                                         d.onSlowDisk(
     700           1 :                                                 DiskSlowInfo{
     701           1 :                                                         Path:      exceededSlots[i].name,
     702           1 :                                                         OpType:    exceededSlots[i].opType,
     703           1 :                                                         WriteSize: 0, // writes at the fs level are not sized
     704           1 :                                                         Duration:  now.Sub(exceededSlots[i].startTime),
     705           1 :                                                 })
     706           1 :                                 }
     707           1 :                         case <-stopper:
     708           1 :                                 return
     709             :                         }
     710             :                 }
     711             :         }()
     712             : }
     713             : 
     714             : // Close implements io.Closer. Close stops the long-running goroutine that
     715             : // monitors for slow filesystem metadata operations. Close may be called
     716             : // multiple times. If the filesystem is used after Close has been called, a new
     717             : // long-running goroutine will be created.
     718           1 : func (d *diskHealthCheckingFS) Close() error {
     719           1 :         d.mu.Lock()
     720           1 :         if !d.mu.tickerRunning {
     721           1 :                 // Nothing to stop.
     722           1 :                 d.mu.Unlock()
     723           1 :                 return nil
     724           1 :         }
     725             : 
     726             :         // Grab the stopper so we can request the long-running goroutine to stop.
     727             :         // Replace the stopper in case this FS is reused. It's possible to Close and
     728             :         // reuse a disk-health checking FS. This is to accommodate the on-by-default
     729             :         // behavior in Pebble, and the possibility that users may continue to use
     730             :         // the Pebble default FS beyond the lifetime of a single DB.
     731           1 :         stopper := d.mu.stopper
     732           1 :         d.mu.stopper = make(chan struct{})
     733           1 :         d.mu.tickerRunning = false
     734           1 :         d.mu.Unlock()
     735           1 : 
     736           1 :         // Ask the long-running goroutine to stop. This is a synchronous channel
     737           1 :         // send.
     738           1 :         stopper <- struct{}{}
     739           1 :         close(stopper)
     740           1 :         return nil
     741             : }
     742             : 
     743             : // Create implements the FS interface.
     744           1 : func (d *diskHealthCheckingFS) Create(name string, category DiskWriteCategory) (File, error) {
     745           1 :         var f File
     746           1 :         var err error
     747           1 :         d.timeFilesystemOp(name, OpTypeCreate, func() {
     748           1 :                 f, err = d.fs.Create(name, category)
     749           1 :         }, crtime.NowMono())
     750           1 :         if err != nil {
     751           1 :                 return f, err
     752           1 :         }
     753           1 :         if d.diskSlowThreshold == 0 {
     754           0 :                 return f, nil
     755           0 :         }
     756           1 :         checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, category, d.statsCollector,
     757           1 :                 func(opType OpType, writeSizeInBytes int, duration time.Duration) {
     758           1 :                         d.onSlowDisk(
     759           1 :                                 DiskSlowInfo{
     760           1 :                                         Path:      name,
     761           1 :                                         OpType:    opType,
     762           1 :                                         WriteSize: writeSizeInBytes,
     763           1 :                                         Duration:  duration,
     764           1 :                                 })
     765           1 :                 })
     766           1 :         checkingFile.startTicker()
     767           1 :         return checkingFile, nil
     768             : }
     769             : 
     770             : // GetDiskUsage implements the FS interface.
     771           1 : func (d *diskHealthCheckingFS) GetDiskUsage(path string) (DiskUsage, error) {
     772           1 :         return d.fs.GetDiskUsage(path)
     773           1 : }
     774             : 
     775             : // Link implements the FS interface.
     776           1 : func (d *diskHealthCheckingFS) Link(oldname, newname string) error {
     777           1 :         var err error
     778           1 :         d.timeFilesystemOp(newname, OpTypeLink, func() {
     779           1 :                 err = d.fs.Link(oldname, newname)
     780           1 :         }, crtime.NowMono())
     781           1 :         return err
     782             : }
     783             : 
     784             : // List implements the FS interface.
     785           1 : func (d *diskHealthCheckingFS) List(dir string) ([]string, error) {
     786           1 :         return d.fs.List(dir)
     787           1 : }
     788             : 
     789             : // Lock implements the FS interface.
     790           1 : func (d *diskHealthCheckingFS) Lock(name string) (io.Closer, error) {
     791           1 :         return d.fs.Lock(name)
     792           1 : }
     793             : 
     794             : // MkdirAll implements the FS interface.
     795           1 : func (d *diskHealthCheckingFS) MkdirAll(dir string, perm os.FileMode) error {
     796           1 :         var err error
     797           1 :         d.timeFilesystemOp(dir, OpTypeMkdirAll, func() {
     798           1 :                 err = d.fs.MkdirAll(dir, perm)
     799           1 :         }, crtime.NowMono())
     800           1 :         return err
     801             : }
     802             : 
     803             : // Open implements the FS interface.
     804           1 : func (d *diskHealthCheckingFS) Open(name string, opts ...OpenOption) (File, error) {
     805           1 :         return d.fs.Open(name, opts...)
     806           1 : }
     807             : 
     808             : // OpenReadWrite implements the FS interface.
     809             : func (d *diskHealthCheckingFS) OpenReadWrite(
     810             :         name string, category DiskWriteCategory, opts ...OpenOption,
     811           0 : ) (File, error) {
     812           0 :         f, err := d.fs.OpenReadWrite(name, category, opts...)
     813           0 :         if err != nil {
     814           0 :                 return nil, err
     815           0 :         }
     816           0 :         return newDiskHealthCheckingFile(f, 0, category, d.statsCollector, func(opType OpType, writeSizeInBytes int, duration time.Duration) {}), nil
     817             : }
     818             : 
     819             : // OpenDir implements the FS interface.
     820           1 : func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) {
     821           1 :         f, err := d.fs.OpenDir(name)
     822           1 :         if err != nil {
     823           0 :                 return f, err
     824           0 :         }
     825             :         // Directories opened with OpenDir must be opened with health checking,
     826             :         // because they may be explicitly synced.
     827           1 :         return &diskHealthCheckingDir{
     828           1 :                 File: f,
     829           1 :                 name: name,
     830           1 :                 fs:   d,
     831           1 :         }, nil
     832             : }
     833             : 
     834             : // PathBase implements the FS interface.
     835           1 : func (d *diskHealthCheckingFS) PathBase(path string) string {
     836           1 :         return d.fs.PathBase(path)
     837           1 : }
     838             : 
     839             : // PathJoin implements the FS interface.
     840           1 : func (d *diskHealthCheckingFS) PathJoin(elem ...string) string {
     841           1 :         return d.fs.PathJoin(elem...)
     842           1 : }
     843             : 
     844             : // PathDir implements the FS interface.
     845           1 : func (d *diskHealthCheckingFS) PathDir(path string) string {
     846           1 :         return d.fs.PathDir(path)
     847           1 : }
     848             : 
     849             : // Remove implements the FS interface.
     850           1 : func (d *diskHealthCheckingFS) Remove(name string) error {
     851           1 :         var err error
     852           1 :         d.timeFilesystemOp(name, OpTypeRemove, func() {
     853           1 :                 err = d.fs.Remove(name)
     854           1 :         }, crtime.NowMono())
     855           1 :         return err
     856             : }
     857             : 
     858             : // RemoveAll implements the FS interface.
     859           1 : func (d *diskHealthCheckingFS) RemoveAll(name string) error {
     860           1 :         var err error
     861           1 :         d.timeFilesystemOp(name, OpTypeRemoveAll, func() {
     862           1 :                 err = d.fs.RemoveAll(name)
     863           1 :         }, crtime.NowMono())
     864           1 :         return err
     865             : }
     866             : 
     867             : // Rename implements the FS interface.
     868           1 : func (d *diskHealthCheckingFS) Rename(oldname, newname string) error {
     869           1 :         var err error
     870           1 :         d.timeFilesystemOp(newname, OpTypeRename, func() {
     871           1 :                 err = d.fs.Rename(oldname, newname)
     872           1 :         }, crtime.NowMono())
     873           1 :         return err
     874             : }
     875             : 
     876             : // ReuseForWrite implements the FS interface.
     877             : func (d *diskHealthCheckingFS) ReuseForWrite(
     878             :         oldname, newname string, category DiskWriteCategory,
     879           1 : ) (File, error) {
     880           1 :         var f File
     881           1 :         var err error
     882           1 :         d.timeFilesystemOp(newname, OpTypeReuseForWrite, func() {
     883           1 :                 f, err = d.fs.ReuseForWrite(oldname, newname, category)
     884           1 :         }, crtime.NowMono())
     885           1 :         if err != nil {
     886           1 :                 return f, err
     887           1 :         }
     888           1 :         if d.diskSlowThreshold == 0 {
     889           0 :                 return f, nil
     890           0 :         }
     891           1 :         checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, category, d.statsCollector,
     892           1 :                 func(opType OpType, writeSizeInBytes int, duration time.Duration) {
     893           0 :                         d.onSlowDisk(
     894           0 :                                 DiskSlowInfo{
     895           0 :                                         Path:      newname,
     896           0 :                                         OpType:    opType,
     897           0 :                                         WriteSize: writeSizeInBytes,
     898           0 :                                         Duration:  duration,
     899           0 :                                 })
     900           0 :                 })
     901           1 :         checkingFile.startTicker()
     902           1 :         return checkingFile, nil
     903             : }
     904             : 
     905             : // Stat implements the FS interface.
     906           1 : func (d *diskHealthCheckingFS) Stat(name string) (FileInfo, error) {
     907           1 :         return d.fs.Stat(name)
     908           1 : }
     909             : 
     910             : type noopCloser struct{}
     911             : 
     912           0 : func (noopCloser) Close() error { return nil }

Generated by: LCOV version 1.14