Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package vfs
6 :
7 : import (
8 : "cmp"
9 : "fmt"
10 : "io"
11 : "os"
12 : "path/filepath"
13 : "slices"
14 : "sync"
15 : "sync/atomic"
16 : "time"
17 :
18 : "github.com/cockroachdb/redact"
19 : )
20 :
// Packing and slot-allocation parameters for disk-health checking.
const (
	// preallocatedSlotCount is the number of filesystem-operation slots that
	// WithDiskHealthChecks allocates up front. More concurrent operations than
	// this are still supported, but each extra slot costs an allocation. Sixteen
	// is expected to be significantly more than required in practice; see the
	// comment above the diskHealthCheckingFS type definition.
	preallocatedSlotCount = 16

	// deltaBits is the width, within the packed 64-bit word, of the millisecond
	// offset from the file's creation time.
	deltaBits = 40

	// writeSizeBits is the width, within the packed 64-bit word, of the size of
	// a sized operation, expressed in units of writeSizePrecision.
	writeSizeBits = 20

	// writeSizePrecision is the unit, in bytes, in which write sizes are
	// recorded (kilobyte precision). See the comment above lastWritePacked.
	writeSizePrecision = 1 << 10
)
38 :
39 : // Variables to enable testing.
40 : var (
41 : // defaultTickInterval is the default interval between two ticks of each
42 : // diskHealthCheckingFile loop iteration.
43 : defaultTickInterval = 2 * time.Second
44 : )
45 :
// OpType is the type of IO operation being monitored by a
// diskHealthCheckingFile or a diskHealthCheckingFS.
type OpType uint8

// The following OpTypes are limited to the subset of file system operations
// that are monitored: file writes and syncs (diskHealthCheckingFile) and
// filesystem metadata operations (diskHealthCheckingFS).
//
// NB: values are assigned by iota, so declaration order determines each value,
// and they are packed into the four least significant bits of
// diskHealthCheckingFile.lastWritePacked, so at most 16 values may exist.
const (
	OpTypeUnknown OpType = iota
	OpTypeWrite
	OpTypeSync
	OpTypeSyncData
	OpTypeSyncTo
	OpTypeCreate
	OpTypeLink
	OpTypeMkdirAll
	OpTypePreallocate
	OpTypeRemove
	OpTypeRemoveAll
	OpTypeRename
	OpTypeReuseForWrite
	// Note: opTypeMax is just used in tests. It must appear last in the list
	// of OpTypes.
	opTypeMax
)
70 :
// String implements fmt.Stringer. It returns a short lowercase name for the
// operation, used in slow-disk log messages (see DiskSlowInfo.SafeFormat).
//
// It panics on a value outside the declared OpType constants, since such a
// value can only arise from a programming error.
func (o OpType) String() string {
	switch o {
	case OpTypeWrite:
		return "write"
	case OpTypeSync:
		return "sync"
	case OpTypeSyncData:
		return "syncdata"
	case OpTypeSyncTo:
		return "syncto"
	case OpTypeCreate:
		return "create"
	case OpTypeLink:
		return "link"
	case OpTypeMkdirAll:
		return "mkdirall"
	case OpTypePreallocate:
		return "preallocate"
	case OpTypeRemove:
		return "remove"
	case OpTypeRemoveAll:
		// Previously misspelled "removall".
		return "removeall"
	case OpTypeRename:
		return "rename"
	case OpTypeReuseForWrite:
		return "reuseforwrite"
	case OpTypeUnknown:
		return "unknown"
	default:
		panic(fmt.Sprintf("vfs: unknown op type: %d", o))
	}
}
104 :
// DiskWriteCategory is a user-understandable string used to identify and
// aggregate stats for disk writes. The prefix "pebble-" is reserved for
// internal Pebble categories.
//
// Some examples include pebble-wal, pebble-memtable-flush and pebble-manifest,
// and, in the Cockroach context, sql-row-spill, range-snapshot and crdb-log.
type DiskWriteCategory string

