Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package vfs
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "os"
11 : "path/filepath"
12 : "sync"
13 : "sync/atomic"
14 : "time"
15 :
16 : "github.com/cockroachdb/redact"
17 : )
18 :
19 : const (
20 : // preallocatedSlotCount is the default number of slots available for
21 : // concurrent filesystem operations. The slot count may be exceeded, but
22 : // each additional slot will incur an additional allocation. We choose 16
23 : // here with the expectation that it is significantly more than required in
24 : // practice. See the comment above the diskHealthCheckingFS type definition.
25 : preallocatedSlotCount = 16
26 : // deltaBits is the number of bits in the packed 64-bit integer used for
27 : // identifying a delta from the file creation time in milliseconds.
28 : deltaBits = 40
29 : // writeSizeBits is the number of bits in the packed 64-bit integer used for
30 : // identifying the size of the write operation, if the operation is sized. See
31 : // writeSizePrecision below for precision of size.
32 : writeSizeBits = 20
33 : // writeSizePrecision tracks the size of writes at kilobyte precision. See the comment above lastWritePacked for more.
34 : writeSizePrecision = 1024
35 : )
36 :
37 : // Variables to enable testing.
38 : var (
39 : // defaultTickInterval is the default interval between consecutive ticks of
40 : // each diskHealthCheckingFile's monitoring loop.
41 : defaultTickInterval = 2 * time.Second
42 : )
43 :
44 : // OpType is the type of IO operation being monitored by a
45 : // diskHealthCheckingFile.
46 : type OpType uint8
47 :
48 : // The following OpTypes are the subset of filesystem operations monitored
49 : // for disk health (file writes, syncs, and filesystem metadata operations).
50 : const (
51 : OpTypeUnknown OpType = iota
52 : OpTypeWrite
53 : OpTypeSync
54 : OpTypeSyncData
55 : OpTypeSyncTo
56 : OpTypeCreate
57 : OpTypeLink
58 : OpTypeMkdirAll
59 : OpTypePreallocate
60 : OpTypeRemove
61 : OpTypeRemoveAll
62 : OpTypeRename
63 : OpTypeReuseForWrite
64 : // Note: opTypeMax is just used in tests. It must appear last in the list
65 : // of OpTypes.
66 : opTypeMax
67 : )
68 :
69 : // String implements fmt.Stringer.
70 0 : func (o OpType) String() string {
71 0 : switch o {
72 0 : case OpTypeWrite:
73 0 : return "write"
74 0 : case OpTypeSync:
75 0 : return "sync"
76 0 : case OpTypeSyncData:
77 0 : return "syncdata"
78 0 : case OpTypeSyncTo:
79 0 : return "syncto"
80 0 : case OpTypeCreate:
81 0 : return "create"
82 0 : case OpTypeLink:
83 0 : return "link"
84 0 : case OpTypeMkdirAll:
85 0 : return "mkdirall"
86 0 : case OpTypePreallocate:
87 0 : return "preallocate"
88 0 : case OpTypeRemove:
89 0 : return "remove"
90 0 : case OpTypeRemoveAll:
91 0 : return "removeall"
92 0 : case OpTypeRename:
93 0 : return "rename"
94 0 : case OpTypeReuseForWrite:
95 0 : return "reuseforwrite"
96 0 : case OpTypeUnknown:
97 0 : return "unknown"
98 0 : default:
99 0 : panic(fmt.Sprintf("vfs: unknown op type: %d", o))
100 : }
101 : }
102 :
103 : // diskHealthCheckingFile is a File wrapper that detects slow disk operations
104 : // and calls onSlowDisk if a disk operation is seen to exceed diskSlowThreshold.
105 : //
106 : // This struct creates a goroutine (in startTicker()) that, at every tick
107 : // interval, sees if there's a disk operation taking longer than the specified
108 : // duration. This setup is preferable to creating a new timer at every disk
109 : // operation, as it reduces overhead per disk operation.
110 : type diskHealthCheckingFile struct {
111 : file File
112 : onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration)
113 : diskSlowThreshold time.Duration
114 : tickInterval time.Duration
115 :
116 : stopper chan struct{}
117 : // lastWritePacked is a 64-bit unsigned int. The most significant
118 : // 40 bits represent a delta (in milliseconds) from the creation
119 : // time of the diskHealthCheckingFile. The next most significant 20 bits
120 : // represent the size of the write in KBs, if the write has a size. (If
121 : // it doesn't, the 20 bits are zeroed.) The least significant four bits
122 : // contain the OpType.
123 : //
124 : // The use of 40 bits for a delta provides ~34 years of effective
125 : // monitoring time before the uint wraps around, at millisecond precision.
126 : // ~34 years of process uptime "ought to be enough for anybody". Millisecond
127 : // precision is sufficient, given that we are monitoring for writes that
128 : // take longer than one millisecond.
129 : //
130 : // The use of 20 bits for the size in KBs allows representing sizes up
131 : // to nearly one GB. If the write is larger than that, we round down to ~one GB.
132 : //
133 : // The use of four bits for OpType allows for 16 operation types.
134 : //
135 : // NB: this packing scheme is not persisted, and is therefore safe to adjust
136 : // across process boundaries.
137 : lastWritePacked atomic.Uint64
138 : createTime time.Time
139 : }
140 :
141 : // newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the
142 : // specified time threshold and event listener.
143 : func newDiskHealthCheckingFile(
144 : file File,
145 : diskSlowThreshold time.Duration,
146 : onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration),
147 1 : ) *diskHealthCheckingFile {
148 1 : return &diskHealthCheckingFile{
149 1 : file: file,
150 1 : onSlowDisk: onSlowDisk,
151 1 : diskSlowThreshold: diskSlowThreshold,
152 1 : tickInterval: defaultTickInterval,
153 1 :
154 1 : stopper: make(chan struct{}),
155 1 : createTime: time.Now(),
156 1 : }
157 1 : }
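
// Lifecycle sketch (illustrative only; the real call sites are
// diskHealthCheckingFS.Create and ReuseForWrite below). The wrapper is
// constructed, its ticker started, and Close stops the ticker:
//
//	f := newDiskHealthCheckingFile(inner, threshold, onSlow)
//	f.startTicker()
//	// ... monitored Writes, Syncs, Preallocates ...
//	_ = f.Close() // stops the ticker goroutine, then closes inner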
158 :
159 : // startTicker starts a new goroutine with a ticker to monitor disk operations.
160 : // Can only be called if the ticker goroutine isn't running already.
161 1 : func (d *diskHealthCheckingFile) startTicker() {
162 1 : if d.diskSlowThreshold == 0 {
163 0 : return
164 0 : }
165 :
166 1 : go func() {
167 1 : ticker := time.NewTicker(d.tickInterval)
168 1 : defer ticker.Stop()
169 1 :
170 1 : for {
171 1 : select {
172 1 : case <-d.stopper:
173 1 : return
174 :
175 1 : case <-ticker.C:
176 1 : packed := d.lastWritePacked.Load()
177 1 : if packed == 0 {
178 1 : continue
179 : }
180 0 : delta, writeSize, op := unpack(packed)
181 0 : lastWrite := d.createTime.Add(delta)
182 0 : now := time.Now()
183 0 : if lastWrite.Add(d.diskSlowThreshold).Before(now) {
184 0 : // diskSlowThreshold was exceeded. Call the passed-in
185 0 : // listener.
186 0 : d.onSlowDisk(op, writeSize, now.Sub(lastWrite))
187 0 : }
188 : }
189 : }
190 : }()
191 : }
192 :
193 : // stopTicker stops the goroutine started in startTicker.
194 1 : func (d *diskHealthCheckingFile) stopTicker() {
195 1 : close(d.stopper)
196 1 : }
197 :
198 : // Fd implements (vfs.File).Fd.
199 1 : func (d *diskHealthCheckingFile) Fd() uintptr {
200 1 : return d.file.Fd()
201 1 : }
202 :
203 : // Read implements (vfs.File).Read
204 0 : func (d *diskHealthCheckingFile) Read(p []byte) (int, error) {
205 0 : return d.file.Read(p)
206 0 : }
207 :
208 : // ReadAt implements (vfs.File).ReadAt
209 0 : func (d *diskHealthCheckingFile) ReadAt(p []byte, off int64) (int, error) {
210 0 : return d.file.ReadAt(p, off)
211 0 : }
212 :
213 : // Write implements the io.Writer interface.
214 1 : func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) {
215 1 : d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
216 1 : n, err = d.file.Write(p)
217 1 : })
218 1 : return n, err
219 : }
220 :
221 : // WriteAt implements the io.WriterAt interface.
222 0 : func (d *diskHealthCheckingFile) WriteAt(p []byte, ofs int64) (n int, err error) {
223 0 : d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
224 0 : n, err = d.file.WriteAt(p, ofs)
225 0 : })
226 0 : return n, err
227 : }
228 :
229 : // Close implements the io.Closer interface.
230 1 : func (d *diskHealthCheckingFile) Close() error {
231 1 : d.stopTicker()
232 1 : return d.file.Close()
233 1 : }
234 :
235 : // Prefetch implements (vfs.File).Prefetch.
236 0 : func (d *diskHealthCheckingFile) Prefetch(offset, length int64) error {
237 0 : return d.file.Prefetch(offset, length)
238 0 : }
239 :
240 : // Preallocate implements (vfs.File).Preallocate.
241 1 : func (d *diskHealthCheckingFile) Preallocate(off, n int64) (err error) {
242 1 : d.timeDiskOp(OpTypePreallocate, n, func() {
243 1 : err = d.file.Preallocate(off, n)
244 1 : })
245 1 : return err
246 : }
247 :
248 : // Stat implements (vfs.File).Stat.
249 0 : func (d *diskHealthCheckingFile) Stat() (os.FileInfo, error) {
250 0 : return d.file.Stat()
251 0 : }
252 :
253 : // Sync implements the io.Syncer interface.
254 1 : func (d *diskHealthCheckingFile) Sync() (err error) {
255 1 : d.timeDiskOp(OpTypeSync, 0, func() {
256 1 : err = d.file.Sync()
257 1 : })
258 1 : return err
259 : }
260 :
261 : // SyncData implements (vfs.File).SyncData.
262 1 : func (d *diskHealthCheckingFile) SyncData() (err error) {
263 1 : d.timeDiskOp(OpTypeSyncData, 0, func() {
264 1 : err = d.file.SyncData()
265 1 : })
266 1 : return err
267 : }
268 :
269 : // SyncTo implements (vfs.File).SyncTo.
270 0 : func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) {
271 0 : d.timeDiskOp(OpTypeSyncTo, length, func() {
272 0 : fullSync, err = d.file.SyncTo(length)
273 0 : })
274 0 : return fullSync, err
275 : }
276 :
277 : // timeDiskOp runs the specified closure and makes its timing visible to the
278 : // monitoring goroutine, in case it exceeds one of the slow disk durations.
279 : // opType should always be set. writeSizeInBytes should be set if the write
280 : // operation is sized. If not, it should be set to zero.
281 1 : func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, writeSizeInBytes int64, op func()) {
282 1 : if d == nil {
283 0 : op()
284 0 : return
285 0 : }
286 :
287 1 : delta := time.Since(d.createTime)
288 1 : packed := pack(delta, writeSizeInBytes, opType)
289 1 : if d.lastWritePacked.Swap(packed) != 0 {
290 0 : panic("concurrent write operations detected on file")
291 : }
292 1 : defer func() {
293 1 : if d.lastWritePacked.Swap(0) != packed {
294 0 : panic("concurrent write operations detected on file")
295 : }
296 : }()
297 1 : op()
298 : }
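
// Illustration (hypothetical misuse): timeDiskOp relies on the contract that
// writes to a single vfs.File handle are not performed in parallel. Two
// concurrent timed operations on the same file race on lastWritePacked and
// trip the Swap checks above:
//
//	go f.Write(p1) // claims lastWritePacked
//	go f.Write(p2) // may panic: "concurrent write operations detected on file"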
299 :
300 : // Note the slight asymmetry between pack and unpack: pack takes an int64 for
301 : // writeSizeInBytes, as dictated by the vfs interface, while unpack returns an
302 : // int. This is safe because the packing scheme needs at most 30 bits for the size.
303 1 : func pack(delta time.Duration, writeSizeInBytes int64, opType OpType) uint64 {
304 1 : // We have no guarantee of clock monotonicity. If we have a small regression
305 1 : // in the clock, we set deltaMillis to zero, so we can still catch the operation
306 1 : // if it happens to be slow.
307 1 : deltaMillis := delta.Milliseconds()
308 1 : if deltaMillis < 0 {
309 0 : deltaMillis = 0
310 0 : }
311 : // As of 3/7/2023, the use of 40 bits for a delta provides ~34 years
312 : // of effective monitoring time before the uint wraps around, at millisecond
313 : // precision.
314 1 : if deltaMillis > 1<<deltaBits-1 {
315 0 : panic("vfs: last write delta would result in integer wraparound")
316 : }
317 :
318 : // See writeSizePrecision to get the unit of writeSize. As of 1/26/2023, the unit is KBs.
319 1 : writeSize := writeSizeInBytes / writeSizePrecision
320 1 : // If the size of the write is larger than we can store in the packed int, store the max
321 1 : // value we can store in the packed int.
322 1 : const writeSizeCeiling = 1<<writeSizeBits - 1
323 1 : if writeSize > writeSizeCeiling {
324 0 : writeSize = writeSizeCeiling
325 0 : }
326 :
327 1 : return uint64(deltaMillis)<<(64-deltaBits) | uint64(writeSize)<<(64-deltaBits-writeSizeBits) | uint64(opType)
328 : }
329 :
330 0 : func unpack(packed uint64) (delta time.Duration, writeSizeInBytes int, opType OpType) {
331 0 : delta = time.Duration(packed>>(64-deltaBits)) * time.Millisecond
332 0 : wz := (int64(packed>>(64-deltaBits-writeSizeBits)) & ((1 << writeSizeBits) - 1)) * writeSizePrecision
333 0 : // Given the packing scheme, converting wz to an int will not truncate anything.
334 0 : writeSizeInBytes = int(wz)
335 0 : opType = OpType(packed & 0xf)
336 0 : return delta, writeSizeInBytes, opType
337 0 : }
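
// Worked example (illustrative values only): packing a 5ms delta for a 64 KB
// write, then unpacking it:
//
//	packed := pack(5*time.Millisecond, 64<<10, OpTypeWrite)
//	// bits 63..24: deltaMillis = 5
//	// bits 23..4:  writeSize   = 64 (in writeSizePrecision units, i.e. KBs)
//	// bits  3..0:  OpTypeWrite
//	delta, size, op := unpack(packed)
//	// delta == 5*time.Millisecond, size == 64<<10, op == OpTypeWrite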
338 :
339 : // diskHealthCheckingDir implements disk-health checking for directories. Unlike
340 : // other files, we allow directories to receive concurrent write operations.
341 : // (Syncs are the only write operations supported by a directory.) Since the
342 : // diskHealthCheckingFile's timeDiskOp can only track a single in-flight
343 : // operation at a time, we time the operation using the filesystem-level
344 : // timeFilesystemOp function instead.
345 : type diskHealthCheckingDir struct {
346 : File
347 : name string
348 : fs *diskHealthCheckingFS
349 : }
350 :
351 : // Sync implements the io.Syncer interface.
352 1 : func (d *diskHealthCheckingDir) Sync() (err error) {
353 1 : d.fs.timeFilesystemOp(d.name, OpTypeSync, func() {
354 1 : err = d.File.Sync()
355 1 : })
356 1 : return err
357 : }
358 :
359 : // DiskSlowInfo captures info about detected slow operations on the vfs.
360 : type DiskSlowInfo struct {
361 : // Path of file being written to.
362 : Path string
363 : // Operation being performed on the file.
364 : OpType OpType
365 : // Size of write in bytes, if the write is sized.
366 : WriteSize int
367 : // Duration that has elapsed since this disk operation started.
368 : Duration time.Duration
369 : }
370 :
371 0 : func (i DiskSlowInfo) String() string {
372 0 : return redact.StringWithoutMarkers(i)
373 0 : }
374 :
375 : // SafeFormat implements redact.SafeFormatter.
376 0 : func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
377 0 : switch i.OpType {
378 : // Operations for which i.WriteSize is meaningful.
379 0 : case OpTypeWrite, OpTypeSyncTo, OpTypePreallocate:
380 0 : w.Printf("disk slowness detected: %s on file %s (%d bytes) has been ongoing for %0.1fs",
381 0 : redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
382 0 : redact.Safe(i.WriteSize), redact.Safe(i.Duration.Seconds()))
383 0 : default:
384 0 : w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs",
385 0 : redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
386 0 : redact.Safe(i.Duration.Seconds()))
387 : }
388 : }
389 :
390 : // diskHealthCheckingFS adds disk-health checking facilities to a VFS.
391 : // It times disk write operations in two ways:
392 : //
393 : // 1. Wrapping vfs.Files.
394 : //
395 : // The bulk of write I/O activity is file writing and syncing, invoked through
396 : // the `vfs.File` interface. This VFS wraps all files open for writing with a
397 : // special diskHealthCheckingFile implementation of the vfs.File interface. See
398 : // above for the implementation.
399 : //
400 : // 2. Monitoring filesystem metadata operations.
401 : //
402 : // Filesystem metadata operations (create, link, remove, rename, etc) are also
403 : // sources of disk writes. Unlike a vfs.File which requires Write and Sync calls
404 : // to be sequential, a vfs.FS may receive these filesystem metadata operations
405 : // in parallel. To accommodate this parallelism, the diskHealthCheckingFS's
406 : // write-oriented filesystem operations record their start times into a 'slot'
407 : // on the filesystem. A single long-running goroutine periodically scans the
408 : // slots looking for slow operations.
409 : //
410 : // The number of slots on a diskHealthCheckingFS grows to a working set of the
411 : // maximum concurrent filesystem operations. This is expected to be very few
412 : // for these reasons:
413 : // 1. Pebble has limited write concurrency. Flushes, compactions and WAL
414 : // rotations are the primary sources of filesystem metadata operations. With
415 : // the default max-compaction concurrency, these operations require at most 5
416 : // concurrent slots if all 5 perform a filesystem metadata operation
417 : // simultaneously.
418 : // 2. Pebble's limited concurrent I/O writers spend most of their time
419 : // performing file I/O, not performing the filesystem metadata operations that
420 : // require recording a slot on the diskHealthCheckingFS.
421 : // 3. In CockroachDB, each additional store/Pebble instance has its own vfs.FS
422 : // which provides a separate goroutine and set of slots.
423 : // 4. In CockroachDB, many of the additional sources of filesystem metadata
424 : // operations (like encryption-at-rest) are sequential with respect to Pebble's
425 : // threads.
426 : type diskHealthCheckingFS struct {
427 : tickInterval time.Duration
428 : diskSlowThreshold time.Duration
429 : onSlowDisk func(DiskSlowInfo)
430 : fs FS
431 : mu struct {
432 : sync.Mutex
433 : tickerRunning bool
434 : stopper chan struct{}
435 : inflight []*slot
436 : }
437 : // prealloc preallocates the memory for mu.inflight slots and the slice
438 : // itself. The contained fields are not accessed directly except by
439 : // WithDiskHealthChecks when initializing mu.inflight. The number of slots
440 : // in d.mu.inflight will grow to the maximum number of concurrent file
441 : // metadata operations (create, remove, link, etc). If the number of
442 : // concurrent operations never exceeds preallocatedSlotCount, we'll never
443 : // incur an additional allocation.
444 : prealloc struct {
445 : slots [preallocatedSlotCount]slot
446 : slotPtrSlice [preallocatedSlotCount]*slot
447 : }
448 : }
449 :
450 : type slot struct {
451 : name string
452 : opType OpType
453 : startNanos atomic.Int64
454 : }
455 :
456 : // diskHealthCheckingFS implements FS.
457 : var _ FS = (*diskHealthCheckingFS)(nil)
458 :
459 : // WithDiskHealthChecks wraps an FS and ensures that all write-oriented
460 : // operations on the FS are wrapped with disk health detection checks. Disk
461 : // operations that are observed to take longer than diskSlowThreshold trigger an
462 : // onSlowDisk call.
463 : //
464 : // A threshold of zero disables disk-health checking.
465 : func WithDiskHealthChecks(
466 : innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(info DiskSlowInfo),
467 1 : ) (FS, io.Closer) {
468 1 : if diskSlowThreshold == 0 {
469 0 : return innerFS, noopCloser{}
470 0 : }
471 :
472 1 : fs := &diskHealthCheckingFS{
473 1 : fs: innerFS,
474 1 : tickInterval: defaultTickInterval,
475 1 : diskSlowThreshold: diskSlowThreshold,
476 1 : onSlowDisk: onSlowDisk,
477 1 : }
478 1 : fs.mu.stopper = make(chan struct{})
479 1 : // The fs holds preallocated slots and a preallocated array of slot pointers
480 1 : // with equal length. Initialize the inflight slice to use a slice backed by
481 1 : // the preallocated array with each slot initialized to a preallocated slot.
482 1 : fs.mu.inflight = fs.prealloc.slotPtrSlice[:]
483 1 : for i := range fs.mu.inflight {
484 1 : fs.mu.inflight[i] = &fs.prealloc.slots[i]
485 1 : }
486 1 : return fs, fs
487 : }
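
// Usage sketch (hypothetical threshold and callback; log is assumed to be
// imported by the caller):
//
//	fs, closer := WithDiskHealthChecks(Default, 30*time.Second,
//		func(info DiskSlowInfo) {
//			log.Printf("disk slowness: %s", info)
//		})
//	defer closer.Close() // stops the monitoring goroutine
//	f, _ := fs.Create("example") // f is wrapped with health checking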
488 :
489 1 : func (d *diskHealthCheckingFS) timeFilesystemOp(name string, opType OpType, op func()) {
490 1 : if d == nil {
491 0 : op()
492 0 : return
493 0 : }
494 :
495 : // Record this operation's start time on the FS, so that the long-running
496 : // goroutine can monitor the filesystem operation.
497 : //
498 : // The diskHealthCheckingFile implementation uses a single field that is
499 : // atomically updated, taking advantage of the fact that writes to a single
500 : // vfs.File handle are not performed in parallel. The vfs.FS however may
501 : // receive write filesystem operations in parallel. To accommodate this
502 : // parallelism, writing goroutines append their start time to a
503 : // mutex-protected vector. On ticks, the long-running goroutine scans the
504 : // vector searching for start times older than the slow-disk threshold. When
505 : // a writing goroutine completes its operation, it atomically overwrites its
506 : // slot to signal completion.
507 1 : var s *slot
508 1 : func() {
509 1 : d.mu.Lock()
510 1 : defer d.mu.Unlock()
511 1 :
512 1 : // If there's no long-running goroutine to monitor this filesystem
513 1 : // operation, start one.
514 1 : if !d.mu.tickerRunning {
515 1 : d.startTickerLocked()
516 1 : }
517 :
518 1 : startNanos := time.Now().UnixNano()
519 1 : for i := 0; i < len(d.mu.inflight); i++ {
520 1 : if d.mu.inflight[i].startNanos.Load() == 0 {
521 1 : // This slot is not in use. Claim it.
522 1 : s = d.mu.inflight[i]
523 1 : s.name = name
524 1 : s.opType = opType
525 1 : s.startNanos.Store(startNanos)
526 1 : break
527 : }
528 : }
529 : // If we didn't find any unused slots, create a new slot and append it.
530 : // This slot will exist forever. The number of slots will grow to the
531 : // maximum number of concurrent filesystem operations over the lifetime
532 : // of the process. Only operations that grow the number of slots must
533 : // incur an allocation.
534 1 : if s == nil {
535 0 : s = &slot{
536 0 : name: name,
537 0 : opType: opType,
538 0 : }
539 0 : s.startNanos.Store(startNanos)
540 0 : d.mu.inflight = append(d.mu.inflight, s)
541 0 : }
542 : }()
543 :
544 1 : op()
545 1 :
546 1 : // Signal completion by zeroing the start time.
547 1 : s.startNanos.Store(0)
548 : }
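
// Illustration (hypothetical timeline) of the slot scheme used above:
//
//	t0: Rename(a) claims slot 0 (startNanos != 0)
//	t1: Link(b) claims slot 1
//	t2: tick fires; slots with startNanos older than diskSlowThreshold are
//	    reported via onSlowDisk
//	t3: Rename(a) completes; slot 0's startNanos is zeroed and may be reused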
549 :
550 : // startTickerLocked starts a new goroutine with a ticker to monitor disk
551 : // filesystem operations. Requires d.mu and !d.mu.tickerRunning.
552 1 : func (d *diskHealthCheckingFS) startTickerLocked() {
553 1 : d.mu.tickerRunning = true
554 1 : stopper := d.mu.stopper
555 1 : go func() {
556 1 : ticker := time.NewTicker(d.tickInterval)
557 1 : defer ticker.Stop()
558 1 : type exceededSlot struct {
559 1 : name string
560 1 : opType OpType
561 1 : startNanos int64
562 1 : }
563 1 : var exceededSlots []exceededSlot
564 1 :
565 1 : for {
566 1 : select {
567 1 : case <-ticker.C:
568 1 : // Scan the inflight slots for any slots recording a start
569 1 : // time older than the diskSlowThreshold.
570 1 : exceededSlots = exceededSlots[:0]
571 1 : d.mu.Lock()
572 1 : now := time.Now()
573 1 : for i := range d.mu.inflight {
574 1 : nanos := d.mu.inflight[i].startNanos.Load()
575 1 : if nanos != 0 && time.Unix(0, nanos).Add(d.diskSlowThreshold).Before(now) {
576 0 : // diskSlowThreshold was exceeded. Copy this inflightOp into
577 0 : // exceededSlots and call d.onSlowDisk after dropping the mutex.
578 0 : inflightOp := exceededSlot{
579 0 : name: d.mu.inflight[i].name,
580 0 : opType: d.mu.inflight[i].opType,
581 0 : startNanos: nanos,
582 0 : }
583 0 : exceededSlots = append(exceededSlots, inflightOp)
584 0 : }
585 : }
586 1 : d.mu.Unlock()
587 1 : for i := range exceededSlots {
588 0 : d.onSlowDisk(
589 0 : DiskSlowInfo{
590 0 : Path: exceededSlots[i].name,
591 0 : OpType: exceededSlots[i].opType,
592 0 : WriteSize: 0, // writes at the fs level are not sized
593 0 : Duration: now.Sub(time.Unix(0, exceededSlots[i].startNanos)),
594 0 : })
595 0 : }
596 1 : case <-stopper:
597 1 : return
598 : }
599 : }
600 : }()
601 : }
602 :
603 : // Close implements io.Closer. Close stops the long-running goroutine that
604 : // monitors for slow filesystem metadata operations. Close may be called
605 : // multiple times. If the filesystem is used after Close has been called, a new
606 : // long-running goroutine will be created.
607 1 : func (d *diskHealthCheckingFS) Close() error {
608 1 : d.mu.Lock()
609 1 : if !d.mu.tickerRunning {
610 0 : // Nothing to stop.
611 0 : d.mu.Unlock()
612 0 : return nil
613 0 : }
614 :
615 : // Grab the stopper so we can request the long-running goroutine to stop.
616 : // Replace the stopper in case this FS is reused. It's possible to Close and
617 : // reuse a disk-health checking FS. This is to accommodate the on-by-default
618 : // behavior in Pebble, and the possibility that users may continue to use
619 : // the Pebble default FS beyond the lifetime of a single DB.
620 1 : stopper := d.mu.stopper
621 1 : d.mu.stopper = make(chan struct{})
622 1 : d.mu.tickerRunning = false
623 1 : d.mu.Unlock()
624 1 :
625 1 : // Ask the long-running goroutine to stop. This is a synchronous channel
626 1 : // send.
627 1 : stopper <- struct{}{}
628 1 : close(stopper)
629 1 : return nil
630 : }
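
// Illustration: the FS remains usable after Close; the next timed filesystem
// operation restarts the monitoring goroutine (see timeFilesystemOp). Reusing
// the names from the usage sketch above:
//
//	_ = closer.Close()       // stops the current ticker goroutine
//	_ = fs.Remove("example") // transparently starts a new one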
631 :
632 : // Create implements the FS interface.
633 1 : func (d *diskHealthCheckingFS) Create(name string) (File, error) {
634 1 : var f File
635 1 : var err error
636 1 : d.timeFilesystemOp(name, OpTypeCreate, func() {
637 1 : f, err = d.fs.Create(name)
638 1 : })
639 1 : if err != nil {
640 0 : return f, err
641 0 : }
642 1 : if d.diskSlowThreshold == 0 {
643 0 : return f, nil
644 0 : }
645 1 : checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) {
646 0 : d.onSlowDisk(
647 0 : DiskSlowInfo{
648 0 : Path: name,
649 0 : OpType: opType,
650 0 : WriteSize: writeSizeInBytes,
651 0 : Duration: duration,
652 0 : })
653 0 : })
654 1 : checkingFile.startTicker()
655 1 : return checkingFile, nil
656 : }
657 :
658 : // GetDiskUsage implements the FS interface.
659 1 : func (d *diskHealthCheckingFS) GetDiskUsage(path string) (DiskUsage, error) {
660 1 : return d.fs.GetDiskUsage(path)
661 1 : }
662 :
663 : // Link implements the FS interface.
664 1 : func (d *diskHealthCheckingFS) Link(oldname, newname string) error {
665 1 : var err error
666 1 : d.timeFilesystemOp(newname, OpTypeLink, func() {
667 1 : err = d.fs.Link(oldname, newname)
668 1 : })
669 1 : return err
670 : }
671 :
672 : // List implements the FS interface.
673 1 : func (d *diskHealthCheckingFS) List(dir string) ([]string, error) {
674 1 : return d.fs.List(dir)
675 1 : }
676 :
677 : // Lock implements the FS interface.
678 1 : func (d *diskHealthCheckingFS) Lock(name string) (io.Closer, error) {
679 1 : return d.fs.Lock(name)
680 1 : }
681 :
682 : // MkdirAll implements the FS interface.
683 1 : func (d *diskHealthCheckingFS) MkdirAll(dir string, perm os.FileMode) error {
684 1 : var err error
685 1 : d.timeFilesystemOp(dir, OpTypeMkdirAll, func() {
686 1 : err = d.fs.MkdirAll(dir, perm)
687 1 : })
688 1 : return err
689 : }
690 :
691 : // Open implements the FS interface.
692 1 : func (d *diskHealthCheckingFS) Open(name string, opts ...OpenOption) (File, error) {
693 1 : return d.fs.Open(name, opts...)
694 1 : }
695 :
696 : // OpenReadWrite implements the FS interface.
697 1 : func (d *diskHealthCheckingFS) OpenReadWrite(name string, opts ...OpenOption) (File, error) {
698 1 : return d.fs.OpenReadWrite(name, opts...)
699 1 : }
700 :
701 : // OpenDir implements the FS interface.
702 1 : func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) {
703 1 : f, err := d.fs.OpenDir(name)
704 1 : if err != nil {
705 0 : return f, err
706 0 : }
707 : // Directories opened with OpenDir must be opened with health checking,
708 : // because they may be explicitly synced.
709 1 : return &diskHealthCheckingDir{
710 1 : File: f,
711 1 : name: name,
712 1 : fs: d,
713 1 : }, nil
714 : }
715 :
716 : // PathBase implements the FS interface.
717 1 : func (d *diskHealthCheckingFS) PathBase(path string) string {
718 1 : return d.fs.PathBase(path)
719 1 : }
720 :
721 : // PathJoin implements the FS interface.
722 1 : func (d *diskHealthCheckingFS) PathJoin(elem ...string) string {
723 1 : return d.fs.PathJoin(elem...)
724 1 : }
725 :
726 : // PathDir implements the FS interface.
727 1 : func (d *diskHealthCheckingFS) PathDir(path string) string {
728 1 : return d.fs.PathDir(path)
729 1 : }
730 :
731 : // Remove implements the FS interface.
732 1 : func (d *diskHealthCheckingFS) Remove(name string) error {
733 1 : var err error
734 1 : d.timeFilesystemOp(name, OpTypeRemove, func() {
735 1 : err = d.fs.Remove(name)
736 1 : })
737 1 : return err
738 : }
739 :
740 : // RemoveAll implements the FS interface.
741 0 : func (d *diskHealthCheckingFS) RemoveAll(name string) error {
742 0 : var err error
743 0 : d.timeFilesystemOp(name, OpTypeRemoveAll, func() {
744 0 : err = d.fs.RemoveAll(name)
745 0 : })
746 0 : return err
747 : }
748 :
749 : // Rename implements the FS interface.
750 1 : func (d *diskHealthCheckingFS) Rename(oldname, newname string) error {
751 1 : var err error
752 1 : d.timeFilesystemOp(newname, OpTypeRename, func() {
753 1 : err = d.fs.Rename(oldname, newname)
754 1 : })
755 1 : return err
756 : }
757 :
758 : // ReuseForWrite implements the FS interface.
759 0 : func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, error) {
760 0 : var f File
761 0 : var err error
762 0 : d.timeFilesystemOp(newname, OpTypeReuseForWrite, func() {
763 0 : f, err = d.fs.ReuseForWrite(oldname, newname)
764 0 : })
765 0 : if err != nil {
766 0 : return f, err
767 0 : }
768 0 : if d.diskSlowThreshold == 0 {
769 0 : return f, nil
770 0 : }
771 0 : checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) {
772 0 : d.onSlowDisk(
773 0 : DiskSlowInfo{
774 0 : Path: newname,
775 0 : OpType: opType,
776 0 : WriteSize: writeSizeInBytes,
777 0 : Duration: duration,
778 0 : })
779 0 : })
780 0 : checkingFile.startTicker()
781 0 : return checkingFile, nil
782 : }
783 :
784 : // Stat implements the FS interface.
785 1 : func (d *diskHealthCheckingFS) Stat(name string) (os.FileInfo, error) {
786 1 : return d.fs.Stat(name)
787 1 : }
788 :
789 : type noopCloser struct{}
790 :
791 0 : func (noopCloser) Close() error { return nil }