Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package vfs
6 :
7 : import (
8 : "cmp"
9 : "fmt"
10 : "io"
11 : "os"
12 : "path/filepath"
13 : "slices"
14 : "sync"
15 : "sync/atomic"
16 : "time"
17 :
18 : "github.com/cockroachdb/crlib/crtime"
19 : "github.com/cockroachdb/redact"
20 : )
21 :
22 : const (
23 : // preallocatedSlotCount is the default number of slots available for
24 : // concurrent filesystem operations. The slot count may be exceeded, but
25 : // each additional slot will incur an additional allocation. We choose 16
26 : // here with the expectation that it is significantly more than required in
27 : // practice. See the comment above the diskHealthCheckingFS type definition.
28 : preallocatedSlotCount = 16
29 : // deltaBits is the number of bits in the packed 64-bit integer used for
30 : // identifying a delta from the file creation time in milliseconds.
31 : deltaBits = 40
32 : // writeSizeBits is the number of bits in the packed 64-bit integer used for
33 : // identifying the size of the write operation, if the operation is sized. See
34 : // writeSizePrecision below for precision of size.
35 : writeSizeBits = 20
36 : // Track size of writes at kilobyte precision. See comment above lastWritePacked for more.
37 : writeSizePrecision = 1024
38 : )
39 :
40 : // Variables to enable testing.
41 : var (
42 : // defaultTickInterval is the default interval between two ticks of each
43 : // diskHealthCheckingFile loop iteration.
44 : defaultTickInterval = 2 * time.Second
45 : )
46 :
47 : // OpType is the type of IO operation being monitored by a
48 : // diskHealthCheckingFile.
49 : type OpType uint8
50 :
51 : // The following OpTypes are limited to the subset of filesystem operations that
52 : // disk-health checking monitors (namely file writes, syncs, and write-oriented filesystem metadata operations).
53 : const (
54 : OpTypeUnknown OpType = iota
55 : OpTypeWrite
56 : OpTypeSync
57 : OpTypeSyncData
58 : OpTypeSyncTo
59 : OpTypeCreate
60 : OpTypeLink
61 : OpTypeMkdirAll
62 : OpTypePreallocate
63 : OpTypeRemove
64 : OpTypeRemoveAll
65 : OpTypeRename
66 : OpTypeReuseForWrite
67 : // Note: opTypeMax is just used in tests. It must appear last in the list
68 : // of OpTypes.
69 : opTypeMax
70 : )
71 :
72 : // String implements fmt.Stringer.
73 1 : func (o OpType) String() string {
74 1 : switch o {
75 1 : case OpTypeWrite:
76 1 : return "write"
77 1 : case OpTypeSync:
78 1 : return "sync"
79 1 : case OpTypeSyncData:
80 1 : return "syncdata"
81 1 : case OpTypeSyncTo:
82 1 : return "syncto"
83 1 : case OpTypeCreate:
84 1 : return "create"
85 1 : case OpTypeLink:
86 1 : return "link"
87 1 : case OpTypeMkdirAll:
88 1 : return "mkdirall"
89 1 : case OpTypePreallocate:
90 1 : return "preallocate"
91 1 : case OpTypeRemove:
92 1 : return "remove"
93 1 : case OpTypeRemoveAll:
94 1 : return "removall"
95 1 : case OpTypeRename:
96 1 : return "rename"
97 1 : case OpTypeReuseForWrite:
98 1 : return "reuseforwrite"
99 1 : case OpTypeUnknown:
100 1 : return "unknown"
101 0 : default:
102 0 : panic(fmt.Sprintf("vfs: unknown op type: %d", o))
103 : }
104 : }
105 :
106 : // DiskWriteCategory is a user-understandable string used to identify and aggregate
107 : // stats for disk writes. The prefix "pebble-" is reserved for internal Pebble categories.
108 : //
109 : // Examples include pebble-wal, pebble-memtable-flush, and pebble-manifest; in the
110 : // CockroachDB context, examples include sql-row-spill, range-snapshot, and crdb-log.
111 : type DiskWriteCategory string
112 :
113 : // WriteCategoryUnspecified denotes a disk write without a significant category.
114 : const WriteCategoryUnspecified DiskWriteCategory = "unspecified"
115 :
116 : // DiskWriteStatsAggregate is an aggregate of the bytes written to disk for a given category.
117 : type DiskWriteStatsAggregate struct {
118 : Category DiskWriteCategory
119 : BytesWritten uint64
120 : }
121 :
122 : // DiskWriteStatsCollector collects and aggregates disk write metrics per category.
123 : type DiskWriteStatsCollector struct {
124 : mu sync.Mutex
125 : statsMap map[DiskWriteCategory]*atomic.Uint64
126 : }
127 :
128 : // NewDiskWriteStatsCollector instantiates a new DiskWriteStatsCollector.
129 1 : func NewDiskWriteStatsCollector() *DiskWriteStatsCollector {
130 1 : return &DiskWriteStatsCollector{
131 1 : statsMap: make(map[DiskWriteCategory]*atomic.Uint64),
132 1 : }
133 1 : }
134 :
135 : // CreateStat inserts a new category into the statsMap if it doesn't already exist,
136 : // and returns a pointer to the category's aggregated byte count.
137 1 : func (d *DiskWriteStatsCollector) CreateStat(category DiskWriteCategory) *atomic.Uint64 {
138 1 : var bytesWritten *atomic.Uint64
139 1 : d.mu.Lock()
140 1 : defer d.mu.Unlock()
141 1 : if aggStats, ok := d.statsMap[category]; !ok {
142 1 : bytesWritten = new(atomic.Uint64)
143 1 : d.statsMap[category] = bytesWritten
144 1 : } else {
145 0 : bytesWritten = aggStats
146 0 : }
147 1 : return bytesWritten
148 : }
149 :
150 : // GetStats returns the aggregated metrics for all categories.
151 1 : func (d *DiskWriteStatsCollector) GetStats() []DiskWriteStatsAggregate {
152 1 : var stats []DiskWriteStatsAggregate
153 1 : d.mu.Lock()
154 1 : for category, numBytes := range d.statsMap {
155 1 : stats = append(stats, DiskWriteStatsAggregate{
156 1 : Category: category,
157 1 : BytesWritten: numBytes.Load(),
158 1 : })
159 1 : }
160 1 : d.mu.Unlock()
161 1 : slices.SortFunc(stats, func(a, b DiskWriteStatsAggregate) int { return cmp.Compare(a.Category, b.Category) })
162 1 : return stats
163 : }
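// exampleDiskWriteStats is a minimal sketch (category name and values are
// illustrative) of how the collector above is used: CreateStat registers a
// category and returns its counter, writers add bytes to that counter, and
// GetStats returns a snapshot sorted by category.
func exampleDiskWriteStats() {
	collector := NewDiskWriteStatsCollector()
	spillBytes := collector.CreateStat("sql-row-spill")
	// In this package the counter is bumped by diskHealthCheckingFile.Write;
	// here we add to it directly for illustration.
	spillBytes.Add(4096)
	for _, agg := range collector.GetStats() {
		fmt.Printf("%s: %d bytes written\n", agg.Category, agg.BytesWritten)
	}
}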
164 :
165 : // diskHealthCheckingFile is a File wrapper to collect disk write stats,
166 : // detect slow disk operations, and call onSlowDisk if a disk operation
167 : // is seen to exceed diskSlowThreshold.
168 : //
169 : // This struct creates a goroutine (in startTicker()) that, at every tick
170 : // interval, sees if there's a disk operation taking longer than the specified
171 : // duration. This setup is preferable to creating a new timer at every disk
172 : // operation, as it reduces overhead per disk operation.
173 : type diskHealthCheckingFile struct {
174 : file File
175 : onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration)
176 : diskSlowThreshold time.Duration
177 : tickInterval time.Duration
178 :
179 : stopper chan struct{}
180 : // lastWritePacked is a 64-bit unsigned int. The most significant
181 : // 40 bits represent a delta (in milliseconds) from the creation
182 : // time of the diskHealthCheckingFile. The next most significant 20 bits
183 : // represent the size of the write in KBs, if the write has a size. (If
184 : // it doesn't, the 20 bits are zeroed). The least significant four bits
185 : // contain the OpType.
186 : //
187 : // The use of 40 bits for the delta provides ~34 years of effective
188 : // monitoring time before the uint wraps around, at millisecond precision.
189 : // ~34 years of process uptime "ought to be enough for anybody". Millisecond
190 : // precision is sufficient, given that we are monitoring for writes that take
191 : // longer than one millisecond.
192 : //
193 : // The use of 20 bits for the size in KBs allows representing sizes up
194 : // to nearly one GB. If the write is larger than that, we round down to ~one GB.
195 : //
196 : // The use of four bits for OpType allows for 16 operation types.
197 : //
198 : // NB: this packing scheme is not persisted, and is therefore safe to adjust
199 : // across process boundaries.
200 : lastWritePacked atomic.Uint64
201 : createTime crtime.Mono
202 :
203 : // aggBytesWritten points to an atomic that aggregates the bytes written for files
204 : // that belong to a specific DiskWriteCategory. This pointer is also stored in the
205 : // DiskWriteStatsCollector for metric collection.
206 : aggBytesWritten *atomic.Uint64
207 : category DiskWriteCategory
208 : }
209 :
210 : // diskHealthCheckingFile implements File.
211 : var _ File = (*diskHealthCheckingFile)(nil)
212 :
213 : // newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the
214 : // specified time threshold and event listener.
215 : func newDiskHealthCheckingFile(
216 : file File,
217 : diskSlowThreshold time.Duration,
218 : category DiskWriteCategory,
219 : statsCollector *DiskWriteStatsCollector,
220 : onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration),
221 1 : ) *diskHealthCheckingFile {
222 1 : var bytesWritten *atomic.Uint64
223 1 : if statsCollector != nil {
224 1 : bytesWritten = statsCollector.CreateStat(category)
225 1 : } else {
226 1 : bytesWritten = new(atomic.Uint64)
227 1 : }
228 1 : return &diskHealthCheckingFile{
229 1 : file: file,
230 1 : onSlowDisk: onSlowDisk,
231 1 : diskSlowThreshold: diskSlowThreshold,
232 1 : tickInterval: defaultTickInterval,
233 1 :
234 1 : stopper: make(chan struct{}),
235 1 : createTime: crtime.NowMono(),
236 1 :
237 1 : aggBytesWritten: bytesWritten,
238 1 : category: category,
239 1 : }
240 : }
241 :
242 : // startTicker starts a new goroutine with a ticker to monitor disk operations.
243 : // Can only be called if the ticker goroutine isn't running already.
244 1 : func (d *diskHealthCheckingFile) startTicker() {
245 1 : if d.diskSlowThreshold == 0 {
246 0 : return
247 0 : }
248 :
249 1 : go func() {
250 1 : ticker := time.NewTicker(d.tickInterval)
251 1 : defer ticker.Stop()
252 1 :
253 1 : for {
254 1 : select {
255 1 : case <-d.stopper:
256 1 : return
257 :
258 1 : case <-ticker.C:
259 1 : packed := d.lastWritePacked.Load()
260 1 : if packed == 0 {
261 1 : continue
262 : }
263 1 : delta, writeSize, op := unpack(packed)
264 1 : now := crtime.NowMono()
265 1 : // The last write started at d.createTime + delta and ended now.
266 1 : lastWriteDuration := now.Sub(d.createTime) - delta
267 1 : if lastWriteDuration > d.diskSlowThreshold {
268 1 : // diskSlowThreshold was exceeded. Call the passed-in
269 1 : // listener.
270 1 : d.onSlowDisk(op, writeSize, lastWriteDuration)
271 1 : }
272 : }
273 : }
274 : }()
275 : }
276 :
277 : // stopTicker stops the goroutine started in startTicker.
278 1 : func (d *diskHealthCheckingFile) stopTicker() {
279 1 : close(d.stopper)
280 1 : }
281 :
282 : // Fd implements (vfs.File).Fd.
283 1 : func (d *diskHealthCheckingFile) Fd() uintptr {
284 1 : return d.file.Fd()
285 1 : }
286 :
287 : // Read implements (vfs.File).Read
288 0 : func (d *diskHealthCheckingFile) Read(p []byte) (int, error) {
289 0 : return d.file.Read(p)
290 0 : }
291 :
292 : // ReadAt implements (vfs.File).ReadAt
293 0 : func (d *diskHealthCheckingFile) ReadAt(p []byte, off int64) (int, error) {
294 0 : return d.file.ReadAt(p, off)
295 0 : }
296 :
297 : // Write implements the io.Writer interface.
298 1 : func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) {
299 1 : d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
300 1 : n, err = d.file.Write(p)
301 1 : }, crtime.NowMono())
302 1 : d.aggBytesWritten.Add(uint64(n))
303 1 : return n, err
304 : }
305 :
306 : // WriteAt implements the io.WriterAt interface.
307 0 : func (d *diskHealthCheckingFile) WriteAt(p []byte, ofs int64) (n int, err error) {
308 0 : d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
309 0 : n, err = d.file.WriteAt(p, ofs)
310 0 : }, crtime.NowMono())
311 0 : d.aggBytesWritten.Add(uint64(n))
312 0 : return n, err
313 : }
314 :
315 : // Close implements the io.Closer interface.
316 1 : func (d *diskHealthCheckingFile) Close() error {
317 1 : d.stopTicker()
318 1 : return d.file.Close()
319 1 : }
320 :
321 : // Prefetch implements (vfs.File).Prefetch.
322 0 : func (d *diskHealthCheckingFile) Prefetch(offset, length int64) error {
323 0 : return d.file.Prefetch(offset, length)
324 0 : }
325 :
326 : // Preallocate implements (vfs.File).Preallocate.
327 1 : func (d *diskHealthCheckingFile) Preallocate(off, n int64) (err error) {
328 1 : d.timeDiskOp(OpTypePreallocate, n, func() {
329 1 : err = d.file.Preallocate(off, n)
330 1 : }, crtime.NowMono())
331 1 : return err
332 : }
333 :
334 : // Stat implements (vfs.File).Stat.
335 1 : func (d *diskHealthCheckingFile) Stat() (FileInfo, error) {
336 1 : return d.file.Stat()
337 1 : }
338 :
339 : // Sync implements the io.Syncer interface.
340 1 : func (d *diskHealthCheckingFile) Sync() (err error) {
341 1 : d.timeDiskOp(OpTypeSync, 0, func() {
342 1 : err = d.file.Sync()
343 1 : }, crtime.NowMono())
344 1 : return err
345 : }
346 :
347 : // SyncData implements (vfs.File).SyncData.
348 1 : func (d *diskHealthCheckingFile) SyncData() (err error) {
349 1 : d.timeDiskOp(OpTypeSyncData, 0, func() {
350 1 : err = d.file.SyncData()
351 1 : }, crtime.NowMono())
352 1 : return err
353 : }
354 :
355 : // SyncTo implements (vfs.File).SyncTo.
356 0 : func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) {
357 0 : d.timeDiskOp(OpTypeSyncTo, length, func() {
358 0 : fullSync, err = d.file.SyncTo(length)
359 0 : }, crtime.NowMono())
360 0 : return fullSync, err
361 : }
362 :
363 : // timeDiskOp runs the specified closure and makes its timing visible to the
364 : // monitoring goroutine, in case it exceeds one of the slow disk durations.
365 : // opType should always be set. writeSizeInBytes should be set if the write
366 : // operation is sized. If not, it should be set to zero.
367 : //
368 : // The start time is taken as a parameter (a monotonic crtime.Mono timestamp)
369 : // so that it appears in stack traces during crashes (if GOTRACEBACK is set
370 : // appropriately), aiding postmortem debugging.
371 : func (d *diskHealthCheckingFile) timeDiskOp(
372 : opType OpType, writeSizeInBytes int64, op func(), start crtime.Mono,
373 1 : ) {
374 1 : if d == nil || d.diskSlowThreshold == 0 {
375 0 : op()
376 0 : return
377 0 : }
378 :
379 1 : delta := start.Sub(d.createTime)
380 1 : packed := pack(delta, writeSizeInBytes, opType)
381 1 : if d.lastWritePacked.Swap(packed) != 0 {
382 0 : panic("concurrent write operations detected on file")
383 : }
384 1 : defer func() {
385 1 : if d.lastWritePacked.Swap(0) != packed {
386 0 : panic("concurrent write operations detected on file")
387 : }
388 : }()
389 1 : op()
390 : }
391 :
392 : // Note the slight lack of symmetry between pack & unpack. pack takes an int64 for writeSizeInBytes, since
393 : // callers of pack use an int64. This is dictated by the vfs interface. unpack OTOH returns an int. This is
394 : // safe because the packing scheme implies we only actually need 32 bits.
395 1 : func pack(delta time.Duration, writeSizeInBytes int64, opType OpType) uint64 {
396 1 : // We have no guarantee of clock monotonicity. If we have a small regression
397 1 : // in the clock, we set deltaMillis to zero, so we can still catch the operation
398 1 : // if it happens to be slow.
399 1 : deltaMillis := delta.Milliseconds()
400 1 : if deltaMillis < 0 {
401 0 : deltaMillis = 0
402 0 : }
403 : // As of 3/7/2023, the use of 40 bits for a delta provides ~34 years
404 : // of effective monitoring time before the uint wraps around, at millisecond
405 : // precision.
406 1 : if deltaMillis > 1<<deltaBits-1 {
407 1 : panic("vfs: last write delta would result in integer wraparound")
408 : }
409 :
410 : // See writeSizePrecision to get the unit of writeSize. As of 1/26/2023, the unit is KBs.
411 1 : writeSize := writeSizeInBytes / writeSizePrecision
412 1 : // If the size of the write is larger than we can store in the packed int, store the max
413 1 : // value we can store in the packed int.
414 1 : const writeSizeCeiling = 1<<writeSizeBits - 1
415 1 : if writeSize > writeSizeCeiling {
416 1 : writeSize = writeSizeCeiling
417 1 : }
418 :
419 1 : return uint64(deltaMillis)<<(64-deltaBits) | uint64(writeSize)<<(64-deltaBits-writeSizeBits) | uint64(opType)
420 : }
421 :
422 1 : func unpack(packed uint64) (delta time.Duration, writeSizeInBytes int, opType OpType) {
423 1 : delta = time.Duration(packed>>(64-deltaBits)) * time.Millisecond
424 1 : wz := int64(packed>>(64-deltaBits-writeSizeBits)) & ((1 << writeSizeBits) - 1) * writeSizePrecision
425 1 : // Given the packing scheme, converting wz to an int will not truncate anything.
426 1 : writeSizeInBytes = int(wz)
427 1 : opType = OpType(packed & 0xf)
428 1 : return delta, writeSizeInBytes, opType
429 1 : }
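// examplePackRoundTrip is a minimal sketch of the packing scheme above, using
// illustrative values: the delta is kept at millisecond precision, the write
// size is rounded down to a multiple of writeSizePrecision (1 KB), and the
// OpType occupies the low four bits.
func examplePackRoundTrip() {
	packed := pack(1500*time.Millisecond, 4100, OpTypeWrite)
	delta, writeSizeInBytes, opType := unpack(packed)
	// Prints "1.5s 4096 write": 4100 bytes rounds down to 4 KB = 4096 bytes.
	fmt.Println(delta, writeSizeInBytes, opType)
}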
430 :
431 : // diskHealthCheckingDir implements disk-health checking for directories. Unlike
432 : // other files, we allow directories to receive concurrent write operations
433 : // (syncs are the only write operations supported by a directory). Since the
434 : // diskHealthCheckingFile's timeDiskOp can only track a single in-flight
435 : // operation at a time, we time the operation using the filesystem-level
436 : // timeFilesystemOp function instead.
437 : type diskHealthCheckingDir struct {
438 : File
439 : name string
440 : fs *diskHealthCheckingFS
441 : }
442 :
443 : // Sync implements the io.Syncer interface.
444 1 : func (d *diskHealthCheckingDir) Sync() (err error) {
445 1 : d.fs.timeFilesystemOp(d.name, OpTypeSync, func() {
446 1 : err = d.File.Sync()
447 1 : }, crtime.NowMono())
448 1 : return err
449 : }
450 :
451 : // DiskSlowInfo captures info about detected slow operations on the vfs.
452 : type DiskSlowInfo struct {
453 : // Path of file being written to.
454 : Path string
455 : // Operation being performed on the file.
456 : OpType OpType
457 : // Size of write in bytes, if the write is sized.
458 : WriteSize int
459 : // Duration that has elapsed since this disk operation started.
460 : Duration time.Duration
461 : }
462 :
463 0 : func (i DiskSlowInfo) String() string {
464 0 : return redact.StringWithoutMarkers(i)
465 0 : }
466 :
467 : // SafeFormat implements redact.SafeFormatter.
468 0 : func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
469 0 : switch i.OpType {
470 : // Operations for which i.WriteSize is meaningful.
471 0 : case OpTypeWrite, OpTypeSyncTo, OpTypePreallocate:
472 0 : w.Printf("disk slowness detected: %s on file %s (%d bytes) has been ongoing for %0.1fs",
473 0 : redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
474 0 : redact.Safe(i.WriteSize), redact.Safe(i.Duration.Seconds()))
475 0 : default:
476 0 : w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs",
477 0 : redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)),
478 0 : redact.Safe(i.Duration.Seconds()))
479 : }
480 : }
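// exampleDiskSlowInfo is a minimal sketch of the message produced by
// SafeFormat for a sized operation; the path and values are illustrative.
func exampleDiskSlowInfo() {
	info := DiskSlowInfo{
		Path:      "/data/000123.sst",
		OpType:    OpTypeWrite,
		WriteSize: 4096,
		Duration:  3 * time.Second,
	}
	// Prints: disk slowness detected: write on file 000123.sst (4096 bytes)
	// has been ongoing for 3.0s
	fmt.Println(info)
}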
481 :
482 : // diskHealthCheckingFS adds disk-health checking facilities to a VFS.
483 : // It times disk write operations in two ways:
484 : //
485 : // 1. Wrapping vfs.Files.
486 : //
487 : // The bulk of write I/O activity is file writing and syncing, invoked through
488 : // the `vfs.File` interface. This VFS wraps all files open for writing with a
489 : // special diskHealthCheckingFile implementation of the vfs.File interface. See
490 : // above for the implementation.
491 : //
492 : // 2. Monitoring filesystem metadata operations.
493 : //
494 : // Filesystem metadata operations (create, link, remove, rename, etc) are also
495 : // sources of disk writes. Unlike a vfs.File which requires Write and Sync calls
496 : // to be sequential, a vfs.FS may receive these filesystem metadata operations
497 : // in parallel. To accommodate this parallelism, the diskHealthCheckingFS's
498 : // write-oriented filesystem operations record their start times into a 'slot'
499 : // on the filesystem. A single long-running goroutine periodically scans the
500 : // slots looking for slow operations.
501 : //
502 : // The number of slots on a diskHealthCheckingFS grows to a working set of the
503 : // maximum concurrent filesystem operations. This is expected to be very few
504 : // for these reasons:
505 : // 1. Pebble has limited write concurrency. Flushes, compactions and WAL
506 : // rotations are the primary sources of filesystem metadata operations. With
507 : // the default max-compaction concurrency, these operations require at most 5
508 : // concurrent slots if all 5 perform a filesystem metadata operation
509 : // simultaneously.
510 : // 2. Pebble's limited concurrent I/O writers spend most of their time
511 : // performing file I/O, not performing the filesystem metadata operations that
512 : // require recording a slot on the diskHealthCheckingFS.
513 : // 3. In CockroachDB, each additional store/Pebble instance has its own vfs.FS
514 : // which provides a separate goroutine and set of slots.
515 : // 4. In CockroachDB, many of the additional sources of filesystem metadata
516 : // operations (like encryption-at-rest) are sequential with respect to Pebble's
517 : // threads.
518 : type diskHealthCheckingFS struct {
519 : tickInterval time.Duration
520 : diskSlowThreshold time.Duration
521 : statsCollector *DiskWriteStatsCollector
522 : onSlowDisk func(DiskSlowInfo)
523 : fs FS
524 : mu struct {
525 : sync.Mutex
526 : tickerRunning bool
527 : stopper chan struct{}
528 : inflight []*slot
529 : }
530 : // prealloc preallocates the memory for mu.inflight slots and the slice
531 : // itself. The contained fields are not accessed directly except by
532 : // WithDiskHealthChecks when initializing mu.inflight. The number of slots
533 : // in d.mu.inflight will grow to the maximum number of concurrent file
534 : // metadata operations (create, remove, link, etc). If the number of
535 : // concurrent operations never exceeds preallocatedSlotCount, we'll never
536 : // incur an additional allocation.
537 : prealloc struct {
538 : slots [preallocatedSlotCount]slot
539 : slotPtrSlice [preallocatedSlotCount]*slot
540 : }
541 : }
542 :
543 : type slot struct {
544 : name string
545 : opType OpType
546 : startTime crtime.AtomicMono
547 : }
548 :
549 : // diskHealthCheckingFS implements FS.
550 : var _ FS = (*diskHealthCheckingFS)(nil)
551 :
552 : // WithDiskHealthChecks wraps an FS and ensures that all write-oriented
553 : // operations on the FS are wrapped with disk health detection checks and
554 : // aggregated. Disk operations that are observed to take longer than
555 : // diskSlowThreshold trigger an onSlowDisk call.
556 : //
557 : // A threshold of zero disables disk-health checking.
558 : func WithDiskHealthChecks(
559 : innerFS FS,
560 : diskSlowThreshold time.Duration,
561 : statsCollector *DiskWriteStatsCollector,
562 : onSlowDisk func(info DiskSlowInfo),
563 1 : ) (FS, io.Closer) {
564 1 : if diskSlowThreshold == 0 {
565 0 : return innerFS, noopCloser{}
566 0 : }
567 :
568 1 : fs := &diskHealthCheckingFS{
569 1 : fs: innerFS,
570 1 : tickInterval: defaultTickInterval,
571 1 : diskSlowThreshold: diskSlowThreshold,
572 1 : statsCollector: statsCollector,
573 1 : onSlowDisk: onSlowDisk,
574 1 : }
575 1 : fs.mu.stopper = make(chan struct{})
576 1 : // The fs holds preallocated slots and a preallocated array of slot pointers
577 1 : // with equal length. Initialize the inflight slice to use a slice backed by
578 1 : // the preallocated array with each slot initialized to a preallocated slot.
579 1 : fs.mu.inflight = fs.prealloc.slotPtrSlice[:]
580 1 : for i := range fs.mu.inflight {
581 1 : fs.mu.inflight[i] = &fs.prealloc.slots[i]
582 1 : }
583 1 : return fs, fs
584 : }
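// exampleWithDiskHealthChecks is a minimal sketch of wiring up the wrapper
// (the threshold and callback body are illustrative): every write-oriented
// operation on the returned FS that stalls for longer than the threshold
// triggers the callback, and per-category write bytes are aggregated in the
// collector. The returned Closer stops the monitoring goroutine.
func exampleWithDiskHealthChecks(inner FS) (FS, io.Closer, *DiskWriteStatsCollector) {
	collector := NewDiskWriteStatsCollector()
	fs, closer := WithDiskHealthChecks(inner, 100*time.Millisecond, collector,
		func(info DiskSlowInfo) {
			fmt.Printf("%s\n", info)
		})
	return fs, closer, collector
}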
585 :
586 : // Unwrap is part of the FS interface.
587 1 : func (d *diskHealthCheckingFS) Unwrap() FS {
588 1 : return d.fs
589 1 : }
590 :
591 : // timeFilesystemOp executes the provided closure, which should perform a
592 : // singular filesystem operation of a type matching opType on the named file. It
593 : // records the provided start time such that the long-lived disk-health checking
594 : // goroutine can observe if the operation is blocked for an inordinate time.
595 : //
596 : // The start time is taken as a parameter (a monotonic crtime.Mono timestamp)
597 : // so that it appears in stack traces during crashes (if GOTRACEBACK is set
598 : // appropriately), aiding postmortem debugging.
599 : func (d *diskHealthCheckingFS) timeFilesystemOp(
600 : name string, opType OpType, op func(), start crtime.Mono,
601 1 : ) {
602 1 : if d == nil {
603 0 : op()
604 0 : return
605 0 : }
606 :
607 : // Record this operation's start time on the FS, so that the long-running
608 : // goroutine can monitor the filesystem operation.
609 : //
610 : // The diskHealthCheckingFile implementation uses a single field that is
611 : // atomically updated, taking advantage of the fact that writes to a single
612 : // vfs.File handle are not performed in parallel. The vfs.FS however may
613 : // receive write filesystem operations in parallel. To accommodate this
614 : // parallelism, writing goroutines append their start time to a
615 : // mutex-protected vector. On ticks, the long-running goroutine scans the
616 : // vector searching for start times older than the slow-disk threshold. When
617 : // a writing goroutine completes its operation, it atomically overwrites its
618 : // slot to signal completion.
619 1 : var s *slot
620 1 : func() {
621 1 : d.mu.Lock()
622 1 : defer d.mu.Unlock()
623 1 :
624 1 : // If there's no long-running goroutine to monitor this filesystem
625 1 : // operation, start one.
626 1 : if !d.mu.tickerRunning {
627 1 : d.startTickerLocked()
628 1 : }
629 :
630 1 : for i := 0; i < len(d.mu.inflight); i++ {
631 1 : if d.mu.inflight[i].startTime.Load() == 0 {
632 1 : // This slot is not in use. Claim it.
633 1 : s = d.mu.inflight[i]
634 1 : s.name = name
635 1 : s.opType = opType
636 1 : s.startTime.Store(start)
637 1 : break
638 : }
639 : }
640 : // If we didn't find any unused slots, create a new slot and append it.
641 : // This slot will exist forever. The number of slots will grow to the
642 : // maximum number of concurrent filesystem operations over the lifetime
643 : // of the process. Only operations that grow the number of slots must
644 : // incur an allocation.
645 1 : if s == nil {
646 0 : s = &slot{
647 0 : name: name,
648 0 : opType: opType,
649 0 : }
650 0 : s.startTime.Store(start)
651 0 : d.mu.inflight = append(d.mu.inflight, s)
652 0 : }
653 : }()
654 :
655 1 : op()
656 1 :
657 1 : // Signal completion by zeroing the start time.
658 1 : s.startTime.Store(0)
659 : }
660 :
661 : // startTickerLocked starts a new goroutine with a ticker to monitor disk
662 : // filesystem operations. Requires d.mu and !d.mu.tickerRunning.
663 1 : func (d *diskHealthCheckingFS) startTickerLocked() {
664 1 : d.mu.tickerRunning = true
665 1 : stopper := d.mu.stopper
666 1 : go func() {
667 1 : ticker := time.NewTicker(d.tickInterval)
668 1 : defer ticker.Stop()
669 1 : type exceededSlot struct {
670 1 : name string
671 1 : opType OpType
672 1 : startTime crtime.Mono
673 1 : }
674 1 : var exceededSlots []exceededSlot
675 1 :
676 1 : for {
677 1 : select {
678 1 : case <-ticker.C:
679 1 : // Scan the inflight slots for any slots recording a start
680 1 : // time older than the diskSlowThreshold.
681 1 : exceededSlots = exceededSlots[:0]
682 1 : d.mu.Lock()
683 1 : now := crtime.NowMono()
684 1 : for i := range d.mu.inflight {
685 1 : startTime := d.mu.inflight[i].startTime.Load()
686 1 : if startTime != 0 && now.Sub(startTime) > d.diskSlowThreshold {
687 1 : // diskSlowThreshold was exceeded. Copy this inflightOp into
688 1 : // exceededSlots and call d.onSlowDisk after dropping the mutex.
689 1 : inflightOp := exceededSlot{
690 1 : name: d.mu.inflight[i].name,
691 1 : opType: d.mu.inflight[i].opType,
692 1 : startTime: startTime,
693 1 : }
694 1 : exceededSlots = append(exceededSlots, inflightOp)
695 1 : }
696 : }
697 1 : d.mu.Unlock()
698 1 : for i := range exceededSlots {
699 1 : d.onSlowDisk(
700 1 : DiskSlowInfo{
701 1 : Path: exceededSlots[i].name,
702 1 : OpType: exceededSlots[i].opType,
703 1 : WriteSize: 0, // writes at the fs level are not sized
704 1 : Duration: now.Sub(exceededSlots[i].startTime),
705 1 : })
706 1 : }
707 1 : case <-stopper:
708 1 : return
709 : }
710 : }
711 : }()
712 : }
713 :
714 : // Close implements io.Closer. Close stops the long-running goroutine that
715 : // monitors for slow filesystem metadata operations. Close may be called
716 : // multiple times. If the filesystem is used after Close has been called, a new
717 : // long-running goroutine will be created.
718 1 : func (d *diskHealthCheckingFS) Close() error {
719 1 : d.mu.Lock()
720 1 : if !d.mu.tickerRunning {
721 1 : // Nothing to stop.
722 1 : d.mu.Unlock()
723 1 : return nil
724 1 : }
725 :
726 : // Grab the stopper so we can request the long-running goroutine to stop.
727 : // Replace the stopper in case this FS is reused. It's possible to Close and
728 : // reuse a disk-health checking FS. This is to accommodate the on-by-default
729 : // behavior in Pebble, and the possibility that users may continue to use
730 : // the Pebble default FS beyond the lifetime of a single DB.
731 1 : stopper := d.mu.stopper
732 1 : d.mu.stopper = make(chan struct{})
733 1 : d.mu.tickerRunning = false
734 1 : d.mu.Unlock()
735 1 :
736 1 : // Ask the long-running goroutine to stop. This is a synchronous channel
737 1 : // send.
738 1 : stopper <- struct{}{}
739 1 : close(stopper)
740 1 : return nil
741 : }
742 :
743 : // Create implements the FS interface.
744 1 : func (d *diskHealthCheckingFS) Create(name string, category DiskWriteCategory) (File, error) {
745 1 : var f File
746 1 : var err error
747 1 : d.timeFilesystemOp(name, OpTypeCreate, func() {
748 1 : f, err = d.fs.Create(name, category)
749 1 : }, crtime.NowMono())
750 1 : if err != nil {
751 1 : return f, err
752 1 : }
753 1 : if d.diskSlowThreshold == 0 {
754 0 : return f, nil
755 0 : }
756 1 : checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, category, d.statsCollector,
757 1 : func(opType OpType, writeSizeInBytes int, duration time.Duration) {
758 1 : d.onSlowDisk(
759 1 : DiskSlowInfo{
760 1 : Path: name,
761 1 : OpType: opType,
762 1 : WriteSize: writeSizeInBytes,
763 1 : Duration: duration,
764 1 : })
765 1 : })
766 1 : checkingFile.startTicker()
767 1 : return checkingFile, nil
768 : }
769 :
770 : // GetDiskUsage implements the FS interface.
771 1 : func (d *diskHealthCheckingFS) GetDiskUsage(path string) (DiskUsage, error) {
772 1 : return d.fs.GetDiskUsage(path)
773 1 : }
774 :
775 : // Link implements the FS interface.
776 1 : func (d *diskHealthCheckingFS) Link(oldname, newname string) error {
777 1 : var err error
778 1 : d.timeFilesystemOp(newname, OpTypeLink, func() {
779 1 : err = d.fs.Link(oldname, newname)
780 1 : }, crtime.NowMono())
781 1 : return err
782 : }
783 :
784 : // List implements the FS interface.
785 1 : func (d *diskHealthCheckingFS) List(dir string) ([]string, error) {
786 1 : return d.fs.List(dir)
787 1 : }
788 :
789 : // Lock implements the FS interface.
790 1 : func (d *diskHealthCheckingFS) Lock(name string) (io.Closer, error) {
791 1 : return d.fs.Lock(name)
792 1 : }
793 :
794 : // MkdirAll implements the FS interface.
795 1 : func (d *diskHealthCheckingFS) MkdirAll(dir string, perm os.FileMode) error {
796 1 : var err error
797 1 : d.timeFilesystemOp(dir, OpTypeMkdirAll, func() {
798 1 : err = d.fs.MkdirAll(dir, perm)
799 1 : }, crtime.NowMono())
800 1 : return err
801 : }
802 :
803 : // Open implements the FS interface.
804 1 : func (d *diskHealthCheckingFS) Open(name string, opts ...OpenOption) (File, error) {
805 1 : return d.fs.Open(name, opts...)
806 1 : }
807 :
808 : // OpenReadWrite implements the FS interface.
809 : func (d *diskHealthCheckingFS) OpenReadWrite(
810 : name string, category DiskWriteCategory, opts ...OpenOption,
811 0 : ) (File, error) {
812 0 : f, err := d.fs.OpenReadWrite(name, category, opts...)
813 0 : if err != nil {
814 0 : return nil, err
815 0 : }
816 0 : return newDiskHealthCheckingFile(f, 0, category, d.statsCollector, func(opType OpType, writeSizeInBytes int, duration time.Duration) {}), nil
817 : }
818 :
819 : // OpenDir implements the FS interface.
820 1 : func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) {
821 1 : f, err := d.fs.OpenDir(name)
822 1 : if err != nil {
823 0 : return f, err
824 0 : }
825 : // Directories opened with OpenDir must be opened with health checking,
826 : // because they may be explicitly synced.
827 1 : return &diskHealthCheckingDir{
828 1 : File: f,
829 1 : name: name,
830 1 : fs: d,
831 1 : }, nil
832 : }
833 :
834 : // PathBase implements the FS interface.
835 1 : func (d *diskHealthCheckingFS) PathBase(path string) string {
836 1 : return d.fs.PathBase(path)
837 1 : }
838 :
839 : // PathJoin implements the FS interface.
840 1 : func (d *diskHealthCheckingFS) PathJoin(elem ...string) string {
841 1 : return d.fs.PathJoin(elem...)
842 1 : }
843 :
844 : // PathDir implements the FS interface.
845 1 : func (d *diskHealthCheckingFS) PathDir(path string) string {
846 1 : return d.fs.PathDir(path)
847 1 : }
848 :
849 : // Remove implements the FS interface.
850 1 : func (d *diskHealthCheckingFS) Remove(name string) error {
851 1 : var err error
852 1 : d.timeFilesystemOp(name, OpTypeRemove, func() {
853 1 : err = d.fs.Remove(name)
854 1 : }, crtime.NowMono())
855 1 : return err
856 : }
857 :
858 : // RemoveAll implements the FS interface.
859 1 : func (d *diskHealthCheckingFS) RemoveAll(name string) error {
860 1 : var err error
861 1 : d.timeFilesystemOp(name, OpTypeRemoveAll, func() {
862 1 : err = d.fs.RemoveAll(name)
863 1 : }, crtime.NowMono())
864 1 : return err
865 : }
866 :
867 : // Rename implements the FS interface.
868 1 : func (d *diskHealthCheckingFS) Rename(oldname, newname string) error {
869 1 : var err error
870 1 : d.timeFilesystemOp(newname, OpTypeRename, func() {
871 1 : err = d.fs.Rename(oldname, newname)
872 1 : }, crtime.NowMono())
873 1 : return err
874 : }
875 :
876 : // ReuseForWrite implements the FS interface.
877 : func (d *diskHealthCheckingFS) ReuseForWrite(
878 : oldname, newname string, category DiskWriteCategory,
879 1 : ) (File, error) {
880 1 : var f File
881 1 : var err error
882 1 : d.timeFilesystemOp(newname, OpTypeReuseForWrite, func() {
883 1 : f, err = d.fs.ReuseForWrite(oldname, newname, category)
884 1 : }, crtime.NowMono())
885 1 : if err != nil {
886 1 : return f, err
887 1 : }
888 1 : if d.diskSlowThreshold == 0 {
889 0 : return f, nil
890 0 : }
891 1 : checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, category, d.statsCollector,
892 1 : func(opType OpType, writeSizeInBytes int, duration time.Duration) {
893 0 : d.onSlowDisk(
894 0 : DiskSlowInfo{
895 0 : Path: newname,
896 0 : OpType: opType,
897 0 : WriteSize: writeSizeInBytes,
898 0 : Duration: duration,
899 0 : })
900 0 : })
901 1 : checkingFile.startTicker()
902 1 : return checkingFile, nil
903 : }
904 :
905 : // Stat implements the FS interface.
906 1 : func (d *diskHealthCheckingFS) Stat(name string) (FileInfo, error) {
907 1 : return d.fs.Stat(name)
908 1 : }
909 :
910 : type noopCloser struct{}
911 :
912 0 : func (noopCloser) Close() error { return nil }