// WriteCategoryUnspecified denotes a disk write without a significant category.
const WriteCategoryUnspecified DiskWriteCategory = "unspecified"
114 :
// DiskWriteStatsAggregate is an aggregate of the bytes written to disk for a
// given category, as returned by DiskWriteStatsCollector.GetStats.
type DiskWriteStatsAggregate struct {
	// Category identifies the writes being aggregated.
	Category DiskWriteCategory
	// BytesWritten is the total number of bytes written for Category at the
	// moment the aggregate was taken.
	BytesWritten uint64
}
120 :
// DiskWriteStatsCollector collects and aggregates disk write metrics per
// category. Construct it with NewDiskWriteStatsCollector: the zero value has a
// nil statsMap, and CreateStat would panic when inserting into it.
type DiskWriteStatsCollector struct {
	// mu guards statsMap membership. The counters it points to are shared with
	// diskHealthCheckingFiles, which update them atomically without taking mu.
	mu       sync.Mutex
	statsMap map[DiskWriteCategory]*atomic.Uint64
}
126 :
// NewDiskWriteStatsCollector returns an empty DiskWriteStatsCollector whose
// category map is ready for use.
func NewDiskWriteStatsCollector() *DiskWriteStatsCollector {
	collector := new(DiskWriteStatsCollector)
	collector.statsMap = make(map[DiskWriteCategory]*atomic.Uint64)
	return collector
}
133 :
134 : // CreateStat inserts a new category to the statsMap if it doesn't already exist, otherwise
135 : // it returns a pointer to the current stats.
136 1 : func (d *DiskWriteStatsCollector) CreateStat(category DiskWriteCategory) *atomic.Uint64 {
137 1 : var bytesWritten *atomic.Uint64
138 1 : d.mu.Lock()
139 1 : defer d.mu.Unlock()
140 1 : if aggStats, ok := d.statsMap[category]; !ok {
141 1 : bytesWritten = new(atomic.Uint64)
142 1 : d.statsMap[category] = bytesWritten
143 1 : } else {
144 0 : bytesWritten = aggStats
145 0 : }
146 1 : return bytesWritten
147 : }
148 :
// GetStats returns a snapshot of the bytes written for every registered
// category, sorted by category name. It returns nil when no categories have
// been registered.
func (d *DiskWriteStatsCollector) GetStats() []DiskWriteStatsAggregate {
	var snapshot []DiskWriteStatsAggregate
	d.mu.Lock()
	for cat, counter := range d.statsMap {
		snapshot = append(snapshot, DiskWriteStatsAggregate{Category: cat, BytesWritten: counter.Load()})
	}
	d.mu.Unlock()
	byCategory := func(x, y DiskWriteStatsAggregate) int {
		return cmp.Compare(x.Category, y.Category)
	}
	slices.SortFunc(snapshot, byCategory)
	return snapshot
}
163 :
// diskHealthCheckingFile is a File wrapper that collects disk write stats,
// detects slow disk operations, and calls onSlowDisk if a disk operation is
// seen to exceed diskSlowThreshold.
//
// This struct creates a goroutine (in startTicker()) that, at every tick
// interval, checks whether there's a disk operation taking longer than the
// specified duration. This setup is preferable to creating a new timer at
// every disk operation, as it reduces overhead per disk operation.
type diskHealthCheckingFile struct {
	// file is the wrapped File; every method forwards to it.
	file File
	// onSlowDisk is called from the ticker goroutine on every tick for as long
	// as the in-flight operation has exceeded diskSlowThreshold.
	onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration)
	// diskSlowThreshold of zero disables timing entirely (see timeDiskOp).
	diskSlowThreshold time.Duration
	tickInterval      time.Duration

	// stopper is closed by stopTicker to terminate the ticker goroutine.
	stopper chan struct{}
	// lastWritePacked is a 64-bit unsigned int. The most significant 40 bits
	// represent a delta (in milliseconds) from the creation time of the
	// diskHealthCheckingFile. The next most significant 20 bits represent the
	// size of the write in KBs, if the write has a size. (If it doesn't, the
	// 20 bits are zeroed.) The least significant four bits contain the OpType.
	// timeDiskOp resets it to zero when an operation completes, and the ticker
	// goroutine treats zero as "no operation in flight".
	//
	// The use of 40 bits for a delta provides ~34 years of effective
	// monitoring time before the uint wraps around, at millisecond precision.
	// ~34 years of process uptime "ought to be enough for anybody". Millisecond
	// precision is sufficient, given that we are monitoring for writes that
	// take longer than one millisecond.
	//
	// The use of 20 bits for the size in KBs allows representing sizes up to
	// nearly one GB. If the write is larger than that, we round down to ~one GB.
	//
	// The use of four bits for OpType allows for 16 operation types.
	//
	// NB: this packing scheme is not persisted, and is therefore safe to
	// adjust across process boundaries.
	lastWritePacked atomic.Uint64
	// createTimeNanos is the wrapper's creation time in Unix nanoseconds; the
	// deltas stored in lastWritePacked are relative to it.
	createTimeNanos int64

	// aggBytesWritten points to an atomic that aggregates the bytes written for
	// files that belong to a specific DiskWriteCategory. This pointer is also
	// stored in the DiskWriteStatsCollector for metric collection (when one is
	// provided).
	aggBytesWritten *atomic.Uint64
	category        DiskWriteCategory
}

