1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package vfs
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "os"
11 : "path/filepath"
12 : "sync"
13 : "sync/atomic"
14 : "time"
15 :
16 : "github.com/cockroachdb/redact"
17 : )
18 :
19 : const (
20 : // preallocatedSlotCount is the default number of slots available for
21 : // concurrent filesystem operations. The slot count may be exceeded, but
22 : // each additional slot will incur an additional allocation. We choose 16
23 : // here with the expectation that it is significantly more than required in
24 : // practice. See the comment above the diskHealthCheckingFS type definition.
25 : preallocatedSlotCount = 16
26 : // deltaBits is the number of bits in the packed 64-bit integer used for
27 : // identifying a delta from the file creation time in milliseconds.
28 : deltaBits = 40
29 : // writeSizeBits is the number of bits in the packed 64-bit integer used for
30 : // identifying the size of the write operation, if the operation is sized. See
31 : // writeSizePrecision below for precision of size.
32 : writeSizeBits = 20
33 : // Track size of writes at kilobyte precision. See comment above lastWritePacked for more.
34 : writeSizePrecision = 1024
35 : )
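// For scale: 2^40 milliseconds is 2^40/(1000*60*60*24*365) ≈ 34.8 years of
// representable delta, and 2^20 units of writeSizePrecision (1024 bytes) is
// 2^30 bytes = 1 GiB of representable write size.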
36 :
37 : // Variables to enable testing.
38 : var (
39 : // defaultTickInterval is the default interval between two ticks of each
40 : // diskHealthCheckingFile loop iteration.
41 : defaultTickInterval = 2 * time.Second
42 : )
43 :
44 : // OpType is the type of IO operation being monitored by a
45 : // diskHealthCheckingFile.
46 : type OpType uint8
47 :
48 : // The following OpTypes are limited to the subset of filesystem operations that
49 : // a diskHealthCheckingFile supports (namely writes and syncs).
50 : const (
51 : OpTypeUnknown OpType = iota
52 : OpTypeWrite
53 : OpTypeSync
54 : OpTypeSyncData
55 : OpTypeSyncTo
56 : OpTypeCreate
57 : OpTypeLink
58 : OpTypeMkdirAll
59 : OpTypePreallocate
60 : OpTypeRemove
61 : OpTypeRemoveAll
62 : OpTypeRename
63 : OpTypeReuseForWrite
64 : // Note: opTypeMax is just used in tests. It must appear last in the list
65 : // of OpTypes.
66 : opTypeMax
67 : )
68 :
69 : // String implements fmt.Stringer.
70 1 : func (o OpType) String() string {
71 1 : switch o {
72 1 : case OpTypeWrite:
73 1 : return "write"
74 1 : case OpTypeSync:
75 1 : return "sync"
76 1 : case OpTypeSyncData:
77 1 : return "syncdata"
78 1 : case OpTypeSyncTo:
79 1 : return "syncto"
80 1 : case OpTypeCreate:
81 1 : return "create"
82 1 : case OpTypeLink:
83 1 : return "link"
84 1 : case OpTypeMkdirAll:
85 1 : return "mkdirall"
86 1 : case OpTypePreallocate:
87 1 : return "preallocate"
88 1 : case OpTypeRemove:
89 1 : return "remove"
90 1 : case OpTypeRemoveAll:
91 1 : return "removall"
92 1 : case OpTypeRename:
93 1 : return "rename"
94 1 : case OpTypeReuseForWrite:
95 1 : return "reuseforwrite"
96 1 : case OpTypeUnknown:
97 1 : return "unknown"
98 0 : default:
99 0 : panic(fmt.Sprintf("vfs: unknown op type: %d", o))
100 : }
101 : }
102 :
103 : // diskHealthCheckingFile is a File wrapper to detect slow disk operations, and
104 : // call onSlowDisk if a disk operation is seen to exceed diskSlowThreshold.
105 : //
106 : // This struct creates a goroutine (in startTicker()) that, at every tick
107 : // interval, sees if there's a disk operation taking longer than the specified
108 : // duration. This setup is preferable to creating a new timer at every disk
109 : // operation, as it reduces overhead per disk operation.
110 : type diskHealthCheckingFile struct {
111 : file File
112 : onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration)
113 : diskSlowThreshold time.Duration
114 : tickInterval time.Duration
115 :
116 : stopper chan struct{}
117 : // lastWritePacked is a 64-bit unsigned int. The most significant
118 : // 40 bits represent a delta (in milliseconds) from the creation
119 : // time of the diskHealthCheckingFile. The next most significant 20 bits
120 : // represent the size of the write in KBs, if the write has a size. (If
121 : // it doesn't, the 20 bits are zeroed). The least significant four bits
122 : // contain the OpType.
123 : //
124 : // The use of 40 bits for a delta provides ~34 years of effective
125 : // monitoring time before the uint wraps around, at millisecond precision.
126 : // ~34 years of process uptime "ought to be enough for anybody". Millisecond
127 : // precision is sufficient, given that we are monitoring for writes that
128 : // take longer than one millisecond.
129 : //
130 : // The use of 20 bits for the size in KBs allows representing sizes up
131 : // to nearly one GB. If the write is larger than that, we round down to ~one GB.
132 : //
133 : // The use of four bits for OpType allows for 16 operation types.
134 : //
135 : // NB: this packing scheme is not persisted, and is therefore safe to adjust
136 : // across process boundaries.
137 : lastWritePacked atomic.Uint64
138 : createTimeNanos int64
139 : }
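// The resulting layout of lastWritePacked, most significant bits first:
//
//	bits 63..24: delta from createTimeNanos, in milliseconds (deltaBits = 40)
//	bits 23..4:  write size, in writeSizePrecision units (writeSizeBits = 20)
//	bits 3..0:   OpType (4 bits)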
140 :
141 : // newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the
142 : // specified time threshold and event listener.
143 : func newDiskHealthCheckingFile(
144 : file File,
145 : diskSlowThreshold time.Duration,
146 : onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration),
147 2 : ) *diskHealthCheckingFile {
148 2 : return &diskHealthCheckingFile{
149 2 : file: file,
150 2 : onSlowDisk: onSlowDisk,
151 2 : diskSlowThreshold: diskSlowThreshold,
152 2 : tickInterval: defaultTickInterval,
153 2 :
154 2 : stopper: make(chan struct{}),
155 2 : createTimeNanos: time.Now().UnixNano(),
156 2 : }
157 2 : }
158 :
159 : // startTicker starts a new goroutine with a ticker to monitor disk operations.
160 : // Can only be called if the ticker goroutine isn't running already.
161 2 : func (d *diskHealthCheckingFile) startTicker() {
162 2 : if d.diskSlowThreshold == 0 {
163 0 : return
164 0 : }
165 :
166 2 : go func() {
167 2 : ticker := time.NewTicker(d.tickInterval)
168 2 : defer ticker.Stop()
169 2 :
170 2 : for {
171 2 : select {
172 2 : case <-d.stopper:
173 2 : return
174 :
175 2 : case <-ticker.C:
176 2 : packed := d.lastWritePacked.Load()
177 2 : if packed == 0 {
178 2 : continue
179 : }
180 1 : delta, writeSize, op := unpack(packed)
181 1 : lastWrite := time.Unix(0, d.createTimeNanos+delta.Nanoseconds())
182 1 : now := time.Now()
183 1 : if lastWrite.Add(d.diskSlowThreshold).Before(now) {
184 1 : // diskSlowThreshold was exceeded. Call the passed-in
185 1 : // listener.
186 1 : d.onSlowDisk(op, writeSize, now.Sub(lastWrite))
187 1 : }
188 : }
189 : }
190 : }()
191 : }
192 :
193 : // stopTicker stops the goroutine started in startTicker.
194 2 : func (d *diskHealthCheckingFile) stopTicker() {
195 2 : close(d.stopper)
196 2 : }
197 :
198 : // Fd implements (vfs.File).Fd.
199 2 : func (d *diskHealthCheckingFile) Fd() uintptr {
200 2 : return d.file.Fd()
201 2 : }
202 :
203 : // Read implements (vfs.File).Read
204 0 : func (d *diskHealthCheckingFile) Read(p []byte) (int, error) {
205 0 : return d.file.Read(p)
206 0 : }
207 :
208 : // ReadAt implements (vfs.File).ReadAt
209 0 : func (d *diskHealthCheckingFile) ReadAt(p []byte, off int64) (int, error) {
210 0 : return d.file.ReadAt(p, off)
211 0 : }
212 :
213 : // Write implements the io.Writer interface.
214 2 : func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) {
215 2 : d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
216 2 : n, err = d.file.Write(p)
217 2 : }, time.Now().UnixNano())
218 2 : return n, err
219 : }
220 :
221 : // WriteAt implements the io.WriterAt interface.
222 0 : func (d *diskHealthCheckingFile) WriteAt(p []byte, ofs int64) (n int, err error) {
223 0 : d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
224 0 : n, err = d.file.WriteAt(p, ofs)
225 0 : }, time.Now().UnixNano())
226 0 : return n, err
227 : }
228 :
229 : // Close implements the io.Closer interface.
230 2 : func (d *diskHealthCheckingFile) Close() error {
231 2 : d.stopTicker()
232 2 : return d.file.Close()
233 2 : }
234 :
235 : // Prefetch implements (vfs.File).Prefetch.
236 0 : func (d *diskHealthCheckingFile) Prefetch(offset, length int64) error {
237 0 : return d.file.Prefetch(offset, length)
238 0 : }
239 :
240 : // Preallocate implements (vfs.File).Preallocate.
241 2 : func (d *diskHealthCheckingFile) Preallocate(off, n int64) (err error) {
242 2 : d.timeDiskOp(OpTypePreallocate, n, func() {
243 2 : err = d.file.Preallocate(off, n)
244 2 : }, time.Now().UnixNano())
245 2 : return err
246 : }
247 :
248 : // Stat implements (vfs.File).Stat.
249 1 : func (d *diskHealthCheckingFile) Stat() (os.FileInfo, error) {
250 1 : return d.file.Stat()
251 1 : }
252 :
253 : // Sync implements (vfs.File).Sync.
254 2 : func (d *diskHealthCheckingFile) Sync() (err error) {
255 2 : d.timeDiskOp(OpTypeSync, 0, func() {
256 2 : err = d.file.Sync()
257 2 : }, time.Now().UnixNano())
258 2 : return err
259 : }
260 :
261 : // SyncData implements (vfs.File).SyncData.
262 2 : func (d *diskHealthCheckingFile) SyncData() (err error) {
263 2 : d.timeDiskOp(OpTypeSyncData, 0, func() {
264 2 : err = d.file.SyncData()
265 2 : }, time.Now().UnixNano())
266 2 : return err
267 : }
268 :
269 : // SyncTo implements (vfs.File).SyncTo.
270 0 : func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) {
271 0 : d.timeDiskOp(OpTypeSyncTo, length, func() {
272 0 : fullSync, err = d.file.SyncTo(length)
273 0 : }, time.Now().UnixNano())
274 0 : return fullSync, err
275 : }
276 :
277 : // timeDiskOp runs the specified closure and makes its timing visible to the
278 : // monitoring goroutine, in case it exceeds one of the slow disk durations.
279 : // opType should always be set. writeSizeInBytes should be set if the write
280 : // operation is sized. If not, it should be set to zero.
281 : //
282 : // The start time is taken as a parameter in the form of nanoseconds since the
283 : // unix epoch so that it appears in stack traces during crashes (if GOTRACEBACK
284 : // is set appropriately), aiding postmortem debugging.
285 : func (d *diskHealthCheckingFile) timeDiskOp(
286 : opType OpType, writeSizeInBytes int64, op func(), startNanos int64,
287 2 : ) {
288 2 : if d == nil {
289 0 : op()
290 0 : return
291 0 : }
292 :
293 2 : delta := time.Duration(startNanos - d.createTimeNanos)
294 2 : packed := pack(delta, writeSizeInBytes, opType)
295 2 : if d.lastWritePacked.Swap(packed) != 0 {
296 0 : panic("concurrent write operations detected on file")
297 : }
298 2 : defer func() {
299 2 : if d.lastWritePacked.Swap(0) != packed {
300 0 : panic("concurrent write operations detected on file")
301 : }
302 : }()
303 2 : op()
304 : }
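// An illustration of the invariant enforced above (a hypothetical scenario,
// not code from this file): if two goroutines were to call Write on the same
// diskHealthCheckingFile concurrently, both would Swap a nonzero packed value
// into lastWritePacked, one Swap would observe the other goroutine's value,
// and the "concurrent write operations" panic would fire.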
305 :
306 : // Note the slight lack of symmetry between pack & unpack. pack takes an int64 for writeSizeInBytes, since
307 : // callers of pack use an int64. This is dictated by the vfs interface. unpack, on the other hand, returns an int. This is
308 : // safe because the packing scheme implies we only actually need 32 bits.
309 2 : func pack(delta time.Duration, writeSizeInBytes int64, opType OpType) uint64 {
310 2 : // We have no guarantee of clock monotonicity. If we have a small regression
311 2 : // in the clock, we set deltaMillis to zero, so we can still catch the operation
312 2 : // if it happens to be slow.
313 2 : deltaMillis := delta.Milliseconds()
314 2 : if deltaMillis < 0 {
315 0 : deltaMillis = 0
316 0 : }
317 : // As of 3/7/2023, the use of 40 bits for a delta provides ~34 years
318 : // of effective monitoring time before the uint wraps around, at millisecond
319 : // precision.
320 2 : if deltaMillis > 1<<deltaBits-1 {
321 1 : panic("vfs: last write delta would result in integer wraparound")
322 : }
323 :
324 : // See writeSizePrecision to get the unit of writeSize. As of 1/26/2023, the unit is KBs.
325 2 : writeSize := writeSizeInBytes / writeSizePrecision
326 2 : // If the size of the write is larger than we can store in the packed int, store the max
327 2 : // value we can store in the packed int.
328 2 : const writeSizeCeiling = 1<<writeSizeBits - 1
329 2 : if writeSize > writeSizeCeiling {
330 1 : writeSize = writeSizeCeiling
331 1 : }
332 :
333 2 : return uint64(deltaMillis)<<(64-deltaBits) | uint64(writeSize)<<(64-deltaBits-writeSizeBits) | uint64(opType)
334 : }
335 :
336 1 : func unpack(packed uint64) (delta time.Duration, writeSizeInBytes int, opType OpType) {
337 1 : delta = time.Duration(packed>>(64-deltaBits)) * time.Millisecond
338 1 : wz := (int64(packed>>(64-deltaBits-writeSizeBits)) & ((1 << writeSizeBits) - 1)) * writeSizePrecision
339 1 : // Given the packing scheme, converting wz to an int will not truncate anything.
340 1 : writeSizeInBytes = int(wz)
341 1 : opType = OpType(packed & 0xf)
342 1 : return delta, writeSizeInBytes, opType
343 1 : }
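// packRoundTripExample is an illustrative sketch (not part of the original
// file) of what survives a pack/unpack round trip: the delta keeps
// millisecond precision, the write size is rounded down to a multiple of
// writeSizePrecision (1 KB), and the OpType is preserved exactly.
func packRoundTripExample() {
	packed := pack(1500*time.Millisecond, 4097, OpTypeWrite)
	delta, writeSizeInBytes, op := unpack(packed)
	// Prints "1.5s 4096 write": 4097 bytes rounds down to 4 KB.
	fmt.Println(delta, writeSizeInBytes, op)
}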
344 :
345 : // diskHealthCheckingDir implements disk-health checking for directories. Unlike
346 : // other files, we allow directories to receive concurrent write operations
347 : // (Syncs are the only write operations supported by a directory.) Since the
348 : // diskHealthCheckingFile's timeDiskOp can only track a single in-flight
349 : // operation at a time, we time the operation using the filesystem-level
350 : // timeFilesystemOp function instead.
351 : type diskHealthCheckingDir struct {
352 : File
353 : name string
354 : fs *diskHealthCheckingFS
355 : }
356 :
357 : // Sync implements (vfs.File).Sync.
358 2 : func (d *diskHealthCheckingDir) Sync() (err error) {
359 2 : d.fs.timeFilesystemOp(d.name, OpTypeSync, func() {
360 2 : err = d.File.Sync()
361 2 : }, time.Now().UnixNano())
362 2 : return err
363 : }
364 :
365 : // DiskSlowInfo captures info about detected slow operations on the vfs.
366 : type DiskSlowInfo struct {
367 : // Path of file being written to.
368 : Path string
369 : // Operation being performed on the file.
370 : OpType OpType
371 : // Size of write in bytes, if the write is sized.
372 : WriteSize int
373 : // Duration that has elapsed since this disk operation started.
374 : Duration time.Duration
375 : }
376 :
377 0 : func (i DiskSlowInfo) String() string {
378 0 : return redact.StringWithoutMarkers(i)
379 0 : }
380 :
381 : // SafeFormat implements redact.SafeFormatter.
382 0 : func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
383 0 : switch i.OpType {
384 : // Operations for which i.WriteSize is meaningful.
385 0 : case OpTypeWrite, OpTypeSyncTo, OpTypePreallocate:
386 0 : w.Printf("disk slowness detected: %s on file %s (%d bytes) has been ongoing for %0.1fs",
387 0 : redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
388 0 : redact.Safe(i.WriteSize), redact.Safe(i.Duration.Seconds()))
389 0 : default:
390 0 : w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs",
391 0 : redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
392 0 : redact.Safe(i.Duration.Seconds()))
393 : }
394 : }
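// For a sized operation, the format above yields output of the shape
// (hypothetical file name and values, shown for illustration):
//
//	disk slowness detected: write on file 000123.sst (4096 bytes) has been ongoing for 2.5s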
395 :
396 : // diskHealthCheckingFS adds disk-health checking facilities to a VFS.
397 : // It times disk write operations in two ways:
398 : //
399 : // 1. Wrapping vfs.Files.
400 : //
401 : // The bulk of write I/O activity is file writing and syncing, invoked through
402 : // the `vfs.File` interface. This VFS wraps all files open for writing with a
403 : // special diskHealthCheckingFile implementation of the vfs.File interface. See
404 : // above for the implementation.
405 : //
406 : // 2. Monitoring filesystem metadata operations.
407 : //
408 : // Filesystem metadata operations (create, link, remove, rename, etc) are also
409 : // sources of disk writes. Unlike a vfs.File which requires Write and Sync calls
410 : // to be sequential, a vfs.FS may receive these filesystem metadata operations
411 : // in parallel. To accommodate this parallelism, the diskHealthCheckingFS's
412 : // write-oriented filesystem operations record their start times into a 'slot'
413 : // on the filesystem. A single long-running goroutine periodically scans the
414 : // slots looking for slow operations.
415 : //
416 : // The number of slots on a diskHealthCheckingFS grows to a working set of the
417 : // maximum concurrent filesystem operations. This is expected to be very few
418 : // for these reasons:
419 : // 1. Pebble has limited write concurrency. Flushes, compactions and WAL
420 : // rotations are the primary sources of filesystem metadata operations. With
421 : // the default max-compaction concurrency, these operations require at most 5
422 : // concurrent slots if all 5 perform a filesystem metadata operation
423 : // simultaneously.
424 : // 2. Pebble's limited concurrent I/O writers spend most of their time
425 : // performing file I/O, not performing the filesystem metadata operations that
426 : // require recording a slot on the diskHealthCheckingFS.
427 : // 3. In CockroachDB, each additional store/Pebble instance has its own vfs.FS
428 : // which provides a separate goroutine and set of slots.
429 : // 4. In CockroachDB, many of the additional sources of filesystem metadata
430 : // operations (like encryption-at-rest) are sequential with respect to Pebble's
431 : // threads.
432 : type diskHealthCheckingFS struct {
433 : tickInterval time.Duration
434 : diskSlowThreshold time.Duration
435 : onSlowDisk func(DiskSlowInfo)
436 : fs FS
437 : mu struct {
438 : sync.Mutex
439 : tickerRunning bool
440 : stopper chan struct{}
441 : inflight []*slot
442 : }
443 : // prealloc preallocates the memory for mu.inflight slots and the slice
444 : // itself. The contained fields are not accessed directly except by
445 : // WithDiskHealthChecks when initializing mu.inflight. The number of slots
446 : // in d.mu.inflight will grow to the maximum number of concurrent file
447 : // metadata operations (create, remove, link, etc). If the number of
448 : // concurrent operations never exceeds preallocatedSlotCount, we'll never
449 : // incur an additional allocation.
450 : prealloc struct {
451 : slots [preallocatedSlotCount]slot
452 : slotPtrSlice [preallocatedSlotCount]*slot
453 : }
454 : }
455 :
456 : type slot struct {
457 : name string
458 : opType OpType
459 : startNanos atomic.Int64
460 : }
461 :
462 : // diskHealthCheckingFS implements FS.
463 : var _ FS = (*diskHealthCheckingFS)(nil)
464 :
465 : // WithDiskHealthChecks wraps an FS and ensures that all write-oriented
466 : // operations on the FS are wrapped with disk health detection checks. Disk
467 : // operations that are observed to take longer than diskSlowThreshold trigger an
468 : // onSlowDisk call.
469 : //
470 : // A threshold of zero disables disk-health checking.
471 : func WithDiskHealthChecks(
472 : innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(info DiskSlowInfo),
473 2 : ) (FS, io.Closer) {
474 2 : if diskSlowThreshold == 0 {
475 0 : return innerFS, noopCloser{}
476 0 : }
477 :
478 2 : fs := &diskHealthCheckingFS{
479 2 : fs: innerFS,
480 2 : tickInterval: defaultTickInterval,
481 2 : diskSlowThreshold: diskSlowThreshold,
482 2 : onSlowDisk: onSlowDisk,
483 2 : }
484 2 : fs.mu.stopper = make(chan struct{})
485 2 : // The fs holds preallocated slots and a preallocated array of slot pointers
486 2 : // with equal length. Initialize the inflight slice to use a slice backed by
487 2 : // the preallocated array with each slot initialized to a preallocated slot.
488 2 : fs.mu.inflight = fs.prealloc.slotPtrSlice[:]
489 2 : for i := range fs.mu.inflight {
490 2 : fs.mu.inflight[i] = &fs.prealloc.slots[i]
491 2 : }
492 2 : return fs, fs
493 : }
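// exampleHealthCheckedFS is a usage sketch, not part of the original file: it
// wraps this package's Default filesystem so that any write-oriented
// operation stalled for more than 100ms reports a DiskSlowInfo. Closing the
// returned io.Closer stops the monitoring goroutine.
func exampleHealthCheckedFS() (FS, io.Closer) {
	return WithDiskHealthChecks(Default, 100*time.Millisecond,
		func(info DiskSlowInfo) {
			fmt.Println(info) // DiskSlowInfo implements fmt.Stringer.
		})
}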
494 :
495 : // timeFilesystemOp executes the provided closure, which should perform a
496 : // singular filesystem operation of a type matching opType on the named file. It
497 : // records the provided start time such that the long-lived disk-health checking
498 : // goroutine can observe if the operation is blocked for an inordinate time.
499 : //
500 : // The start time is taken as a parameter in the form of nanoseconds since the
501 : // unix epoch so that it appears in stack traces during crashes (if GOTRACEBACK
502 : // is set appropriately), aiding postmortem debugging.
503 : func (d *diskHealthCheckingFS) timeFilesystemOp(
504 : name string, opType OpType, op func(), startNanos int64,
505 2 : ) {
506 2 : if d == nil {
507 0 : op()
508 0 : return
509 0 : }
510 :
511 : // Record this operation's start time on the FS, so that the long-running
512 : // goroutine can monitor the filesystem operation.
513 : //
514 : // The diskHealthCheckingFile implementation uses a single field that is
515 : // atomically updated, taking advantage of the fact that writes to a single
516 : // vfs.File handle are not performed in parallel. The vfs.FS however may
517 : // receive write filesystem operations in parallel. To accommodate this
518 : // parallelism, writing goroutines append their start time to a
519 : // mutex-protected vector. On ticks, the long-running goroutine scans the
520 : // vector searching for start times older than the slow-disk threshold. When
521 : // a writing goroutine completes its operation, it atomically overwrites its
522 : // slot to signal completion.
523 2 : var s *slot
524 2 : func() {
525 2 : d.mu.Lock()
526 2 : defer d.mu.Unlock()
527 2 :
528 2 : // If there's no long-running goroutine to monitor this filesystem
529 2 : // operation, start one.
530 2 : if !d.mu.tickerRunning {
531 2 : d.startTickerLocked()
532 2 : }
533 :
534 2 : for i := 0; i < len(d.mu.inflight); i++ {
535 2 : if d.mu.inflight[i].startNanos.Load() == 0 {
536 2 : // This slot is not in use. Claim it.
537 2 : s = d.mu.inflight[i]
538 2 : s.name = name
539 2 : s.opType = opType
540 2 : s.startNanos.Store(startNanos)
541 2 : break
542 : }
543 : }
544 : // If we didn't find any unused slots, create a new slot and append it.
545 : // This slot will exist forever. The number of slots will grow to the
546 : // maximum number of concurrent filesystem operations over the lifetime
547 : // of the process. Only operations that grow the number of slots must
548 : // incur an allocation.
549 2 : if s == nil {
550 0 : s = &slot{
551 0 : name: name,
552 0 : opType: opType,
553 0 : }
554 0 : s.startNanos.Store(startNanos)
555 0 : d.mu.inflight = append(d.mu.inflight, s)
556 0 : }
557 : }()
558 :
559 2 : op()
560 2 :
561 2 : // Signal completion by zeroing the start time.
562 2 : s.startNanos.Store(0)
563 : }
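// To summarize the slot protocol: a slot is free iff its startNanos is zero.
// Claiming a slot requires holding d.mu, which serializes claims against each
// other and against slice growth; releasing is the single lock-free
// startNanos.Store(0) above, which the monitoring goroutine observes on its
// next tick.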
564 :
565 : // startTickerLocked starts a new goroutine with a ticker to monitor disk
566 : // filesystem operations. Requires d.mu and !d.mu.tickerRunning.
567 2 : func (d *diskHealthCheckingFS) startTickerLocked() {
568 2 : d.mu.tickerRunning = true
569 2 : stopper := d.mu.stopper
570 2 : go func() {
571 2 : ticker := time.NewTicker(d.tickInterval)
572 2 : defer ticker.Stop()
573 2 : type exceededSlot struct {
574 2 : name string
575 2 : opType OpType
576 2 : startNanos int64
577 2 : }
578 2 : var exceededSlots []exceededSlot
579 2 :
580 2 : for {
581 2 : select {
582 2 : case <-ticker.C:
583 2 : // Scan the inflight slots for any slots recording a start
584 2 : // time older than the diskSlowThreshold.
585 2 : exceededSlots = exceededSlots[:0]
586 2 : d.mu.Lock()
587 2 : now := time.Now()
588 2 : for i := range d.mu.inflight {
589 2 : nanos := d.mu.inflight[i].startNanos.Load()
590 2 : if nanos != 0 && time.Unix(0, nanos).Add(d.diskSlowThreshold).Before(now) {
591 1 : // diskSlowThreshold was exceeded. Copy this inflightOp into
592 1 : // exceededSlots and call d.onSlowDisk after dropping the mutex.
593 1 : inflightOp := exceededSlot{
594 1 : name: d.mu.inflight[i].name,
595 1 : opType: d.mu.inflight[i].opType,
596 1 : startNanos: nanos,
597 1 : }
598 1 : exceededSlots = append(exceededSlots, inflightOp)
599 1 : }
600 : }
601 2 : d.mu.Unlock()
602 2 : for i := range exceededSlots {
603 1 : d.onSlowDisk(
604 1 : DiskSlowInfo{
605 1 : Path: exceededSlots[i].name,
606 1 : OpType: exceededSlots[i].opType,
607 1 : WriteSize: 0, // writes at the fs level are not sized
608 1 : Duration: now.Sub(time.Unix(0, exceededSlots[i].startNanos)),
609 1 : })
610 1 : }
611 2 : case <-stopper:
612 2 : return
613 : }
614 : }
615 : }()
616 : }
617 :
618 : // Close implements io.Closer. Close stops the long-running goroutine that
619 : // monitors for slow filesystem metadata operations. Close may be called
620 : // multiple times. If the filesystem is used after Close has been called, a new
621 : // long-running goroutine will be created.
622 2 : func (d *diskHealthCheckingFS) Close() error {
623 2 : d.mu.Lock()
624 2 : if !d.mu.tickerRunning {
625 2 : // Nothing to stop.
626 2 : d.mu.Unlock()
627 2 : return nil
628 2 : }
629 :
630 : // Grab the stopper so we can request the long-running goroutine to stop.
631 : // Replace the stopper in case this FS is reused. It's possible to Close and
632 : // reuse a disk-health checking FS. This is to accommodate the on-by-default
633 : // behavior in Pebble, and the possibility that users may continue to use
634 : // the Pebble default FS beyond the lifetime of a single DB.
635 2 : stopper := d.mu.stopper
636 2 : d.mu.stopper = make(chan struct{})
637 2 : d.mu.tickerRunning = false
638 2 : d.mu.Unlock()
639 2 :
640 2 : // Ask the long-running goroutine to stop. This is a synchronous channel
641 2 : // send.
642 2 : stopper <- struct{}{}
643 2 : close(stopper)
644 2 : return nil
645 : }
646 :
647 : // Create implements the FS interface.
648 2 : func (d *diskHealthCheckingFS) Create(name string) (File, error) {
649 2 : var f File
650 2 : var err error
651 2 : d.timeFilesystemOp(name, OpTypeCreate, func() {
652 2 : f, err = d.fs.Create(name)
653 2 : }, time.Now().UnixNano())
654 2 : if err != nil {
655 1 : return f, err
656 1 : }
657 2 : if d.diskSlowThreshold == 0 {
658 0 : return f, nil
659 0 : }
660 2 : checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) {
661 1 : d.onSlowDisk(
662 1 : DiskSlowInfo{
663 1 : Path: name,
664 1 : OpType: opType,
665 1 : WriteSize: writeSizeInBytes,
666 1 : Duration: duration,
667 1 : })
668 1 : })
669 2 : checkingFile.startTicker()
670 2 : return checkingFile, nil
671 : }
672 :
673 : // GetDiskUsage implements the FS interface.
674 2 : func (d *diskHealthCheckingFS) GetDiskUsage(path string) (DiskUsage, error) {
675 2 : return d.fs.GetDiskUsage(path)
676 2 : }
677 :
678 : // Link implements the FS interface.
679 2 : func (d *diskHealthCheckingFS) Link(oldname, newname string) error {
680 2 : var err error
681 2 : d.timeFilesystemOp(newname, OpTypeLink, func() {
682 2 : err = d.fs.Link(oldname, newname)
683 2 : }, time.Now().UnixNano())
684 2 : return err
685 : }
686 :
687 : // List implements the FS interface.
688 2 : func (d *diskHealthCheckingFS) List(dir string) ([]string, error) {
689 2 : return d.fs.List(dir)
690 2 : }
691 :
692 : // Lock implements the FS interface.
693 2 : func (d *diskHealthCheckingFS) Lock(name string) (io.Closer, error) {
694 2 : return d.fs.Lock(name)
695 2 : }
696 :
697 : // MkdirAll implements the FS interface.
698 2 : func (d *diskHealthCheckingFS) MkdirAll(dir string, perm os.FileMode) error {
699 2 : var err error
700 2 : d.timeFilesystemOp(dir, OpTypeMkdirAll, func() {
701 2 : err = d.fs.MkdirAll(dir, perm)
702 2 : }, time.Now().UnixNano())
703 2 : return err
704 : }
705 :
706 : // Open implements the FS interface.
707 2 : func (d *diskHealthCheckingFS) Open(name string, opts ...OpenOption) (File, error) {
708 2 : return d.fs.Open(name, opts...)
709 2 : }
710 :
711 : // OpenReadWrite implements the FS interface.
712 1 : func (d *diskHealthCheckingFS) OpenReadWrite(name string, opts ...OpenOption) (File, error) {
713 1 : return d.fs.OpenReadWrite(name, opts...)
714 1 : }
715 :
716 : // OpenDir implements the FS interface.
717 2 : func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) {
718 2 : f, err := d.fs.OpenDir(name)
719 2 : if err != nil {
720 0 : return f, err
721 0 : }
722 : // Directories opened with OpenDir must be opened with health checking,
723 : // because they may be explicitly synced.
724 2 : return &diskHealthCheckingDir{
725 2 : File: f,
726 2 : name: name,
727 2 : fs: d,
728 2 : }, nil
729 : }
730 :
731 : // PathBase implements the FS interface.
732 2 : func (d *diskHealthCheckingFS) PathBase(path string) string {
733 2 : return d.fs.PathBase(path)
734 2 : }
735 :
736 : // PathJoin implements the FS interface.
737 2 : func (d *diskHealthCheckingFS) PathJoin(elem ...string) string {
738 2 : return d.fs.PathJoin(elem...)
739 2 : }
740 :
741 : // PathDir implements the FS interface.
742 2 : func (d *diskHealthCheckingFS) PathDir(path string) string {
743 2 : return d.fs.PathDir(path)
744 2 : }
745 :
746 : // Remove implements the FS interface.
747 2 : func (d *diskHealthCheckingFS) Remove(name string) error {
748 2 : var err error
749 2 : d.timeFilesystemOp(name, OpTypeRemove, func() {
750 2 : err = d.fs.Remove(name)
751 2 : }, time.Now().UnixNano())
752 2 : return err
753 : }
754 :
755 : // RemoveAll implements the FS interface.
756 1 : func (d *diskHealthCheckingFS) RemoveAll(name string) error {
757 1 : var err error
758 1 : d.timeFilesystemOp(name, OpTypeRemoveAll, func() {
759 1 : err = d.fs.RemoveAll(name)
760 1 : }, time.Now().UnixNano())
761 1 : return err
762 : }
763 :
764 : // Rename implements the FS interface.
765 2 : func (d *diskHealthCheckingFS) Rename(oldname, newname string) error {
766 2 : var err error
767 2 : d.timeFilesystemOp(newname, OpTypeRename, func() {
768 2 : err = d.fs.Rename(oldname, newname)
769 2 : }, time.Now().UnixNano())
770 2 : return err
771 : }
772 :
773 : // ReuseForWrite implements the FS interface.
774 1 : func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, error) {
775 1 : var f File
776 1 : var err error
777 1 : d.timeFilesystemOp(newname, OpTypeReuseForWrite, func() {
778 1 : f, err = d.fs.ReuseForWrite(oldname, newname)
779 1 : }, time.Now().UnixNano())
780 1 : if err != nil {
781 1 : return f, err
782 1 : }
783 1 : if d.diskSlowThreshold == 0 {
784 0 : return f, nil
785 0 : }
786 1 : checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) {
787 0 : d.onSlowDisk(
788 0 : DiskSlowInfo{
789 0 : Path: newname,
790 0 : OpType: opType,
791 0 : WriteSize: writeSizeInBytes,
792 0 : Duration: duration,
793 0 : })
794 0 : })
795 1 : checkingFile.startTicker()
796 1 : return checkingFile, nil
797 : }
798 :
799 : // Stat implements the FS interface.
800 2 : func (d *diskHealthCheckingFS) Stat(name string) (os.FileInfo, error) {
801 2 : return d.fs.Stat(name)
802 2 : }
803 :
804 : type noopCloser struct{}
805 :
806 0 : func (noopCloser) Close() error { return nil }