Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package vfs
6 :
7 : import (
8 : "cmp"
9 : "fmt"
10 : "io"
11 : "os"
12 : "path/filepath"
13 : "slices"
14 : "sync"
15 : "sync/atomic"
16 : "time"
17 :
18 : "github.com/cockroachdb/redact"
19 : )
20 :
21 : const (
22 : // preallocatedSlotCount is the default number of slots available for
23 : // concurrent filesystem operations. The slot count may be exceeded, but
24 : // each additional slot will incur an additional allocation. We choose 16
25 : // here with the expectation that it is significantly more than required in
26 : // practice. See the comment above the diskHealthCheckingFS type definition.
27 : preallocatedSlotCount = 16
28 : // deltaBits is the number of bits in the packed 64-bit integer used for
29 : // identifying a delta from the file creation time in milliseconds.
30 : deltaBits = 40
31 : // writeSizeBits is the number of bits in the packed 64-bit integer used for
32 : // identifying the size of the write operation, if the operation is sized. See
33 : // writeSizePrecision below for precision of size.
34 : writeSizeBits = 20
35 : // Track size of writes at kilobyte precision. See comment above lastWritePacked for more.
36 : writeSizePrecision = 1024
37 : )
38 :
39 : // Variables to enable testing.
40 : var (
41 : // defaultTickInterval is the default interval between two ticks of each
42 : // diskHealthCheckingFile loop iteration.
43 : defaultTickInterval = 2 * time.Second
44 : )
45 :
46 : // OpType is the type of IO operation being monitored by a
47 : // diskHealthCheckingFile.
48 : type OpType uint8
49 :
50 : // The following OpTypes are limited to the subset of file system operations that
51 : // a diskHealthCheckingFile supports (namely writes and syncs).
52 : const (
53 : OpTypeUnknown OpType = iota
54 : OpTypeWrite
55 : OpTypeSync
56 : OpTypeSyncData
57 : OpTypeSyncTo
58 : OpTypeCreate
59 : OpTypeLink
60 : OpTypeMkdirAll
61 : OpTypePreallocate
62 : OpTypeRemove
63 : OpTypeRemoveAll
64 : OpTypeRename
65 : OpTypeReuseForWrite
66 : // Note: opTypeMax is just used in tests. It must appear last in the list
67 : // of OpTypes.
68 : opTypeMax
69 : )
70 :
71 : // String implements fmt.Stringer.
72 1 : func (o OpType) String() string {
73 1 : switch o {
74 1 : case OpTypeWrite:
75 1 : return "write"
76 1 : case OpTypeSync:
77 1 : return "sync"
78 1 : case OpTypeSyncData:
79 1 : return "syncdata"
80 1 : case OpTypeSyncTo:
81 1 : return "syncto"
82 1 : case OpTypeCreate:
83 1 : return "create"
84 1 : case OpTypeLink:
85 1 : return "link"
86 1 : case OpTypeMkdirAll:
87 1 : return "mkdirall"
88 1 : case OpTypePreallocate:
89 1 : return "preallocate"
90 1 : case OpTypeRemove:
91 1 : return "remove"
92 1 : case OpTypeRemoveAll:
93 1 : return "removall"
94 1 : case OpTypeRename:
95 1 : return "rename"
96 1 : case OpTypeReuseForWrite:
97 1 : return "reuseforwrite"
98 1 : case OpTypeUnknown:
99 1 : return "unknown"
100 0 : default:
101 0 : panic(fmt.Sprintf("vfs: unknown op type: %d", o))
102 : }
103 : }
104 :
105 : // DiskWriteCategory is a user-understandable string used to identify and aggregate
106 : // stats for disk writes. The prefix "pebble-" is reserved for internal Pebble categories.
107 : //
108 : // Some examples include pebble-wal, pebble-memtable-flush, and pebble-manifest; in the
109 : // Cockroach context, examples include sql-spill, range-snapshot, and node-log.
110 : type DiskWriteCategory string
111 :
112 : // WriteCategoryUnspecified denotes a disk write without a significant category.
113 : const WriteCategoryUnspecified = "unspecified"
114 :
115 : // DiskWriteStatsAggregate is an aggregate of the bytes written to disk for a given category.
116 : type DiskWriteStatsAggregate struct {
117 : Category DiskWriteCategory
118 : BytesWritten uint64
119 : }
120 :
121 : // DiskWriteStatsCollector collects and aggregates disk write metrics per category.
122 : type DiskWriteStatsCollector struct {
123 : mu sync.Mutex
124 : statsMap map[DiskWriteCategory]*atomic.Uint64
125 : }
126 :
127 : // NewDiskWriteStatsCollector instantiates a new DiskWriteStatsCollector.
128 1 : func NewDiskWriteStatsCollector() *DiskWriteStatsCollector {
129 1 : return &DiskWriteStatsCollector{
130 1 : statsMap: make(map[DiskWriteCategory]*atomic.Uint64),
131 1 : }
132 1 : }
133 :
134 : // GetStats returns the aggregated metrics for all categories.
135 1 : func (d *DiskWriteStatsCollector) GetStats() []DiskWriteStatsAggregate {
136 1 : var stats []DiskWriteStatsAggregate
137 1 : d.mu.Lock()
138 1 : for category, numBytes := range d.statsMap {
139 1 : stats = append(stats, DiskWriteStatsAggregate{
140 1 : Category: category,
141 1 : BytesWritten: numBytes.Load(),
142 1 : })
143 1 : }
144 1 : d.mu.Unlock()
145 1 : slices.SortFunc(stats, func(a, b DiskWriteStatsAggregate) int { return cmp.Compare(a.Category, b.Category) })
146 1 : return stats
147 : }
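// Illustrative usage sketch (not part of the upstream source): wiring a
// DiskWriteStatsCollector into WithDiskHealthChecks (defined later in this file) and
// reading the per-category aggregates. The 100ms threshold and the no-op slow-disk
// callback are assumptions made for the example.
//
//	collector := NewDiskWriteStatsCollector()
//	fs, closer := WithDiskHealthChecks(Default, 100*time.Millisecond, collector,
//		func(DiskSlowInfo) {})
//	defer closer.Close()
//	// ... write files through fs ...
//	for _, agg := range collector.GetStats() {
//		fmt.Printf("%s: %d bytes written\n", agg.Category, agg.BytesWritten)
//	}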
148 :
149 : // diskHealthCheckingFile is a File wrapper to collect disk write stats,
150 : // detect slow disk operations, and call onSlowDisk if a disk operation
151 : // is seen to exceed diskSlowThreshold.
152 : //
153 : // This struct creates a goroutine (in startTicker()) that, at every tick
154 : // interval, sees if there's a disk operation taking longer than the specified
155 : // duration. This setup is preferable to creating a new timer at every disk
156 : // operation, as it reduces overhead per disk operation.
157 : type diskHealthCheckingFile struct {
158 : file File
159 : onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration)
160 : diskSlowThreshold time.Duration
161 : tickInterval time.Duration
162 :
163 : stopper chan struct{}
164 : // lastWritePacked is a 64-bit unsigned int. The most significant
165 : // 40 bits represent a delta (in milliseconds) from the creation
166 : // time of the diskHealthCheckingFile. The next most significant 20 bits
167 : // represent the size of the write in KBs, if the write has a size. (If
168 : // it doesn't, the 20 bits are zeroed). The least significant four bits
169 : // contain the OpType.
170 : //
171 : // The use of 40 bits for a delta provides ~34 years of effective
172 : // monitoring time before the uint wraps around, at millisecond precision.
173 : // ~34 years of process uptime "ought to be enough for anybody". Millisecond
174 : // precision is sufficient, given that we are monitoring for writes that take
175 : // longer than one millisecond.
176 : //
177 : // The use of 20 bits for the size in KBs allows representing sizes up
178 : // to nearly one GB. If the write is larger than that, we round down to ~one GB.
179 : //
180 : // The use of four bits for OpType allows for 16 operation types.
181 : //
182 : // NB: this packing scheme is not persisted, and is therefore safe to adjust
183 : // across process boundaries.
184 : lastWritePacked atomic.Uint64
185 : createTimeNanos int64
186 :
187 : // aggBytesWritten points to an atomic that aggregates the bytes written
188 : // for files that belong to a specific DiskWriteCategory. This pointer is also stored in the
189 : // DiskWriteStatsCollector for metric collection.
190 : aggBytesWritten *atomic.Uint64
191 : }
192 :
193 : // diskHealthCheckingFile implements File.
194 : var _ File = (*diskHealthCheckingFile)(nil)
195 :
196 : // newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the
197 : // specified time threshold and event listener.
198 : func newDiskHealthCheckingFile(
199 : file File,
200 : diskSlowThreshold time.Duration,
201 : category DiskWriteCategory,
202 : statsCollector *DiskWriteStatsCollector,
203 : onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration),
204 1 : ) *diskHealthCheckingFile {
205 1 : var bytesWritten *atomic.Uint64
206 1 : if statsCollector != nil {
207 1 : statsCollector.mu.Lock()
208 1 : if aggStats, ok := statsCollector.statsMap[category]; !ok {
209 1 : bytesWritten = new(atomic.Uint64)
210 1 : statsCollector.statsMap[category] = bytesWritten
211 1 : } else {
212 0 : bytesWritten = aggStats
213 0 : }
214 1 : statsCollector.mu.Unlock()
215 1 : } else {
216 1 : bytesWritten = new(atomic.Uint64)
217 1 : }
218 1 : return &diskHealthCheckingFile{
219 1 : file: file,
220 1 : onSlowDisk: onSlowDisk,
221 1 : diskSlowThreshold: diskSlowThreshold,
222 1 : tickInterval: defaultTickInterval,
223 1 :
224 1 : stopper: make(chan struct{}),
225 1 : createTimeNanos: time.Now().UnixNano(),
226 1 :
227 1 : aggBytesWritten: bytesWritten,
228 1 : }
229 : }
230 :
231 : // startTicker starts a new goroutine with a ticker to monitor disk operations.
232 : // Can only be called if the ticker goroutine isn't running already.
233 1 : func (d *diskHealthCheckingFile) startTicker() {
234 1 : if d.diskSlowThreshold == 0 {
235 0 : return
236 0 : }
237 :
238 1 : go func() {
239 1 : ticker := time.NewTicker(d.tickInterval)
240 1 : defer ticker.Stop()
241 1 :
242 1 : for {
243 1 : select {
244 1 : case <-d.stopper:
245 1 : return
246 :
247 1 : case <-ticker.C:
248 1 : packed := d.lastWritePacked.Load()
249 1 : if packed == 0 {
250 1 : continue
251 : }
252 1 : delta, writeSize, op := unpack(packed)
253 1 : lastWrite := time.Unix(0, d.createTimeNanos+delta.Nanoseconds())
254 1 : now := time.Now()
255 1 : if lastWrite.Add(d.diskSlowThreshold).Before(now) {
256 1 : // diskSlowThreshold was exceeded. Call the passed-in
257 1 : // listener.
258 1 : d.onSlowDisk(op, writeSize, now.Sub(lastWrite))
259 1 : }
260 : }
261 : }
262 : }()
263 : }
264 :
265 : // stopTicker stops the goroutine started in startTicker.
266 1 : func (d *diskHealthCheckingFile) stopTicker() {
267 1 : close(d.stopper)
268 1 : }
269 :
270 : // Fd implements (vfs.File).Fd.
271 1 : func (d *diskHealthCheckingFile) Fd() uintptr {
272 1 : return d.file.Fd()
273 1 : }
274 :
275 : // Read implements (vfs.File).Read
276 0 : func (d *diskHealthCheckingFile) Read(p []byte) (int, error) {
277 0 : return d.file.Read(p)
278 0 : }
279 :
280 : // ReadAt implements (vfs.File).ReadAt
281 0 : func (d *diskHealthCheckingFile) ReadAt(p []byte, off int64) (int, error) {
282 0 : return d.file.ReadAt(p, off)
283 0 : }
284 :
285 : // Write implements the io.Writer interface.
286 1 : func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) {
287 1 : d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
288 1 : n, err = d.file.Write(p)
289 1 : }, time.Now().UnixNano())
290 1 : d.aggBytesWritten.Add(uint64(n))
291 1 : return n, err
292 : }
293 :
294 : // WriteAt implements the io.WriterAt interface.
295 0 : func (d *diskHealthCheckingFile) WriteAt(p []byte, ofs int64) (n int, err error) {
296 0 : d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
297 0 : n, err = d.file.WriteAt(p, ofs)
298 0 : }, time.Now().UnixNano())
299 0 : d.aggBytesWritten.Add(uint64(n))
300 0 : return n, err
301 : }
302 :
303 : // Close implements the io.Closer interface.
304 1 : func (d *diskHealthCheckingFile) Close() error {
305 1 : d.stopTicker()
306 1 : return d.file.Close()
307 1 : }
308 :
309 : // Prefetch implements (vfs.File).Prefetch.
310 0 : func (d *diskHealthCheckingFile) Prefetch(offset, length int64) error {
311 0 : return d.file.Prefetch(offset, length)
312 0 : }
313 :
314 : // Preallocate implements (vfs.File).Preallocate.
315 1 : func (d *diskHealthCheckingFile) Preallocate(off, n int64) (err error) {
316 1 : d.timeDiskOp(OpTypePreallocate, n, func() {
317 1 : err = d.file.Preallocate(off, n)
318 1 : }, time.Now().UnixNano())
319 1 : return err
320 : }
321 :
322 : // Stat implements (vfs.File).Stat.
323 1 : func (d *diskHealthCheckingFile) Stat() (os.FileInfo, error) {
324 1 : return d.file.Stat()
325 1 : }
326 :
327 : // Sync implements the io.Syncer interface.
328 1 : func (d *diskHealthCheckingFile) Sync() (err error) {
329 1 : d.timeDiskOp(OpTypeSync, 0, func() {
330 1 : err = d.file.Sync()
331 1 : }, time.Now().UnixNano())
332 1 : return err
333 : }
334 :
335 : // SyncData implements (vfs.File).SyncData.
336 1 : func (d *diskHealthCheckingFile) SyncData() (err error) {
337 1 : d.timeDiskOp(OpTypeSyncData, 0, func() {
338 1 : err = d.file.SyncData()
339 1 : }, time.Now().UnixNano())
340 1 : return err
341 : }
342 :
343 : // SyncTo implements (vfs.File).SyncTo.
344 0 : func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) {
345 0 : d.timeDiskOp(OpTypeSyncTo, length, func() {
346 0 : fullSync, err = d.file.SyncTo(length)
347 0 : }, time.Now().UnixNano())
348 0 : return fullSync, err
349 : }
350 :
351 : // timeDiskOp runs the specified closure and makes its timing visible to the
352 : // monitoring goroutine, in case it exceeds one of the slow disk durations.
353 : // opType should always be set. writeSizeInBytes should be set if the write
354 : // operation is sized. If not, it should be set to zero.
355 : //
356 : // The start time is taken as a parameter in the form of nanoseconds since the
357 : // unix epoch so that it appears in stack traces during crashes (if GOTRACEBACK
358 : // is set appropriately), aiding postmortem debugging.
359 : func (d *diskHealthCheckingFile) timeDiskOp(
360 : opType OpType, writeSizeInBytes int64, op func(), startNanos int64,
361 1 : ) {
362 1 : if d == nil || d.diskSlowThreshold == 0 {
363 1 : op()
364 1 : return
365 1 : }
366 :
367 1 : delta := time.Duration(startNanos - d.createTimeNanos)
368 1 : packed := pack(delta, writeSizeInBytes, opType)
369 1 : if d.lastWritePacked.Swap(packed) != 0 {
370 0 : panic("concurrent write operations detected on file")
371 : }
372 1 : defer func() {
373 1 : if d.lastWritePacked.Swap(0) != packed {
374 0 : panic("concurrent write operations detected on file")
375 : }
376 : }()
377 1 : op()
378 : }
379 :
380 : // Note the slight lack of symmetry between pack & unpack. pack takes an int64 for writeSizeInBytes, since
381 : // callers of pack use an int64. This is dictated by the vfs interface. unpack OTOH returns an int. This is
382 : // safe because the packing scheme implies we only actually need 32 bits.
383 1 : func pack(delta time.Duration, writeSizeInBytes int64, opType OpType) uint64 {
384 1 : // We have no guarantee of clock monotonicity. If we have a small regression
385 1 : // in the clock, we set deltaMillis to zero, so we can still catch the operation
386 1 : // if it happens to be slow.
387 1 : deltaMillis := delta.Milliseconds()
388 1 : if deltaMillis < 0 {
389 0 : deltaMillis = 0
390 0 : }
391 : // As of 3/7/2023, the use of 40 bits for a delta provides ~34 years
392 : // of effective monitoring time before the uint wraps around, at millisecond
393 : // precision.
394 1 : if deltaMillis > 1<<deltaBits-1 {
395 1 : panic("vfs: last write delta would result in integer wraparound")
396 : }
397 :
398 : // See writeSizePrecision to get the unit of writeSize. As of 1/26/2023, the unit is KBs.
399 1 : writeSize := writeSizeInBytes / writeSizePrecision
400 1 : // If the size of the write is larger than we can store in the packed int, store the max
401 1 : // value we can store in the packed int.
402 1 : const writeSizeCeiling = 1<<writeSizeBits - 1
403 1 : if writeSize > writeSizeCeiling {
404 1 : writeSize = writeSizeCeiling
405 1 : }
406 :
407 1 : return uint64(deltaMillis)<<(64-deltaBits) | uint64(writeSize)<<(64-deltaBits-writeSizeBits) | uint64(opType)
408 : }
409 :
410 1 : func unpack(packed uint64) (delta time.Duration, writeSizeInBytes int, opType OpType) {
411 1 : delta = time.Duration(packed>>(64-deltaBits)) * time.Millisecond
412 1 : wz := int64(packed>>(64-deltaBits-writeSizeBits)) & ((1 << writeSizeBits) - 1) * writeSizePrecision
413 1 : // Given the packing scheme, converting wz to an int will not truncate anything.
414 1 : writeSizeInBytes = int(wz)
415 1 : opType = OpType(packed & 0xf)
416 1 : return delta, writeSizeInBytes, opType
417 1 : }
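// Illustrative round trip (not part of the upstream source; pack and unpack are
// unexported, so this would only compile inside package vfs, e.g. in a test): a
// write that started 1.5s after file creation and wrote 64 KiB packs into a single
// uint64 and unpacks at millisecond / writeSizePrecision granularity.
//
//	p := pack(1500*time.Millisecond, 64<<10, OpTypeWrite)
//	delta, size, op := unpack(p)
//	// delta == 1500*time.Millisecond (millisecond precision)
//	// size  == 64<<10 (rounded down to a multiple of writeSizePrecision)
//	// op    == OpTypeWrite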
418 :
419 : // diskHealthCheckingDir implements disk-health checking for directories. Unlike
420 : // other files, we allow directories to receive concurrent write operations
421 : // (Syncs are the only write operations supported by a directory.) Since the
422 : // diskHealthCheckingFile's timeDiskOp can only track a single in-flight
423 : // operation at a time, we time the operation using the filesystem-level
424 : // timeFilesystemOp function instead.
425 : type diskHealthCheckingDir struct {
426 : File
427 : name string
428 : fs *diskHealthCheckingFS
429 : }
430 :
431 : // Sync implements the io.Syncer interface.
432 1 : func (d *diskHealthCheckingDir) Sync() (err error) {
433 1 : d.fs.timeFilesystemOp(d.name, OpTypeSync, func() {
434 1 : err = d.File.Sync()
435 1 : }, time.Now().UnixNano())
436 1 : return err
437 : }
438 :
439 : // DiskSlowInfo captures info about detected slow operations on the vfs.
440 : type DiskSlowInfo struct {
441 : // Path of file being written to.
442 : Path string
443 : // Operation being performed on the file.
444 : OpType OpType
445 : // Size of write in bytes, if the write is sized.
446 : WriteSize int
447 : // Duration that has elapsed since this disk operation started.
448 : Duration time.Duration
449 : }
450 :
451 0 : func (i DiskSlowInfo) String() string {
452 0 : return redact.StringWithoutMarkers(i)
453 0 : }
454 :
455 : // SafeFormat implements redact.SafeFormatter.
456 0 : func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
457 0 : switch i.OpType {
458 : // Operations for which i.WriteSize is meaningful.
459 0 : case OpTypeWrite, OpTypeSyncTo, OpTypePreallocate:
460 0 : w.Printf("disk slowness detected: %s on file %s (%d bytes) has been ongoing for %0.1fs",
461 0 : redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
462 0 : redact.Safe(i.WriteSize), redact.Safe(i.Duration.Seconds()))
463 0 : default:
464 0 : w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs",
465 0 : redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
466 0 : redact.Safe(i.Duration.Seconds()))
467 : }
468 : }
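// For illustration (not part of the upstream source), a sized operation renders with
// hypothetical values as:
//
//	disk slowness detected: write on file 000123.sst (4096 bytes) has been ongoing for 30.0s
//
// while an unsized operation (e.g. a sync) omits the byte count.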
469 :
470 : // diskHealthCheckingFS adds disk-health checking facilities to a VFS.
471 : // It times disk write operations in two ways:
472 : //
473 : // 1. Wrapping vfs.Files.
474 : //
475 : // The bulk of write I/O activity is file writing and syncing, invoked through
476 : // the `vfs.File` interface. This VFS wraps all files open for writing with a
477 : // special diskHealthCheckingFile implementation of the vfs.File interface. See
478 : // above for the implementation.
479 : //
480 : // 2. Monitoring filesystem metadata operations.
481 : //
482 : // Filesystem metadata operations (create, link, remove, rename, etc) are also
483 : // sources of disk writes. Unlike a vfs.File which requires Write and Sync calls
484 : // to be sequential, a vfs.FS may receive these filesystem metadata operations
485 : // in parallel. To accommodate this parallelism, the diskHealthCheckingFS's
486 : // write-oriented filesystem operations record their start times into a 'slot'
487 : // on the filesystem. A single long-running goroutine periodically scans the
488 : // slots looking for slow operations.
489 : //
490 : // The number of slots on a diskHealthCheckingFS grows to a working set of the
491 : // maximum concurrent filesystem operations. This is expected to be very few
492 : // for these reasons:
493 : // 1. Pebble has limited write concurrency. Flushes, compactions and WAL
494 : // rotations are the primary sources of filesystem metadata operations. With
495 : // the default max-compaction concurrency, these operations require at most 5
496 : // concurrent slots if all 5 perform a filesystem metadata operation
497 : // simultaneously.
498 : // 2. Pebble's limited concurrent I/O writers spend most of their time
499 : // performing file I/O, not performing the filesystem metadata operations that
500 : // require recording a slot on the diskHealthCheckingFS.
501 : // 3. In CockroachDB, each additional store/Pebble instance has its own vfs.FS
502 : // which provides a separate goroutine and set of slots.
503 : // 4. In CockroachDB, many of the additional sources of filesystem metadata
504 : // operations (like encryption-at-rest) are sequential with respect to Pebble's
505 : // threads.
506 : type diskHealthCheckingFS struct {
507 : tickInterval time.Duration
508 : diskSlowThreshold time.Duration
509 : statsCollector *DiskWriteStatsCollector
510 : onSlowDisk func(DiskSlowInfo)
511 : fs FS
512 : mu struct {
513 : sync.Mutex
514 : tickerRunning bool
515 : stopper chan struct{}
516 : inflight []*slot
517 : }
518 : // prealloc preallocates the memory for mu.inflight slots and the slice
519 : // itself. The contained fields are not accessed directly except by
520 : // WithDiskHealthChecks when initializing mu.inflight. The number of slots
521 : // in d.mu.inflight will grow to the maximum number of concurrent file
522 : // metadata operations (create, remove, link, etc). If the number of
523 : // concurrent operations never exceeds preallocatedSlotCount, we'll never
524 : // incur an additional allocation.
525 : prealloc struct {
526 : slots [preallocatedSlotCount]slot
527 : slotPtrSlice [preallocatedSlotCount]*slot
528 : }
529 : }
530 :
531 : type slot struct {
532 : name string
533 : opType OpType
534 : startNanos atomic.Int64
535 : }
536 :
537 : // diskHealthCheckingFS implements FS.
538 : var _ FS = (*diskHealthCheckingFS)(nil)
539 :
540 : // WithDiskHealthChecks wraps an FS and ensures that all write-oriented
541 : // operations on the FS are wrapped with disk health detection checks and
542 : // aggregated. Disk operations that are observed to take longer than
543 : // diskSlowThreshold trigger an onSlowDisk call.
544 : //
545 : // A threshold of zero disables disk-health checking.
546 : func WithDiskHealthChecks(
547 : innerFS FS,
548 : diskSlowThreshold time.Duration,
549 : statsCollector *DiskWriteStatsCollector,
550 : onSlowDisk func(info DiskSlowInfo),
551 1 : ) (FS, io.Closer) {
552 1 : if diskSlowThreshold == 0 {
553 0 : return innerFS, noopCloser{}
554 0 : }
555 :
556 1 : fs := &diskHealthCheckingFS{
557 1 : fs: innerFS,
558 1 : tickInterval: defaultTickInterval,
559 1 : diskSlowThreshold: diskSlowThreshold,
560 1 : statsCollector: statsCollector,
561 1 : onSlowDisk: onSlowDisk,
562 1 : }
563 1 : fs.mu.stopper = make(chan struct{})
564 1 : // The fs holds preallocated slots and a preallocated array of slot pointers
565 1 : // with equal length. Initialize the inflight slice to use a slice backed by
566 1 : // the preallocated array with each slot initialized to a preallocated slot.
567 1 : fs.mu.inflight = fs.prealloc.slotPtrSlice[:]
568 1 : for i := range fs.mu.inflight {
569 1 : fs.mu.inflight[i] = &fs.prealloc.slots[i]
570 1 : }
571 1 : return fs, fs
572 : }
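// Illustrative usage sketch (not part of the upstream source): wrapping an FS so that
// a write-oriented operation stalled beyond the threshold surfaces through the
// callback; the returned io.Closer stops the monitoring goroutine. The 30s threshold
// and the log call are assumptions made for the example.
//
//	fs, closer := WithDiskHealthChecks(Default, 30*time.Second, nil, /* statsCollector */
//		func(info DiskSlowInfo) {
//			log.Printf("%s", info)
//		})
//	defer closer.Close()
//	f, _ := fs.Create("example")
//	defer f.Close()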
573 :
574 : // timeFilesystemOp executes the provided closure, which should perform a
575 : // singular filesystem operation of a type matching opType on the named file. It
576 : // records the provided start time such that the long-lived disk-health checking
577 : // goroutine can observe if the operation is blocked for an inordinate time.
578 : //
579 : // The start time is taken as a parameter in the form of nanoseconds since the
580 : // unix epoch so that it appears in stack traces during crashes (if GOTRACEBACK
581 : // is set appropriately), aiding postmortem debugging.
582 : func (d *diskHealthCheckingFS) timeFilesystemOp(
583 : name string, opType OpType, op func(), startNanos int64,
584 1 : ) {
585 1 : if d == nil {
586 0 : op()
587 0 : return
588 0 : }
589 :
590 : // Record this operation's start time on the FS, so that the long-running
591 : // goroutine can monitor the filesystem operation.
592 : //
593 : // The diskHealthCheckingFile implementation uses a single field that is
594 : // atomically updated, taking advantage of the fact that writes to a single
595 : // vfs.File handle are not performed in parallel. The vfs.FS however may
596 : // receive write filesystem operations in parallel. To accommodate this
597 : // parallelism, writing goroutines append their start time to a
598 : // mutex-protected vector. On ticks, the long-running goroutine scans the
599 : // vector searching for start times older than the slow-disk threshold. When
600 : // a writing goroutine completes its operation, it atomically overwrites its
601 : // slot to signal completion.
602 1 : var s *slot
603 1 : func() {
604 1 : d.mu.Lock()
605 1 : defer d.mu.Unlock()
606 1 :
607 1 : // If there's no long-running goroutine to monitor this filesystem
608 1 : // operation, start one.
609 1 : if !d.mu.tickerRunning {
610 1 : d.startTickerLocked()
611 1 : }
612 :
613 1 : for i := 0; i < len(d.mu.inflight); i++ {
614 1 : if d.mu.inflight[i].startNanos.Load() == 0 {
615 1 : // This slot is not in use. Claim it.
616 1 : s = d.mu.inflight[i]
617 1 : s.name = name
618 1 : s.opType = opType
619 1 : s.startNanos.Store(startNanos)
620 1 : break
621 : }
622 : }
623 : // If we didn't find any unused slots, create a new slot and append it.
624 : // This slot will exist forever. The number of slots will grow to the
625 : // maximum number of concurrent filesystem operations over the lifetime
626 : // of the process. Only operations that grow the number of slots must
627 : // incur an allocation.
628 1 : if s == nil {
629 0 : s = &slot{
630 0 : name: name,
631 0 : opType: opType,
632 0 : }
633 0 : s.startNanos.Store(startNanos)
634 0 : d.mu.inflight = append(d.mu.inflight, s)
635 0 : }
636 : }()
637 :
638 1 : op()
639 1 :
640 1 : // Signal completion by zeroing the start time.
641 1 : s.startNanos.Store(0)
642 : }
643 :
644 : // startTickerLocked starts a new goroutine with a ticker to monitor disk
645 : // filesystem operations. Requires d.mu and !d.mu.tickerRunning.
646 1 : func (d *diskHealthCheckingFS) startTickerLocked() {
647 1 : d.mu.tickerRunning = true
648 1 : stopper := d.mu.stopper
649 1 : go func() {
650 1 : ticker := time.NewTicker(d.tickInterval)
651 1 : defer ticker.Stop()
652 1 : type exceededSlot struct {
653 1 : name string
654 1 : opType OpType
655 1 : startNanos int64
656 1 : }
657 1 : var exceededSlots []exceededSlot
658 1 :
659 1 : for {
660 1 : select {
661 1 : case <-ticker.C:
662 1 : // Scan the inflight slots for any slots recording a start
663 1 : // time older than the diskSlowThreshold.
664 1 : exceededSlots = exceededSlots[:0]
665 1 : d.mu.Lock()
666 1 : now := time.Now()
667 1 : for i := range d.mu.inflight {
668 1 : nanos := d.mu.inflight[i].startNanos.Load()
669 1 : if nanos != 0 && time.Unix(0, nanos).Add(d.diskSlowThreshold).Before(now) {
670 1 : // diskSlowThreshold was exceeded. Copy this inflightOp into
671 1 : // exceededSlots and call d.onSlowDisk after dropping the mutex.
672 1 : inflightOp := exceededSlot{
673 1 : name: d.mu.inflight[i].name,
674 1 : opType: d.mu.inflight[i].opType,
675 1 : startNanos: nanos,
676 1 : }
677 1 : exceededSlots = append(exceededSlots, inflightOp)
678 1 : }
679 : }
680 1 : d.mu.Unlock()
681 1 : for i := range exceededSlots {
682 1 : d.onSlowDisk(
683 1 : DiskSlowInfo{
684 1 : Path: exceededSlots[i].name,
685 1 : OpType: exceededSlots[i].opType,
686 1 : WriteSize: 0, // writes at the fs level are not sized
687 1 : Duration: now.Sub(time.Unix(0, exceededSlots[i].startNanos)),
688 1 : })
689 1 : }
690 1 : case <-stopper:
691 1 : return
692 : }
693 : }
694 : }()
695 : }
696 :
697 : // Close implements io.Closer. Close stops the long-running goroutine that
698 : // monitors for slow filesystem metadata operations. Close may be called
699 : // multiple times. If the filesystem is used after Close has been called, a new
700 : // long-running goroutine will be created.
701 1 : func (d *diskHealthCheckingFS) Close() error {
702 1 : d.mu.Lock()
703 1 : if !d.mu.tickerRunning {
704 1 : // Nothing to stop.
705 1 : d.mu.Unlock()
706 1 : return nil
707 1 : }
708 :
709 : // Grab the stopper so we can request the long-running goroutine to stop.
710 : // Replace the stopper in case this FS is reused. It's possible to Close and
711 : // reuse a disk-health checking FS. This is to accommodate the on-by-default
712 : // behavior in Pebble, and the possibility that users may continue to use
713 : // the Pebble default FS beyond the lifetime of a single DB.
714 1 : stopper := d.mu.stopper
715 1 : d.mu.stopper = make(chan struct{})
716 1 : d.mu.tickerRunning = false
717 1 : d.mu.Unlock()
718 1 :
719 1 : // Ask the long-running goroutine to stop. This is a synchronous channel
720 1 : // send.
721 1 : stopper <- struct{}{}
722 1 : close(stopper)
723 1 : return nil
724 : }
725 :
726 : // Create implements the FS interface.
727 1 : func (d *diskHealthCheckingFS) Create(name string) (File, error) {
728 1 : var f File
729 1 : var err error
730 1 : d.timeFilesystemOp(name, OpTypeCreate, func() {
731 1 : f, err = d.fs.Create(name)
732 1 : }, time.Now().UnixNano())
733 1 : if err != nil {
734 1 : return f, err
735 1 : }
736 1 : if d.diskSlowThreshold == 0 {
737 0 : return f, nil
738 0 : }
739 : // TODO(cheranm): add plumbing to pass down valid category.
740 1 : checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, WriteCategoryUnspecified, d.statsCollector,
741 1 : func(opType OpType, writeSizeInBytes int, duration time.Duration) {
742 1 : d.onSlowDisk(
743 1 : DiskSlowInfo{
744 1 : Path: name,
745 1 : OpType: opType,
746 1 : WriteSize: writeSizeInBytes,
747 1 : Duration: duration,
748 1 : })
749 1 : })
750 1 : checkingFile.startTicker()
751 1 : return checkingFile, nil
752 : }
753 :
754 : // GetDiskUsage implements the FS interface.
755 1 : func (d *diskHealthCheckingFS) GetDiskUsage(path string) (DiskUsage, error) {
756 1 : return d.fs.GetDiskUsage(path)
757 1 : }
758 :
759 : // Link implements the FS interface.
760 1 : func (d *diskHealthCheckingFS) Link(oldname, newname string) error {
761 1 : var err error
762 1 : d.timeFilesystemOp(newname, OpTypeLink, func() {
763 1 : err = d.fs.Link(oldname, newname)
764 1 : }, time.Now().UnixNano())
765 1 : return err
766 : }
767 :
768 : // List implements the FS interface.
769 1 : func (d *diskHealthCheckingFS) List(dir string) ([]string, error) {
770 1 : return d.fs.List(dir)
771 1 : }
772 :
773 : // Lock implements the FS interface.
774 1 : func (d *diskHealthCheckingFS) Lock(name string) (io.Closer, error) {
775 1 : return d.fs.Lock(name)
776 1 : }
777 :
778 : // MkdirAll implements the FS interface.
779 1 : func (d *diskHealthCheckingFS) MkdirAll(dir string, perm os.FileMode) error {
780 1 : var err error
781 1 : d.timeFilesystemOp(dir, OpTypeMkdirAll, func() {
782 1 : err = d.fs.MkdirAll(dir, perm)
783 1 : }, time.Now().UnixNano())
784 1 : return err
785 : }
786 :
787 : // Open implements the FS interface.
788 1 : func (d *diskHealthCheckingFS) Open(name string, opts ...OpenOption) (File, error) {
789 1 : return d.fs.Open(name, opts...)
790 1 : }
791 :
792 : // OpenReadWrite implements the FS interface.
793 0 : func (d *diskHealthCheckingFS) OpenReadWrite(name string, opts ...OpenOption) (File, error) {
794 0 : f, err := d.fs.OpenReadWrite(name, opts...)
795 0 : if err != nil {
796 0 : return nil, err
797 0 : }
798 : // TODO(cheranm): add plumbing to pass down valid category.
799 0 : return newDiskHealthCheckingFile(f, 0, WriteCategoryUnspecified, d.statsCollector, func(opType OpType, writeSizeInBytes int, duration time.Duration) {}), nil
800 : }
801 :
802 : // OpenDir implements the FS interface.
803 1 : func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) {
804 1 : f, err := d.fs.OpenDir(name)
805 1 : if err != nil {
806 0 : return f, err
807 0 : }
808 : // Directories opened with OpenDir must be opened with health checking,
809 : // because they may be explicitly synced.
810 1 : return &diskHealthCheckingDir{
811 1 : File: f,
812 1 : name: name,
813 1 : fs: d,
814 1 : }, nil
815 : }
816 :
817 : // PathBase implements the FS interface.
818 1 : func (d *diskHealthCheckingFS) PathBase(path string) string {
819 1 : return d.fs.PathBase(path)
820 1 : }
821 :
822 : // PathJoin implements the FS interface.
823 1 : func (d *diskHealthCheckingFS) PathJoin(elem ...string) string {
824 1 : return d.fs.PathJoin(elem...)
825 1 : }
826 :
827 : // PathDir implements the FS interface.
828 1 : func (d *diskHealthCheckingFS) PathDir(path string) string {
829 1 : return d.fs.PathDir(path)
830 1 : }
831 :
832 : // Remove implements the FS interface.
833 1 : func (d *diskHealthCheckingFS) Remove(name string) error {
834 1 : var err error
835 1 : d.timeFilesystemOp(name, OpTypeRemove, func() {
836 1 : err = d.fs.Remove(name)
837 1 : }, time.Now().UnixNano())
838 1 : return err
839 : }
840 :
841 : // RemoveAll implements the FS interface.
842 1 : func (d *diskHealthCheckingFS) RemoveAll(name string) error {
843 1 : var err error
844 1 : d.timeFilesystemOp(name, OpTypeRemoveAll, func() {
845 1 : err = d.fs.RemoveAll(name)
846 1 : }, time.Now().UnixNano())
847 1 : return err
848 : }
849 :
850 : // Rename implements the FS interface.
851 1 : func (d *diskHealthCheckingFS) Rename(oldname, newname string) error {
852 1 : var err error
853 1 : d.timeFilesystemOp(newname, OpTypeRename, func() {
854 1 : err = d.fs.Rename(oldname, newname)
855 1 : }, time.Now().UnixNano())
856 1 : return err
857 : }
858 :
859 : // ReuseForWrite implements the FS interface.
860 1 : func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, error) {
861 1 : var f File
862 1 : var err error
863 1 : d.timeFilesystemOp(newname, OpTypeReuseForWrite, func() {
864 1 : f, err = d.fs.ReuseForWrite(oldname, newname)
865 1 : }, time.Now().UnixNano())
866 1 : if err != nil {
867 1 : return f, err
868 1 : }
869 1 : if d.diskSlowThreshold == 0 {
870 0 : return f, nil
871 0 : }
872 : // TODO(cheranm): add plumbing to pass down valid category.
873 1 : checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, WriteCategoryUnspecified, d.statsCollector,
874 1 : func(opType OpType, writeSizeInBytes int, duration time.Duration) {
875 0 : d.onSlowDisk(
876 0 : DiskSlowInfo{
877 0 : Path: newname,
878 0 : OpType: opType,
879 0 : WriteSize: writeSizeInBytes,
880 0 : Duration: duration,
881 0 : })
882 0 : })
883 1 : checkingFile.startTicker()
884 1 : return checkingFile, nil
885 : }
886 :
887 : // Stat implements the FS interface.
888 1 : func (d *diskHealthCheckingFS) Stat(name string) (os.FileInfo, error) {
889 1 : return d.fs.Stat(name)
890 1 : }
891 :
892 : type noopCloser struct{}
893 :
894 0 : func (noopCloser) Close() error { return nil }