// diskHealthCheckingFile implements File.
var _ File = (*diskHealthCheckingFile)(nil)
211 :
212 : // newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the
213 : // specified time threshold and event listener.
214 : func newDiskHealthCheckingFile(
215 : file File,
216 : diskSlowThreshold time.Duration,
217 : category DiskWriteCategory,
218 : statsCollector *DiskWriteStatsCollector,
219 : onSlowDisk func(OpType OpType, writeSizeInBytes int, duration time.Duration),
220 1 : ) *diskHealthCheckingFile {
221 1 : var bytesWritten *atomic.Uint64
222 1 : if statsCollector != nil {
223 1 : bytesWritten = statsCollector.CreateStat(category)
224 1 : } else {
225 1 : bytesWritten = new(atomic.Uint64)
226 1 : }
227 1 : return &diskHealthCheckingFile{
228 1 : file: file,
229 1 : onSlowDisk: onSlowDisk,
230 1 : diskSlowThreshold: diskSlowThreshold,
231 1 : tickInterval: defaultTickInterval,
232 1 :
233 1 : stopper: make(chan struct{}),
234 1 : createTimeNanos: time.Now().UnixNano(),
235 1 :
236 1 : aggBytesWritten: bytesWritten,
237 1 : category: category,
238 1 : }
239 : }
240 :
// startTicker launches the goroutine that, once per tickInterval, inspects
// lastWritePacked and calls onSlowDisk if the in-flight operation started more
// than diskSlowThreshold ago. It is a no-op when the threshold is zero. It must
// not be called while a ticker goroutine for d is already running; the
// goroutine exits once stopper is closed.
func (d *diskHealthCheckingFile) startTicker() {
	if d.diskSlowThreshold == 0 {
		return
	}

	monitor := func() {
		ticker := time.NewTicker(d.tickInterval)
		defer ticker.Stop()

		for {
			select {
			case <-d.stopper:
				return
			case <-ticker.C:
			}

			packed := d.lastWritePacked.Load()
			if packed == 0 {
				// Nothing in flight.
				continue
			}
			delta, writeSize, op := unpack(packed)
			started := time.Unix(0, d.createTimeNanos+delta.Nanoseconds())
			now := time.Now()
			if started.Add(d.diskSlowThreshold).Before(now) {
				d.onSlowDisk(op, writeSize, now.Sub(started))
			}
		}
	}
	go monitor()
}
274 :
// stopTicker stops the goroutine started in startTicker by closing stopper.
// Closing is harmless when no goroutine was started, but stopTicker must be
// called at most once: closing an already-closed channel panics.
func (d *diskHealthCheckingFile) stopTicker() {
	close(d.stopper)
}
279 :
// Fd implements (vfs.File).Fd by forwarding to the wrapped file.
func (d *diskHealthCheckingFile) Fd() uintptr {
	return d.file.Fd()
}

// Read implements (vfs.File).Read. Reads are forwarded without timing; only
// write-oriented operations (writes, preallocation and syncs) are timed.
func (d *diskHealthCheckingFile) Read(p []byte) (int, error) {
	return d.file.Read(p)
}

// ReadAt implements (vfs.File).ReadAt. Like Read, it is forwarded untimed.
func (d *diskHealthCheckingFile) ReadAt(p []byte, off int64) (int, error) {
	return d.file.ReadAt(p, off)
}
294 :
// Write implements the io.Writer interface. The write is timed by the
// disk-health monitor (sized by len(p)), and the number of bytes the wrapped
// file reports as written is added to the category's byte counter.
func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) {
	write := func() { n, err = d.file.Write(p) }
	d.timeDiskOp(OpTypeWrite, int64(len(p)), write, time.Now().UnixNano())
	d.aggBytesWritten.Add(uint64(n))
	return n, err
}

// WriteAt implements the io.WriterAt interface. It is timed and accounted
// exactly like Write, and is reported as an OpTypeWrite.
func (d *diskHealthCheckingFile) WriteAt(p []byte, ofs int64) (n int, err error) {
	writeAt := func() { n, err = d.file.WriteAt(p, ofs) }
	d.timeDiskOp(OpTypeWrite, int64(len(p)), writeAt, time.Now().UnixNano())
	d.aggBytesWritten.Add(uint64(n))
	return n, err
}
312 :
// Close implements the io.Closer interface. It stops the monitoring goroutine
// before closing the wrapped file. Because stopTicker closes a channel, Close
// must not be called more than once.
func (d *diskHealthCheckingFile) Close() error {
	d.stopTicker()
	return d.file.Close()
}

// Prefetch implements (vfs.File).Prefetch by forwarding, untimed, to the
// wrapped file.
func (d *diskHealthCheckingFile) Prefetch(offset, length int64) error {
	return d.file.Prefetch(offset, length)
}
323 :
324 : // Preallocate implements (vfs.File).Preallocate.
325 1 : func (d *diskHealthCheckingFile) Preallocate(off, n int64) (err error) {
326 1 : d.timeDiskOp(OpTypePreallocate, n, func() {
327 1 : err = d.file.Preallocate(off, n)
328 1 : }, time.Now().UnixNano())
329 1 : return err
330 : }
331 :
// Stat implements (vfs.File).Stat by forwarding, untimed, to the wrapped file.
func (d *diskHealthCheckingFile) Stat() (os.FileInfo, error) {
	return d.file.Stat()
}
336 :
337 : // Sync implements the io.Syncer interface.
338 1 : func (d *diskHealthCheckingFile) Sync() (err error) {
339 1 : d.timeDiskOp(OpTypeSync, 0, func() {
340 1 : err = d.file.Sync()
341 1 : }, time.Now().UnixNano())
342 1 : return err
343 : }
344 :
345 : // SyncData implements (vfs.File).SyncData.
346 1 : func (d *diskHealthCheckingFile) SyncData() (err error) {
347 1 : d.timeDiskOp(OpTypeSyncData, 0, func() {
348 1 : err = d.file.SyncData()
349 1 : }, time.Now().UnixNano())
350 1 : return err
351 : }
352 :
353 : // SyncTo implements (vfs.File).SyncTo.
354 0 : func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) {
355 0 : d.timeDiskOp(OpTypeSyncTo, length, func() {
356 0 : fullSync, err = d.file.SyncTo(length)
357 0 : }, time.Now().UnixNano())
358 0 : return fullSync, err
359 : }
360 :
// timeDiskOp runs the specified closure and makes its timing visible to the
// monitoring goroutine, in case it exceeds one of the slow disk durations.
// opType should always be set. writeSizeInBytes should be set if the write
// operation is sized. If not, it should be set to zero.
//
// If d is nil or health checking is disabled (diskSlowThreshold == 0), op runs
// untimed. Otherwise only one operation may be in flight on the file at a time:
// timeDiskOp panics if it finds another operation already published in
// lastWritePacked, or if the value it published was replaced before op
// finished.
//
// The start time is taken as a parameter in the form of nanoseconds since the
// unix epoch so that it appears in stack traces during crashes (if GOTRACEBACK
// is set appropriately), aiding postmortem debugging.
func (d *diskHealthCheckingFile) timeDiskOp(
	opType OpType, writeSizeInBytes int64, op func(), startNanos int64,
) {
	if d == nil || d.diskSlowThreshold == 0 {
		op()
		return
	}

	delta := time.Duration(startNanos - d.createTimeNanos)
	packed := pack(delta, writeSizeInBytes, opType)
	// Publish this operation for the ticker goroutine. A non-zero previous
	// value means another operation is still in flight on this file.
	if d.lastWritePacked.Swap(packed) != 0 {
		panic("concurrent write operations detected on file")
	}
	// Clear the published value once op returns (or panics). If it is no longer
	// ours, some other operation overwrote it concurrently.
	defer func() {
		if d.lastWritePacked.Swap(0) != packed {
			panic("concurrent write operations detected on file")
		}
	}()
	op()
}
389 :
// pack encodes an in-flight operation into the 64-bit layout described on
// diskHealthCheckingFile.lastWritePacked: the millisecond delta from the
// file's creation time in the top deltaBits bits, the operation size in units
// of writeSizePrecision in the next writeSizeBits bits, and the OpType in the
// low bits.
//
// Note the slight lack of symmetry between pack & unpack. pack takes an int64
// for writeSizeInBytes, since callers of pack use an int64. This is dictated by
// the vfs interface. unpack OTOH returns an int. This is safe because the
// packing scheme implies we only actually need 32 bits.
//
// pack panics if delta does not fit in deltaBits bits of milliseconds.
func pack(delta time.Duration, writeSizeInBytes int64, opType OpType) uint64 {
	// We have no guarantee of clock monotonicity. If we have a small regression
	// in the clock, we set deltaMillis to zero, so we can still catch the
	// operation if it happens to be slow.
	deltaMillis := max(delta.Milliseconds(), 0)
	// As of 3/7/2023, the use of 40 bits for a delta provides ~34 years of
	// effective monitoring time before the uint wraps around, at millisecond
	// precision.
	if deltaMillis > 1<<deltaBits-1 {
		panic("vfs: last write delta would result in integer wraparound")
	}

	// See writeSizePrecision to get the unit of writeSize. As of 1/26/2023, the
	// unit is KBs. Clamp into [0, writeSizeCeiling]: sizes too large for the
	// field are stored as the maximum, and a negative size (e.g. a bogus
	// Preallocate length) is stored as zero. Without the lower clamp, a
	// negative value sign-extends into the high bits and corrupts the delta,
	// which would hide the operation from slow-disk detection.
	const writeSizeCeiling = 1<<writeSizeBits - 1
	writeSize := min(max(writeSizeInBytes/writeSizePrecision, 0), writeSizeCeiling)

	return uint64(deltaMillis)<<(64-deltaBits) |
		uint64(writeSize)<<(64-deltaBits-writeSizeBits) |
		uint64(opType)
}

// unpack decodes a value produced by pack, returning the delta from the file's
// creation time (at millisecond precision), the operation size in bytes
// (rounded down to a multiple of writeSizePrecision), and the OpType.
func unpack(packed uint64) (delta time.Duration, writeSizeInBytes int, opType OpType) {
	delta = time.Duration(packed>>(64-deltaBits)) * time.Millisecond
	// & and * share precedence in Go; the parentheses make explicit that the
	// field is masked before being scaled back to bytes.
	wz := (int64(packed>>(64-deltaBits-writeSizeBits)) & (1<<writeSizeBits - 1)) * writeSizePrecision
	// Given the packing scheme, converting wz to an int will not truncate anything.
	writeSizeInBytes = int(wz)
	// The OpType occupies the low four bits.
	opType = OpType(packed & 0xf)
	return delta, writeSizeInBytes, opType
}
428 :
// diskHealthCheckingDir implements disk-health checking for directories. Unlike
// other files, we allow directories to receive concurrent write operations.
// (Syncs are the only write operations supported by a directory.) Since the
// diskHealthCheckingFile's timeDiskOp can only track a single in-flight
// operation at a time, we time the operation using the filesystem-level
// timeFilesystemOp function instead.
type diskHealthCheckingDir struct {
	// File is the wrapped directory handle; all methods other than Sync are
	// promoted from it unchanged.
	File
	// name is the path given to OpenDir; it is reported as DiskSlowInfo.Path.
	name string
	// fs is the filesystem whose slots are used to time this directory's Syncs.
	fs *diskHealthCheckingFS
}
440 :
441 : // Sync implements the io.Syncer interface.
442 1 : func (d *diskHealthCheckingDir) Sync() (err error) {
443 1 : d.fs.timeFilesystemOp(d.name, OpTypeSync, func() {
444 1 : err = d.File.Sync()
445 1 : }, time.Now().UnixNano())
446 1 : return err
447 : }
448 :
// DiskSlowInfo captures info about detected slow operations on the vfs.
type DiskSlowInfo struct {
	// Path of file being written to.
	Path string
	// Operation being performed on the file.
	OpType OpType
	// Size of write in bytes, if the write is sized. Operations timed on a
	// diskHealthCheckingFile report it rounded down to a multiple of
	// writeSizePrecision; filesystem-level operations always report 0.
	WriteSize int
	// Duration that has elapsed since this disk operation started.
	Duration time.Duration
}
460 :
// String implements fmt.Stringer, returning the message produced by
// redact.StringWithoutMarkers for i.
func (i DiskSlowInfo) String() string {
	return redact.StringWithoutMarkers(i)
}
464 :
// SafeFormat implements redact.SafeFormatter. Every field is printed as safe;
// only the base name of Path is shown, and the byte count is included only for
// operation types where WriteSize is meaningful (writes, SyncTo, Preallocate).
func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
	op := redact.Safe(i.OpType.String())
	file := redact.Safe(filepath.Base(i.Path))
	elapsed := redact.Safe(i.Duration.Seconds())

	sized := i.OpType == OpTypeWrite || i.OpType == OpTypeSyncTo || i.OpType == OpTypePreallocate
	if !sized {
		w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs",
			op, file, elapsed)
		return
	}
	w.Printf("disk slowness detected: %s on file %s (%d bytes) has been ongoing for %0.1fs",
		op, file, redact.Safe(i.WriteSize), elapsed)
}
479 :
// diskHealthCheckingFS adds disk-health checking facilities to a VFS.
// It times disk write operations in two ways:
//
// 1. Wrapping vfs.Files.
//
// The bulk of write I/O activity is file writing and syncing, invoked through
// the `vfs.File` interface. This VFS wraps all files open for writing with a
// special diskHealthCheckingFile implementation of the vfs.File interface. See
// above for the implementation.
//
// 2. Monitoring filesystem metadata operations.
//
// Filesystem metadata operations (create, link, remove, rename, etc) are also
// sources of disk writes. Unlike a vfs.File which requires Write and Sync calls
// to be sequential, a vfs.FS may receive these filesystem metadata operations
// in parallel. To accommodate this parallelism, the diskHealthCheckingFS's
// write-oriented filesystem operations record their start times into a 'slot'
// on the filesystem. A single long-running goroutine periodically scans the
// slots looking for slow operations.
//
// The number of slots on a diskHealthCheckingFS grows to a working set of the
// maximum concurrent filesystem operations. This is expected to be very few
// for these reasons:
//  1. Pebble has limited write concurrency. Flushes, compactions and WAL
//     rotations are the primary sources of filesystem metadata operations. With
//     the default max-compaction concurrency, these operations require at most
//     5 concurrent slots if all 5 perform a filesystem metadata operation
//     simultaneously.
//  2. Pebble's limited concurrent I/O writers spend most of their time
//     performing file I/O, not performing the filesystem metadata operations
//     that require recording a slot on the diskHealthCheckingFS.
//  3. In CockroachDB, each additional store/Pebble instance has its own vfs.FS
//     which provides a separate goroutine and set of slots.
//  4. In CockroachDB, many of the additional sources of filesystem metadata
//     operations (like encryption-at-rest) are sequential with respect to
//     Pebble's threads.
type diskHealthCheckingFS struct {
	tickInterval      time.Duration
	diskSlowThreshold time.Duration
	// statsCollector may be nil; files then count bytes into private counters.
	statsCollector *DiskWriteStatsCollector
	onSlowDisk     func(DiskSlowInfo)
	// fs is the wrapped filesystem.
	fs FS
	mu struct {
		sync.Mutex
		// tickerRunning reports whether the monitoring goroutine is running.
		// It is started lazily by timeFilesystemOp and stopped by Close.
		tickerRunning bool
		// stopper is the channel the current monitoring goroutine listens on;
		// Close replaces it so the FS can be reused.
		stopper chan struct{}
		// inflight holds every slot ever used; a slot whose startNanos is zero
		// is free.
		inflight []*slot
	}
	// prealloc preallocates the memory for mu.inflight slots and the slice
	// itself. The contained fields are not accessed directly except by
	// WithDiskHealthChecks when initializing mu.inflight. The number of slots
	// in d.mu.inflight will grow to the maximum number of concurrent file
	// metadata operations (create, remove, link, etc). If the number of
	// concurrent operations never exceeds preallocatedSlotCount, we'll never
	// incur an additional allocation.
	prealloc struct {
		slots        [preallocatedSlotCount]slot
		slotPtrSlice [preallocatedSlotCount]*slot
	}
}
540 :
// slot records one in-flight filesystem metadata operation. name and opType are
// written (by timeFilesystemOp) and read (by the monitoring goroutine) while
// holding diskHealthCheckingFS.mu. startNanos is the operation's start time in
// Unix nanoseconds, or zero when the slot is free; it is zeroed atomically,
// without the mutex, when the operation completes.
type slot struct {
	name       string
	opType     OpType
	startNanos atomic.Int64
}

// diskHealthCheckingFS implements FS.
var _ FS = (*diskHealthCheckingFS)(nil)
549 :
// WithDiskHealthChecks wraps an FS and ensures that all write-oriented
// operations on the FS are wrapped with disk health detection checks and
// aggregated. Disk operations that are observed to take longer than
// diskSlowThreshold trigger an onSlowDisk call.
//
// A threshold of zero disables disk-health checking: innerFS is returned as-is
// together with a Closer that does nothing. Otherwise the returned Closer stops
// the wrapper's monitoring goroutine.
func WithDiskHealthChecks(
	innerFS FS,
	diskSlowThreshold time.Duration,
	statsCollector *DiskWriteStatsCollector,
	onSlowDisk func(info DiskSlowInfo),
) (FS, io.Closer) {
	if diskSlowThreshold == 0 {
		return innerFS, noopCloser{}
	}

	checked := &diskHealthCheckingFS{
		fs:                innerFS,
		tickInterval:      defaultTickInterval,
		diskSlowThreshold: diskSlowThreshold,
		statsCollector:    statsCollector,
		onSlowDisk:        onSlowDisk,
	}
	checked.mu.stopper = make(chan struct{})
	// Point each preallocated slot pointer at its preallocated slot, then let
	// mu.inflight alias that array so the common case never allocates.
	for idx := range checked.prealloc.slotPtrSlice {
		checked.prealloc.slotPtrSlice[idx] = &checked.prealloc.slots[idx]
	}
	checked.mu.inflight = checked.prealloc.slotPtrSlice[:]
	return checked, checked
}
583 :
584 : // timeFilesystemOp executes the provided closure, which should perform a
585 : // singular filesystem operation of a type matching opType on the named file. It
586 : // records the provided start time such that the long-lived disk-health checking
587 : // goroutine can observe if the operation is blocked for an inordinate time.
588 : //
589 : // The start time is taken as a parameter in the form of nanoseconds since the
590 : // unix epoch so that it appears in stack traces during crashes (if GOTRACEBACK
591 : // is set appropriately), aiding postmortem debugging.
592 : func (d *diskHealthCheckingFS) timeFilesystemOp(
593 : name string, opType OpType, op func(), startNanos int64,
594 1 : ) {
595 1 : if d == nil {
596 0 : op()
597 0 : return
598 0 : }
599 :
600 : // Record this operation's start time on the FS, so that the long-running
601 : // goroutine can monitor the filesystem operation.
602 : //
603 : // The diskHealthCheckingFile implementation uses a single field that is
604 : // atomically updated, taking advantage of the fact that writes to a single
605 : // vfs.File handle are not performed in parallel. The vfs.FS however may
606 : // receive write filesystem operations in parallel. To accommodate this
607 : // parallelism, writing goroutines append their start time to a
608 : // mutex-protected vector. On ticks, the long-running goroutine scans the
609 : // vector searching for start times older than the slow-disk threshold. When
610 : // a writing goroutine completes its operation, it atomically overwrites its
611 : // slot to signal completion.
612 1 : var s *slot
613 1 : func() {
614 1 : d.mu.Lock()
615 1 : defer d.mu.Unlock()
616 1 :
617 1 : // If there's no long-running goroutine to monitor this filesystem
618 1 : // operation, start one.
619 1 : if !d.mu.tickerRunning {
620 1 : d.startTickerLocked()
621 1 : }
622 :
623 1 : for i := 0; i < len(d.mu.inflight); i++ {
624 1 : if d.mu.inflight[i].startNanos.Load() == 0 {
625 1 : // This slot is not in use. Claim it.
626 1 : s = d.mu.inflight[i]
627 1 : s.name = name
628 1 : s.opType = opType
629 1 : s.startNanos.Store(startNanos)
630 1 : break
631 : }
632 : }
633 : // If we didn't find any unused slots, create a new slot and append it.
634 : // This slot will exist forever. The number of slots will grow to the
635 : // maximum number of concurrent filesystem operations over the lifetime
636 : // of the process. Only operations that grow the number of slots must
637 : // incur an allocation.
638 1 : if s == nil {
639 0 : s = &slot{
640 0 : name: name,
641 0 : opType: opType,
642 0 : }
643 0 : s.startNanos.Store(startNanos)
644 0 : d.mu.inflight = append(d.mu.inflight, s)
645 0 : }
646 : }()
647 :
648 1 : op()
649 1 :
650 1 : // Signal completion by zeroing the start time.
651 1 : s.startNanos.Store(0)
652 : }
653 :
// startTickerLocked starts a new goroutine with a ticker to monitor disk
// filesystem operations. Requires d.mu and !d.mu.tickerRunning.
//
// On every tick the goroutine copies out, under d.mu, every in-flight slot
// whose start time is older than diskSlowThreshold, then calls onSlowDisk for
// each after releasing the mutex. It exits when it receives from the stopper
// channel captured at start (see Close).
func (d *diskHealthCheckingFS) startTickerLocked() {
	d.mu.tickerRunning = true
	// Capture the stopper now: Close replaces d.mu.stopper, and this goroutine
	// must keep listening on the channel it was started with.
	stopper := d.mu.stopper
	go func() {
		ticker := time.NewTicker(d.tickInterval)
		defer ticker.Stop()
		type exceededSlot struct {
			name       string
			opType     OpType
			startNanos int64
		}
		// exceededSlots is reused across ticks to avoid reallocating.
		var exceededSlots []exceededSlot

		for {
			select {
			case <-ticker.C:
				// Scan the inflight slots for any slots recording a start
				// time older than the diskSlowThreshold.
				exceededSlots = exceededSlots[:0]
				d.mu.Lock()
				now := time.Now()
				for i := range d.mu.inflight {
					nanos := d.mu.inflight[i].startNanos.Load()
					if nanos != 0 && time.Unix(0, nanos).Add(d.diskSlowThreshold).Before(now) {
						// diskSlowThreshold was exceeded. Copy this inflightOp into
						// exceededSlots and call d.onSlowDisk after dropping the mutex.
						inflightOp := exceededSlot{
							name:       d.mu.inflight[i].name,
							opType:     d.mu.inflight[i].opType,
							startNanos: nanos,
						}
						exceededSlots = append(exceededSlots, inflightOp)
					}
				}
				d.mu.Unlock()
				for i := range exceededSlots {
					d.onSlowDisk(
						DiskSlowInfo{
							Path:      exceededSlots[i].name,
							OpType:    exceededSlots[i].opType,
							WriteSize: 0, // writes at the fs level are not sized
							Duration:  now.Sub(time.Unix(0, exceededSlots[i].startNanos)),
						})
				}
			case <-stopper:
				return
			}
		}
	}()
}
706 :
// Close implements io.Closer. Close stops the long-running goroutine that
// monitors for slow filesystem metadata operations. Close may be called
// multiple times. If the filesystem is used after Close has been called, a new
// long-running goroutine will be created.
//
// Close blocks until the goroutine receives the stop request, so it waits for
// any onSlowDisk callbacks from an in-progress tick to return. Close must
// therefore not be called from within onSlowDisk.
func (d *diskHealthCheckingFS) Close() error {
	d.mu.Lock()
	if !d.mu.tickerRunning {
		// Nothing to stop.
		d.mu.Unlock()
		return nil
	}

	// Grab the stopper so we can request the long-running goroutine to stop.
	// Replace the stopper in case this FS is reused. It's possible to Close and
	// reuse a disk-health checking FS. This is to accommodate the on-by-default
	// behavior in Pebble, and the possibility that users may continue to use
	// the Pebble default FS beyond the lifetime of a single DB.
	stopper := d.mu.stopper
	d.mu.stopper = make(chan struct{})
	d.mu.tickerRunning = false
	// The mutex must be released before the send below: the goroutine takes
	// d.mu on every tick, so holding it here could deadlock the handoff.
	d.mu.Unlock()

	// Ask the long-running goroutine to stop. This is a synchronous channel
	// send.
	stopper <- struct{}{}
	close(stopper)
	return nil
}
735 :
// Create implements the FS interface. The create itself is timed as a
// filesystem operation. On success, the new file is wrapped in a
// diskHealthCheckingFile whose own monitoring goroutine reports slow writes and
// syncs against name, and whose bytes are attributed to category.
func (d *diskHealthCheckingFS) Create(name string, category DiskWriteCategory) (File, error) {
	var (
		inner File
		err   error
	)
	create := func() { inner, err = d.fs.Create(name, category) }
	d.timeFilesystemOp(name, OpTypeCreate, create, time.Now().UnixNano())
	switch {
	case err != nil:
		return inner, err
	case d.diskSlowThreshold == 0:
		return inner, nil
	}

	reportSlow := func(opType OpType, writeSizeInBytes int, duration time.Duration) {
		d.onSlowDisk(DiskSlowInfo{
			Path:      name,
			OpType:    opType,
			WriteSize: writeSizeInBytes,
			Duration:  duration,
		})
	}
	wrapped := newDiskHealthCheckingFile(inner, d.diskSlowThreshold, category, d.statsCollector, reportSlow)
	wrapped.startTicker()
	return wrapped, nil
}
762 :
// GetDiskUsage implements the FS interface by forwarding, untimed, to the
// wrapped FS.
func (d *diskHealthCheckingFS) GetDiskUsage(path string) (DiskUsage, error) {
	return d.fs.GetDiskUsage(path)
}
767 :
768 : // Link implements the FS interface.
769 1 : func (d *diskHealthCheckingFS) Link(oldname, newname string) error {
770 1 : var err error
771 1 : d.timeFilesystemOp(newname, OpTypeLink, func() {
772 1 : err = d.fs.Link(oldname, newname)
773 1 : }, time.Now().UnixNano())
774 1 : return err
775 : }
776 :
// List implements the FS interface by forwarding, untimed, to the wrapped FS.
func (d *diskHealthCheckingFS) List(dir string) ([]string, error) {
	return d.fs.List(dir)
}

// Lock implements the FS interface by forwarding, untimed, to the wrapped FS.
func (d *diskHealthCheckingFS) Lock(name string) (io.Closer, error) {
	return d.fs.Lock(name)
}
786 :
787 : // MkdirAll implements the FS interface.
788 1 : func (d *diskHealthCheckingFS) MkdirAll(dir string, perm os.FileMode) error {
789 1 : var err error
790 1 : d.timeFilesystemOp(dir, OpTypeMkdirAll, func() {
791 1 : err = d.fs.MkdirAll(dir, perm)
792 1 : }, time.Now().UnixNano())
793 1 : return err
794 : }
795 :
// Open implements the FS interface. The file is returned from the wrapped FS
// without a health-checking wrapper, so operations on it are not timed.
func (d *diskHealthCheckingFS) Open(name string, opts ...OpenOption) (File, error) {
	return d.fs.Open(name, opts...)
}
800 :
// OpenReadWrite implements the FS interface. The returned file is wrapped only
// so that bytes written through it are attributed to category: it is created
// with a zero slow-disk threshold, which disables timing, so its slow-disk
// callback is never invoked.
func (d *diskHealthCheckingFS) OpenReadWrite(
	name string, category DiskWriteCategory, opts ...OpenOption,
) (File, error) {
	inner, err := d.fs.OpenReadWrite(name, category, opts...)
	if err != nil {
		return nil, err
	}
	ignoreSlowDisk := func(OpType, int, time.Duration) {}
	return newDiskHealthCheckingFile(inner, 0, category, d.statsCollector, ignoreSlowDisk), nil
}
811 :
812 : // OpenDir implements the FS interface.
813 1 : func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) {
814 1 : f, err := d.fs.OpenDir(name)
815 1 : if err != nil {
816 0 : return f, err
817 0 : }
818 : // Directories opened with OpenDir must be opened with health checking,
819 : // because they may be explicitly synced.
820 1 : return &diskHealthCheckingDir{
821 1 : File: f,
822 1 : name: name,
823 1 : fs: d,
824 1 : }, nil
825 : }
826 :
// PathBase implements the FS interface by delegating to the wrapped FS.
func (d *diskHealthCheckingFS) PathBase(path string) string {
	return d.fs.PathBase(path)
}

// PathJoin implements the FS interface by delegating to the wrapped FS.
func (d *diskHealthCheckingFS) PathJoin(elem ...string) string {
	return d.fs.PathJoin(elem...)
}

// PathDir implements the FS interface by delegating to the wrapped FS.
func (d *diskHealthCheckingFS) PathDir(path string) string {
	return d.fs.PathDir(path)
}
841 :
842 : // Remove implements the FS interface.
843 1 : func (d *diskHealthCheckingFS) Remove(name string) error {
844 1 : var err error
845 1 : d.timeFilesystemOp(name, OpTypeRemove, func() {
846 1 : err = d.fs.Remove(name)
847 1 : }, time.Now().UnixNano())
848 1 : return err
849 : }
850 :
851 : // RemoveAll implements the FS interface.
852 1 : func (d *diskHealthCheckingFS) RemoveAll(name string) error {
853 1 : var err error
854 1 : d.timeFilesystemOp(name, OpTypeRemoveAll, func() {
855 1 : err = d.fs.RemoveAll(name)
856 1 : }, time.Now().UnixNano())
857 1 : return err
858 : }
859 :
860 : // Rename implements the FS interface.
861 1 : func (d *diskHealthCheckingFS) Rename(oldname, newname string) error {
862 1 : var err error
863 1 : d.timeFilesystemOp(newname, OpTypeRename, func() {
864 1 : err = d.fs.Rename(oldname, newname)
865 1 : }, time.Now().UnixNano())
866 1 : return err
867 : }
868 :
// ReuseForWrite implements the FS interface. The reuse is timed as a
// filesystem operation attributed to newname. On success, the file is wrapped
// in a diskHealthCheckingFile that reports slow writes and syncs against
// newname and attributes its bytes to category.
func (d *diskHealthCheckingFS) ReuseForWrite(
	oldname, newname string, category DiskWriteCategory,
) (File, error) {
	var (
		inner File
		err   error
	)
	reuse := func() { inner, err = d.fs.ReuseForWrite(oldname, newname, category) }
	d.timeFilesystemOp(newname, OpTypeReuseForWrite, reuse, time.Now().UnixNano())
	switch {
	case err != nil:
		return inner, err
	case d.diskSlowThreshold == 0:
		return inner, nil
	}

	reportSlow := func(opType OpType, writeSizeInBytes int, duration time.Duration) {
		d.onSlowDisk(DiskSlowInfo{
			Path:      newname,
			OpType:    opType,
			WriteSize: writeSizeInBytes,
			Duration:  duration,
		})
	}
	wrapped := newDiskHealthCheckingFile(inner, d.diskSlowThreshold, category, d.statsCollector, reportSlow)
	wrapped.startTicker()
	return wrapped, nil
}
897 :
// Stat implements the FS interface by forwarding, untimed, to the wrapped FS.
func (d *diskHealthCheckingFS) Stat(name string) (os.FileInfo, error) {
	return d.fs.Stat(name)
}
902 :
// noopCloser is the io.Closer that WithDiskHealthChecks returns when health
// checking is disabled; there is no goroutine to stop.
type noopCloser struct{}

// Close implements io.Closer and does nothing.
func (noopCloser) Close() error { return nil }
|