Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "strings"
10 : "sync"
11 : "time"
12 :
13 : "github.com/cockroachdb/crlib/crtime"
14 : "github.com/cockroachdb/errors"
15 : errorsjoin "github.com/cockroachdb/errors/join"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/humanize"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/internal/manifest"
20 : "github.com/cockroachdb/pebble/objstorage"
21 : "github.com/cockroachdb/pebble/objstorage/remote"
22 : "github.com/cockroachdb/pebble/vfs"
23 : "github.com/cockroachdb/redact"
24 : )
25 :
// TableNum is an identifier for a table within a database. It is a type alias
// for base.TableNum, so the two names are interchangeable.
type TableNum = base.TableNum
28 :
// TableInfo exports the manifest.TableInfo type. It is a type alias, so values
// of manifest.TableInfo can be used wherever a TableInfo is expected.
type TableInfo = manifest.TableInfo
31 :
32 1 : func tablesTotalSize(tables []TableInfo) uint64 {
33 1 : var size uint64
34 1 : for i := range tables {
35 1 : size += tables[i].Size
36 1 : }
37 1 : return size
38 : }
39 :
40 1 : func formatFileNums(tables []TableInfo) string {
41 1 : var buf strings.Builder
42 1 : for i := range tables {
43 1 : if i > 0 {
44 1 : buf.WriteString(" ")
45 1 : }
46 1 : buf.WriteString(tables[i].FileNum.String())
47 : }
48 1 : return buf.String()
49 : }
50 :
51 : // DataCorruptionInfo contains the information for a DataCorruption event.
52 : type DataCorruptionInfo struct {
53 : // Path of the file that is corrupted. For remote files the path starts with
54 : // "remote://".
55 : Path string
56 : IsRemote bool
57 : // Locator is only set when IsRemote is true (note that an empty Locator is
58 : // valid even then).
59 : Locator remote.Locator
60 : // Bounds indicates the keyspace range that is affected.
61 : Bounds base.UserKeyBounds
62 : // Details of the error. See cockroachdb/error for how to format with or
63 : // without redaction.
64 : Details error
65 : }
66 :
67 1 : func (i DataCorruptionInfo) String() string {
68 1 : return redact.StringWithoutMarkers(i)
69 1 : }
70 :
71 : // SafeFormat implements redact.SafeFormatter.
72 1 : func (i DataCorruptionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
73 1 : w.Printf("on-disk corruption: %s", redact.Safe(i.Path))
74 1 : if i.IsRemote {
75 0 : w.Printf(" (remote locator %q)", redact.Safe(i.Locator))
76 0 : }
77 1 : w.Printf("; bounds: %s; details: %+v", i.Bounds.String(), i.Details)
78 : }
79 :
80 : // LevelInfo contains info pertaining to a particular level.
81 : type LevelInfo struct {
82 : Level int
83 : Tables []TableInfo
84 : Score float64
85 : }
86 :
87 0 : func (i LevelInfo) String() string {
88 0 : return redact.StringWithoutMarkers(i)
89 0 : }
90 :
91 : // SafeFormat implements redact.SafeFormatter.
92 1 : func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
93 1 : w.Printf("L%d [%s] (%s) Score=%.2f",
94 1 : redact.Safe(i.Level),
95 1 : redact.Safe(formatFileNums(i.Tables)),
96 1 : redact.Safe(humanize.Bytes.Uint64(tablesTotalSize(i.Tables))),
97 1 : redact.Safe(i.Score))
98 1 : }
99 :
100 : // BlobFileCreateInfo contains the info for a blob file creation event.
101 : type BlobFileCreateInfo struct {
102 : JobID int
103 : // Reason is the reason for the table creation: "compacting", "flushing", or
104 : // "ingesting".
105 : Reason string
106 : Path string
107 : FileNum base.DiskFileNum
108 : }
109 :
110 1 : func (i BlobFileCreateInfo) String() string {
111 1 : return redact.StringWithoutMarkers(i)
112 1 : }
113 :
114 : // SafeFormat implements redact.SafeFormatter.
115 1 : func (i BlobFileCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
116 1 : w.Printf("[JOB %d] %s: blob file created %s",
117 1 : redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
118 1 : }
119 :
120 : // BlobFileDeleteInfo contains the info for a blob file deletion event.
121 : type BlobFileDeleteInfo struct {
122 : JobID int
123 : Path string
124 : FileNum base.DiskFileNum
125 : Err error
126 : }
127 :
128 1 : func (i BlobFileDeleteInfo) String() string {
129 1 : return redact.StringWithoutMarkers(i)
130 1 : }
131 :
132 : // SafeFormat implements redact.SafeFormatter.
133 1 : func (i BlobFileDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
134 1 : if i.Err != nil {
135 0 : w.Printf("[JOB %d] blob file delete error %s: %s",
136 0 : redact.Safe(i.JobID), i.FileNum, i.Err)
137 0 : return
138 0 : }
139 1 : w.Printf("[JOB %d] blob file deleted %s", redact.Safe(i.JobID), i.FileNum)
140 : }
141 :
142 : // CompactionInfo contains the info for a compaction event.
143 : type CompactionInfo struct {
144 : // JobID is the ID of the compaction job.
145 : JobID int
146 : // Reason is the reason for the compaction.
147 : Reason string
148 : // Input contains the input tables for the compaction organized by level.
149 : Input []LevelInfo
150 : // Output contains the output tables generated by the compaction. The output
151 : // tables are empty for the compaction begin event.
152 : Output LevelInfo
153 : // Duration is the time spent compacting, including reading and writing
154 : // sstables.
155 : Duration time.Duration
156 : // TotalDuration is the total wall-time duration of the compaction,
157 : // including applying the compaction to the database. TotalDuration is
158 : // always ≥ Duration.
159 : TotalDuration time.Duration
160 : Done bool
161 : // Err is set only if Done is true. If non-nil, indicates that the compaction
162 : // failed. Note that err can be ErrCancelledCompaction, which can happen
163 : // during normal operation.
164 : Err error
165 :
166 : SingleLevelOverlappingRatio float64
167 : MultiLevelOverlappingRatio float64
168 :
169 : // Annotations specifies additional info to appear in a compaction's event log line
170 : Annotations compactionAnnotations
171 : }
172 :
173 : type compactionAnnotations []string
174 :
175 : // SafeFormat implements redact.SafeFormatter.
176 1 : func (ca compactionAnnotations) SafeFormat(w redact.SafePrinter, _ rune) {
177 1 : if len(ca) == 0 {
178 0 : return
179 0 : }
180 1 : for i := range ca {
181 1 : if i != 0 {
182 0 : w.Print(" ")
183 0 : }
184 1 : w.Printf("%s", redact.SafeString(ca[i]))
185 : }
186 : }
187 :
188 1 : func (i CompactionInfo) String() string {
189 1 : return redact.StringWithoutMarkers(i)
190 1 : }
191 :
192 : // SafeFormat implements redact.SafeFormatter.
193 1 : func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
194 1 : if i.Err != nil {
195 1 : w.Printf("[JOB %d] compaction(%s) to L%d error: %s",
196 1 : redact.Safe(i.JobID), redact.SafeString(i.Reason), redact.Safe(i.Output.Level), i.Err)
197 1 : return
198 1 : }
199 :
200 1 : if !i.Done {
201 1 : w.Printf("[JOB %d] compacting(%s) ",
202 1 : redact.Safe(i.JobID),
203 1 : redact.SafeString(i.Reason))
204 1 : if len(i.Annotations) > 0 {
205 1 : w.Printf("%s ", i.Annotations)
206 1 : }
207 1 : w.Printf("%s; ", levelInfos(i.Input))
208 1 : w.Printf("OverlappingRatio: Single %.2f, Multi %.2f", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
209 1 : return
210 : }
211 1 : outputSize := tablesTotalSize(i.Output.Tables)
212 1 : w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
213 1 : if len(i.Annotations) > 0 {
214 1 : w.Printf("%s ", i.Annotations)
215 1 : }
216 1 : w.Print(levelInfos(i.Input))
217 1 : w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
218 1 : redact.Safe(i.Output.Level),
219 1 : redact.Safe(formatFileNums(i.Output.Tables)),
220 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
221 1 : redact.Safe(i.Duration.Seconds()),
222 1 : redact.Safe(i.TotalDuration.Seconds()),
223 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
224 : }
225 :
226 : type levelInfos []LevelInfo
227 :
228 1 : func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
229 1 : for j, levelInfo := range i {
230 1 : if j > 0 {
231 1 : w.Printf(" + ")
232 1 : }
233 1 : w.Print(levelInfo)
234 : }
235 : }
236 :
// DiskSlowInfo contains the info for a disk slowness event when writing to a
// file. It is a type alias for vfs.DiskSlowInfo.
type DiskSlowInfo = vfs.DiskSlowInfo
240 :
241 : // FlushInfo contains the info for a flush event.
242 : type FlushInfo struct {
243 : // JobID is the ID of the flush job.
244 : JobID int
245 : // Reason is the reason for the flush.
246 : Reason string
247 : // Input contains the count of input memtables that were flushed.
248 : Input int
249 : // InputBytes contains the total in-memory size of the memtable(s) that were
250 : // flushed. This size includes skiplist indexing data structures.
251 : InputBytes uint64
252 : // Output contains the ouptut table generated by the flush. The output info
253 : // is empty for the flush begin event.
254 : Output []TableInfo
255 : // Duration is the time spent flushing. This duration includes writing and
256 : // syncing all of the flushed keys to sstables.
257 : Duration time.Duration
258 : // TotalDuration is the total wall-time duration of the flush, including
259 : // applying the flush to the database. TotalDuration is always ≥ Duration.
260 : TotalDuration time.Duration
261 : // Ingest is set to true if the flush is handling tables that were added to
262 : // the flushable queue via an ingestion operation.
263 : Ingest bool
264 : // IngestLevels are the output levels for each ingested table in the flush.
265 : // This field is only populated when Ingest is true.
266 : IngestLevels []int
267 : Done bool
268 : Err error
269 : }
270 :
271 1 : func (i FlushInfo) String() string {
272 1 : return redact.StringWithoutMarkers(i)
273 1 : }
274 :
275 : // SafeFormat implements redact.SafeFormatter.
276 1 : func (i FlushInfo) SafeFormat(w redact.SafePrinter, _ rune) {
277 1 : if i.Err != nil {
278 1 : w.Printf("[JOB %d] flush error: %s", redact.Safe(i.JobID), i.Err)
279 1 : return
280 1 : }
281 :
282 1 : plural := redact.SafeString("s")
283 1 : if i.Input == 1 {
284 1 : plural = ""
285 1 : }
286 1 : if !i.Done {
287 1 : w.Printf("[JOB %d] ", redact.Safe(i.JobID))
288 1 : if !i.Ingest {
289 1 : w.Printf("flushing %d memtable", redact.Safe(i.Input))
290 1 : w.SafeString(plural)
291 1 : w.Printf(" (%s) to L0", redact.Safe(humanize.Bytes.Uint64(i.InputBytes)))
292 1 : } else {
293 1 : w.Printf("flushing %d ingested table%s", redact.Safe(i.Input), plural)
294 1 : }
295 1 : return
296 : }
297 :
298 1 : outputSize := tablesTotalSize(i.Output)
299 1 : if !i.Ingest {
300 1 : if invariants.Enabled && len(i.IngestLevels) > 0 {
301 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) == 0"))
302 : }
303 1 : w.Printf("[JOB %d] flushed %d memtable%s (%s) to L0 [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
304 1 : redact.Safe(i.JobID), redact.Safe(i.Input), plural,
305 1 : redact.Safe(humanize.Bytes.Uint64(i.InputBytes)),
306 1 : redact.Safe(formatFileNums(i.Output)),
307 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
308 1 : redact.Safe(i.Duration.Seconds()),
309 1 : redact.Safe(i.TotalDuration.Seconds()),
310 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
311 1 : } else {
312 1 : if invariants.Enabled && len(i.IngestLevels) == 0 {
313 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) > 0"))
314 : }
315 1 : w.Printf("[JOB %d] flushed %d ingested flushable%s",
316 1 : redact.Safe(i.JobID), redact.Safe(len(i.Output)), plural)
317 1 : for j, level := range i.IngestLevels {
318 1 : file := i.Output[j]
319 1 : if j > 0 {
320 1 : w.Printf(" +")
321 1 : }
322 1 : w.Printf(" L%d:%s (%s)", level, file.FileNum, humanize.Bytes.Uint64(file.Size))
323 : }
324 1 : w.Printf(" in %.1fs (%.1fs total), output rate %s/s",
325 1 : redact.Safe(i.Duration.Seconds()),
326 1 : redact.Safe(i.TotalDuration.Seconds()),
327 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
328 : }
329 : }
330 :
331 : // DownloadInfo contains the info for a DB.Download() event.
332 : type DownloadInfo struct {
333 : // JobID is the ID of the download job.
334 : JobID int
335 :
336 : Spans []DownloadSpan
337 :
338 : // Duration is the time since the operation was started.
339 : Duration time.Duration
340 : DownloadCompactionsLaunched int
341 :
342 : // RestartCount indicates that the download operation restarted because it
343 : // noticed that new external files were ingested. A DownloadBegin event with
344 : // RestartCount = 0 is the start of the operation; each time we restart it we
345 : // have another DownloadBegin event with RestartCount > 0.
346 : RestartCount int
347 : Done bool
348 : Err error
349 : }
350 :
351 1 : func (i DownloadInfo) String() string {
352 1 : return redact.StringWithoutMarkers(i)
353 1 : }
354 :
355 : // SafeFormat implements redact.SafeFormatter.
356 1 : func (i DownloadInfo) SafeFormat(w redact.SafePrinter, _ rune) {
357 1 : switch {
358 0 : case i.Err != nil:
359 0 : w.Printf("[JOB %d] download error after %1.fs: %s", redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), i.Err)
360 :
361 1 : case i.Done:
362 1 : w.Printf("[JOB %d] download finished in %.1fs (launched %d compactions)",
363 1 : redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), redact.Safe(i.DownloadCompactionsLaunched))
364 :
365 1 : default:
366 1 : if i.RestartCount == 0 {
367 1 : w.Printf("[JOB %d] starting download for %d spans", redact.Safe(i.JobID), redact.Safe(len(i.Spans)))
368 1 : } else {
369 0 : w.Printf("[JOB %d] restarting download (restart #%d, time so far %.1fs, launched %d compactions)",
370 0 : redact.Safe(i.JobID), redact.Safe(i.RestartCount), redact.Safe(i.Duration.Seconds()),
371 0 : redact.Safe(i.DownloadCompactionsLaunched))
372 0 : }
373 : }
374 : }
375 :
376 : // ManifestCreateInfo contains info about a manifest creation event.
377 : type ManifestCreateInfo struct {
378 : // JobID is the ID of the job the caused the manifest to be created.
379 : JobID int
380 : Path string
381 : // The file number of the new Manifest.
382 : FileNum base.DiskFileNum
383 : Err error
384 : }
385 :
386 1 : func (i ManifestCreateInfo) String() string {
387 1 : return redact.StringWithoutMarkers(i)
388 1 : }
389 :
390 : // SafeFormat implements redact.SafeFormatter.
391 1 : func (i ManifestCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
392 1 : if i.Err != nil {
393 0 : w.Printf("[JOB %d] MANIFEST create error: %s", redact.Safe(i.JobID), i.Err)
394 0 : return
395 0 : }
396 1 : w.Printf("[JOB %d] MANIFEST created %s", redact.Safe(i.JobID), i.FileNum)
397 : }
398 :
399 : // ManifestDeleteInfo contains the info for a Manifest deletion event.
400 : type ManifestDeleteInfo struct {
401 : // JobID is the ID of the job the caused the Manifest to be deleted.
402 : JobID int
403 : Path string
404 : FileNum base.DiskFileNum
405 : Err error
406 : }
407 :
408 1 : func (i ManifestDeleteInfo) String() string {
409 1 : return redact.StringWithoutMarkers(i)
410 1 : }
411 :
412 : // SafeFormat implements redact.SafeFormatter.
413 1 : func (i ManifestDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
414 1 : if i.Err != nil {
415 0 : w.Printf("[JOB %d] MANIFEST delete error: %s", redact.Safe(i.JobID), i.Err)
416 0 : return
417 0 : }
418 1 : w.Printf("[JOB %d] MANIFEST deleted %s", redact.Safe(i.JobID), i.FileNum)
419 : }
420 :
421 : // TableCreateInfo contains the info for a table creation event.
422 : type TableCreateInfo struct {
423 : JobID int
424 : // Reason is the reason for the table creation: "compacting", "flushing", or
425 : // "ingesting".
426 : Reason string
427 : Path string
428 : FileNum base.DiskFileNum
429 : }
430 :
431 1 : func (i TableCreateInfo) String() string {
432 1 : return redact.StringWithoutMarkers(i)
433 1 : }
434 :
435 : // SafeFormat implements redact.SafeFormatter.
436 1 : func (i TableCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
437 1 : w.Printf("[JOB %d] %s: sstable created %s",
438 1 : redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
439 1 : }
440 :
441 : // TableDeleteInfo contains the info for a table deletion event.
442 : type TableDeleteInfo struct {
443 : JobID int
444 : Path string
445 : FileNum base.DiskFileNum
446 : Err error
447 : }
448 :
449 1 : func (i TableDeleteInfo) String() string {
450 1 : return redact.StringWithoutMarkers(i)
451 1 : }
452 :
453 : // SafeFormat implements redact.SafeFormatter.
454 1 : func (i TableDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
455 1 : if i.Err != nil {
456 0 : w.Printf("[JOB %d] sstable delete error %s: %s",
457 0 : redact.Safe(i.JobID), i.FileNum, i.Err)
458 0 : return
459 0 : }
460 1 : w.Printf("[JOB %d] sstable deleted %s", redact.Safe(i.JobID), i.FileNum)
461 : }
462 :
463 : // TableIngestInfo contains the info for a table ingestion event.
464 : type TableIngestInfo struct {
465 : // JobID is the ID of the job the caused the table to be ingested.
466 : JobID int
467 : Tables []struct {
468 : TableInfo
469 : Level int
470 : }
471 : // GlobalSeqNum is the sequence number that was assigned to all entries in
472 : // the ingested table.
473 : GlobalSeqNum base.SeqNum
474 : // flushable indicates whether the ingested sstable was treated as a
475 : // flushable.
476 : flushable bool
477 : Err error
478 : }
479 :
480 1 : func (i TableIngestInfo) String() string {
481 1 : return redact.StringWithoutMarkers(i)
482 1 : }
483 :
484 : // SafeFormat implements redact.SafeFormatter.
485 1 : func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
486 1 : if i.Err != nil {
487 0 : w.Printf("[JOB %d] ingest error: %s", redact.Safe(i.JobID), i.Err)
488 0 : return
489 0 : }
490 :
491 1 : if i.flushable {
492 1 : w.Printf("[JOB %d] ingested as flushable", redact.Safe(i.JobID))
493 1 : } else {
494 1 : w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
495 1 : }
496 :
497 1 : for j := range i.Tables {
498 1 : t := &i.Tables[j]
499 1 : if j > 0 {
500 1 : w.Printf(",")
501 1 : }
502 1 : levelStr := ""
503 1 : if !i.flushable {
504 1 : levelStr = fmt.Sprintf("L%d:", t.Level)
505 1 : }
506 1 : w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
507 1 : redact.Safe(humanize.Bytes.Uint64(t.Size)))
508 : }
509 : }
510 :
511 : // TableStatsInfo contains the info for a table stats loaded event.
512 : type TableStatsInfo struct {
513 : // JobID is the ID of the job that finished loading the initial tables'
514 : // stats.
515 : JobID int
516 : }
517 :
518 1 : func (i TableStatsInfo) String() string {
519 1 : return redact.StringWithoutMarkers(i)
520 1 : }
521 :
522 : // SafeFormat implements redact.SafeFormatter.
523 1 : func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
524 1 : w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
525 1 : }
526 :
527 : // TableValidatedInfo contains information on the result of a validation run
528 : // on an sstable.
529 : type TableValidatedInfo struct {
530 : JobID int
531 : Meta *tableMetadata
532 : }
533 :
534 1 : func (i TableValidatedInfo) String() string {
535 1 : return redact.StringWithoutMarkers(i)
536 1 : }
537 :
538 : // SafeFormat implements redact.SafeFormatter.
539 1 : func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
540 1 : w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
541 1 : }
542 :
543 : // WALCreateInfo contains info about a WAL creation event.
544 : type WALCreateInfo struct {
545 : // JobID is the ID of the job the caused the WAL to be created.
546 : JobID int
547 : Path string
548 : // The file number of the new WAL.
549 : FileNum base.DiskFileNum
550 : // The file number of a previous WAL which was recycled to create this
551 : // one. Zero if recycling did not take place.
552 : RecycledFileNum base.DiskFileNum
553 : Err error
554 : }
555 :
556 1 : func (i WALCreateInfo) String() string {
557 1 : return redact.StringWithoutMarkers(i)
558 1 : }
559 :
560 : // SafeFormat implements redact.SafeFormatter.
561 1 : func (i WALCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
562 1 : if i.Err != nil {
563 0 : w.Printf("[JOB %d] WAL create error: %s", redact.Safe(i.JobID), i.Err)
564 0 : return
565 0 : }
566 :
567 1 : if i.RecycledFileNum == 0 {
568 1 : w.Printf("[JOB %d] WAL created %s", redact.Safe(i.JobID), i.FileNum)
569 1 : return
570 1 : }
571 :
572 1 : w.Printf("[JOB %d] WAL created %s (recycled %s)",
573 1 : redact.Safe(i.JobID), i.FileNum, i.RecycledFileNum)
574 : }
575 :
576 : // WALDeleteInfo contains the info for a WAL deletion event.
577 : //
578 : // TODO(sumeer): extend WALDeleteInfo for the failover case in case the path
579 : // is insufficient to infer whether primary or secondary.
580 : type WALDeleteInfo struct {
581 : // JobID is the ID of the job the caused the WAL to be deleted.
582 : JobID int
583 : Path string
584 : FileNum base.DiskFileNum
585 : Err error
586 : }
587 :
588 1 : func (i WALDeleteInfo) String() string {
589 1 : return redact.StringWithoutMarkers(i)
590 1 : }
591 :
592 : // SafeFormat implements redact.SafeFormatter.
593 1 : func (i WALDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
594 1 : if i.Err != nil {
595 1 : w.Printf("[JOB %d] WAL delete error: %s", redact.Safe(i.JobID), i.Err)
596 1 : return
597 1 : }
598 1 : w.Printf("[JOB %d] WAL deleted %s", redact.Safe(i.JobID), i.FileNum)
599 : }
600 :
601 : // WriteStallBeginInfo contains the info for a write stall begin event.
602 : type WriteStallBeginInfo struct {
603 : Reason string
604 : }
605 :
606 1 : func (i WriteStallBeginInfo) String() string {
607 1 : return redact.StringWithoutMarkers(i)
608 1 : }
609 :
610 : // SafeFormat implements redact.SafeFormatter.
611 1 : func (i WriteStallBeginInfo) SafeFormat(w redact.SafePrinter, _ rune) {
612 1 : w.Printf("write stall beginning: %s", redact.Safe(i.Reason))
613 1 : }
614 :
615 : // LowDiskSpaceInfo contains the information for a LowDiskSpace
616 : // event.
617 : type LowDiskSpaceInfo struct {
618 : // AvailBytes is the disk space available to the current process in bytes.
619 : AvailBytes uint64
620 : // TotalBytes is the total disk space in bytes.
621 : TotalBytes uint64
622 : // PercentThreshold is one of a set of fixed percentages in the
623 : // lowDiskSpaceThresholds below. This event was issued because the disk
624 : // space went below this threshold.
625 : PercentThreshold int
626 : }
627 :
628 0 : func (i LowDiskSpaceInfo) String() string {
629 0 : return redact.StringWithoutMarkers(i)
630 0 : }
631 :
632 : // SafeFormat implements redact.SafeFormatter.
633 0 : func (i LowDiskSpaceInfo) SafeFormat(w redact.SafePrinter, _ rune) {
634 0 : w.Printf(
635 0 : "available disk space under %d%% (%s of %s)",
636 0 : redact.Safe(i.PercentThreshold),
637 0 : redact.Safe(humanize.Bytes.Uint64(i.AvailBytes)),
638 0 : redact.Safe(humanize.Bytes.Uint64(i.TotalBytes)),
639 0 : )
640 0 : }
641 :
642 : // PossibleAPIMisuseInfo contains the information for a PossibleAPIMisuse event.
643 : type PossibleAPIMisuseInfo struct {
644 : Kind APIMisuseKind
645 :
646 : // UserKey is set for the following kinds:
647 : // - IneffectualSingleDelete,
648 : // - NondeterministicSingleDelete,
649 : // - MissizedDelete.
650 : UserKey []byte
651 :
652 : // ExtraInfo is set for the following kinds:
653 : // - MissizedDelete: contains "elidedSize=<size>,expectedSize=<size>"
654 : ExtraInfo string
655 : }
656 :
657 1 : func (i PossibleAPIMisuseInfo) String() string {
658 1 : return redact.StringWithoutMarkers(i)
659 1 : }
660 :
661 : // SafeFormat implements redact.SafeFormatter.
662 1 : func (i PossibleAPIMisuseInfo) SafeFormat(w redact.SafePrinter, _ rune) {
663 1 : switch i.Kind {
664 1 : case IneffectualSingleDelete, NondeterministicSingleDelete:
665 1 : w.Printf("possible API misuse: %s (key=%q)", redact.Safe(i.Kind), i.UserKey)
666 1 : case MissizedDelete:
667 1 : w.Printf("possible API misuse: %s (key=%q, %s)", redact.Safe(i.Kind), i.UserKey, redact.Safe(i.ExtraInfo))
668 0 : default:
669 0 : if invariants.Enabled {
670 0 : panic("invalid API misuse event")
671 : }
672 0 : w.Printf("invalid API misuse event")
673 : }
674 : }
675 :
// APIMisuseKind identifies the type of API misuse represented by a
// PossibleAPIMisuse event.
type APIMisuseKind int8

const (
	// IneffectualSingleDelete is emitted in compactions/flushes if any
	// single delete is being elided without deleting a point set/merge.
	//
	// This event can sometimes be a false positive because of delete-only
	// compactions which can cause a recent RANGEDEL to peek below an older
	// SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
	//
	// Example:
	//   RANGEDEL [a, c)#10 in L0
	//   SINGLEDEL b#5 in L1
	//   SET b#3 in L6
	//
	// If the L6 file containing the SET is narrow and the L1 file containing
	// the SINGLEDEL is wide, a delete-only compaction can remove the file in
	// L6 before the SINGLEDEL is compacted down. Then when the SINGLEDEL is
	// compacted down, it will not find any SET to delete, resulting in the
	// ineffectual callback.
	IneffectualSingleDelete APIMisuseKind = iota

	// NondeterministicSingleDelete is emitted in compactions/flushes if any
	// single delete has consumed a Set/Merge, and there is another immediately
	// older Set/SetWithDelete/Merge. The user of Pebble has violated the
	// invariant under which SingleDelete can be used correctly.
	//
	// Consider the sequence SingleDelete#3, Set#2, Set#1. There are three
	// ways some of these keys can first meet in a compaction.
	//
	// - All 3 keys in the same compaction: this callback will detect the
	//   violation.
	//
	// - SingleDelete#3, Set#2 meet in a compaction first: Both keys will
	//   disappear. The violation will not be detected, and the DB will have
	//   Set#1 which is likely incorrect (from the user's perspective).
	//
	// - Set#2, Set#1 meet in a compaction first: The output will be Set#2,
	//   which will later be consumed by SingleDelete#3. The violation will
	//   not be detected and the DB will be correct.
	//
	// This event can sometimes be a false positive because of delete-only
	// compactions which can cause a recent RANGEDEL to peek below an older
	// SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
	//
	// Example:
	//   RANGEDEL [a, z)#60 in L0
	//   SINGLEDEL g#50 in L1
	//   SET g#40 in L2
	//   RANGEDEL [g,h)#30 in L3
	//   SET g#20 in L6
	//
	// In this example, the two SETs represent the same user write, and the
	// RANGEDELs are caused by the CockroachDB range being dropped. That is,
	// the user wrote to g once, range was dropped, then added back, which
	// caused the SET again, then at some point g was validly deleted using a
	// SINGLEDEL, and then the range was dropped again. The older RANGEDEL can
	// get fragmented due to compactions it has been part of. Say this L3 file
	// containing the RANGEDEL is very narrow, while the L1, L2, L6 files are
	// wider than the RANGEDEL in L0. Then the RANGEDEL in L3 can be dropped
	// using a delete-only compaction, resulting in an LSM with state:
	//
	//   RANGEDEL [a, z)#60 in L0
	//   SINGLEDEL g#50 in L1
	//   SET g#40 in L2
	//   SET g#20 in L6
	//
	// A multi-level compaction involving L1, L2, L6 will cause the invariant
	// violation callback. This example doesn't need multi-level compactions:
	// say there was a Pebble snapshot at g#21 preventing g#20 from being
	// dropped when it meets g#40 in a compaction. That snapshot will not save
	// RANGEDEL [g,h)#30, so we can have:
	//
	//   SINGLEDEL g#50 in L1
	//   SET g#40, SET g#20 in L6
	//
	// And say the snapshot is removed and then the L1 and L6 compaction
	// happens, resulting in the invariant violation callback.
	NondeterministicSingleDelete

	// MissizedDelete is emitted when a DELSIZED tombstone is found that did
	// not accurately record the size of the value it deleted. This can lead to
	// incorrect behavior in compactions.
	MissizedDelete
)

// String returns a short human-readable name for the kind, or "unknown" for
// any value that is not one of the declared constants.
func (k APIMisuseKind) String() string {
	names := [...]string{
		IneffectualSingleDelete:      "ineffectual SINGLEDEL",
		NondeterministicSingleDelete: "nondeterministic SINGLEDEL",
		MissizedDelete:               "missized DELSIZED",
	}
	if k < 0 || int(k) >= len(names) {
		return "unknown"
	}
	return names[k]
}
776 :
// EventListener contains a set of functions that will be invoked when various
// significant DB events occur. Note that the functions should not run for an
// excessive amount of time as they are invoked synchronously by the DB and may
// block continued DB work. For a similar reason it is advisable to not perform
// any synchronous calls back into the DB.
type EventListener struct {
	// BackgroundError is invoked whenever an error occurs during a background
	// operation such as flush or compaction.
	BackgroundError func(error)

	// BlobFileCreated is invoked after a blob file has been created.
	BlobFileCreated func(BlobFileCreateInfo)

	// BlobFileDeleted is invoked after a blob file has been deleted.
	BlobFileDeleted func(BlobFileDeleteInfo)

	// DataCorruption is invoked when an on-disk corruption is detected. It should
	// not block, as it is called synchronously in read paths.
	DataCorruption func(DataCorruptionInfo)

	// CompactionBegin is invoked after the inputs to a compaction have been
	// determined, but before the compaction has produced any output.
	CompactionBegin func(CompactionInfo)

	// CompactionEnd is invoked after a compaction has completed and the result
	// has been installed.
	CompactionEnd func(CompactionInfo)

	// DiskSlow is invoked after a disk write operation on a file created with a
	// disk health checking vfs.FS (see vfs.DefaultWithDiskHealthChecks) is
	// observed to exceed the specified disk slowness threshold duration. DiskSlow
	// is called on a goroutine that is monitoring slowness/stuckness. The callee
	// MUST return without doing any IO, or blocking on anything (like a mutex)
	// that is waiting on IO. This is imperative in order to reliably monitor for
	// slowness, since if this goroutine gets stuck, the monitoring will stop
	// working.
	DiskSlow func(DiskSlowInfo)

	// FlushBegin is invoked after the inputs to a flush have been determined,
	// but before the flush has produced any output.
	FlushBegin func(FlushInfo)

	// FlushEnd is invoked after a flush has completed and the result has been
	// installed.
	FlushEnd func(FlushInfo)

	// DownloadBegin is invoked when a db.Download operation starts or restarts
	// (restarts are caused by new external tables being ingested during the
	// operation).
	DownloadBegin func(DownloadInfo)

	// DownloadEnd is invoked when a db.Download operation completes.
	DownloadEnd func(DownloadInfo)

	// FormatUpgrade is invoked after the database's FormatMajorVersion
	// is upgraded.
	FormatUpgrade func(FormatMajorVersion)

	// ManifestCreated is invoked after a manifest has been created.
	ManifestCreated func(ManifestCreateInfo)

	// ManifestDeleted is invoked after a manifest has been deleted.
	ManifestDeleted func(ManifestDeleteInfo)

	// TableCreated is invoked when a table has been created.
	TableCreated func(TableCreateInfo)

	// TableDeleted is invoked after a table has been deleted.
	TableDeleted func(TableDeleteInfo)

	// TableIngested is invoked after an externally created table has been
	// ingested via a call to DB.Ingest().
	TableIngested func(TableIngestInfo)

	// TableStatsLoaded is invoked at most once, when the table stats
	// collector has loaded statistics for all tables that existed at Open.
	TableStatsLoaded func(TableStatsInfo)

	// TableValidated is invoked after validation runs on an sstable.
	TableValidated func(TableValidatedInfo)

	// WALCreated is invoked after a WAL has been created.
	WALCreated func(WALCreateInfo)

	// WALDeleted is invoked after a WAL has been deleted.
	WALDeleted func(WALDeleteInfo)

	// WriteStallBegin is invoked when writes are intentionally delayed.
	WriteStallBegin func(WriteStallBeginInfo)

	// WriteStallEnd is invoked when delayed writes are released.
	WriteStallEnd func()

	// LowDiskSpace is invoked periodically when the disk space is running
	// low.
	LowDiskSpace func(LowDiskSpaceInfo)

	// PossibleAPIMisuse is invoked when a possible API misuse is detected.
	PossibleAPIMisuse func(PossibleAPIMisuseInfo)
}
877 :
878 : // EnsureDefaults ensures that background error events are logged to the
879 : // specified logger if a handler for those events hasn't been otherwise
880 : // specified. Ensure all handlers are non-nil so that we don't have to check
881 : // for nil-ness before invoking.
882 1 : func (l *EventListener) EnsureDefaults(logger Logger) {
883 1 : if l.BackgroundError == nil {
884 1 : if logger != nil {
885 1 : l.BackgroundError = func(err error) {
886 1 : logger.Errorf("background error: %s", err)
887 1 : }
888 1 : } else {
889 1 : l.BackgroundError = func(error) {}
890 : }
891 : }
892 1 : if l.BlobFileCreated == nil {
893 1 : l.BlobFileCreated = func(info BlobFileCreateInfo) {}
894 : }
895 1 : if l.BlobFileDeleted == nil {
896 1 : l.BlobFileDeleted = func(info BlobFileDeleteInfo) {}
897 : }
898 1 : if l.DataCorruption == nil {
899 1 : if logger != nil {
900 1 : l.DataCorruption = func(info DataCorruptionInfo) {
901 1 : logger.Fatalf("%s", info)
902 1 : }
903 1 : } else {
904 1 : l.DataCorruption = func(info DataCorruptionInfo) {}
905 : }
906 : }
907 1 : if l.CompactionBegin == nil {
908 1 : l.CompactionBegin = func(info CompactionInfo) {}
909 : }
910 1 : if l.CompactionEnd == nil {
911 1 : l.CompactionEnd = func(info CompactionInfo) {}
912 : }
913 1 : if l.DiskSlow == nil {
914 1 : l.DiskSlow = func(info DiskSlowInfo) {}
915 : }
916 1 : if l.FlushBegin == nil {
917 1 : l.FlushBegin = func(info FlushInfo) {}
918 : }
919 1 : if l.FlushEnd == nil {
920 1 : l.FlushEnd = func(info FlushInfo) {}
921 : }
922 1 : if l.DownloadBegin == nil {
923 1 : l.DownloadBegin = func(info DownloadInfo) {}
924 : }
925 1 : if l.DownloadEnd == nil {
926 1 : l.DownloadEnd = func(info DownloadInfo) {}
927 : }
928 1 : if l.FormatUpgrade == nil {
929 1 : l.FormatUpgrade = func(v FormatMajorVersion) {}
930 : }
931 1 : if l.ManifestCreated == nil {
932 1 : l.ManifestCreated = func(info ManifestCreateInfo) {}
933 : }
934 1 : if l.ManifestDeleted == nil {
935 1 : l.ManifestDeleted = func(info ManifestDeleteInfo) {}
936 : }
937 1 : if l.TableCreated == nil {
938 1 : l.TableCreated = func(info TableCreateInfo) {}
939 : }
940 1 : if l.TableDeleted == nil {
941 1 : l.TableDeleted = func(info TableDeleteInfo) {}
942 : }
943 1 : if l.TableIngested == nil {
944 1 : l.TableIngested = func(info TableIngestInfo) {}
945 : }
946 1 : if l.TableStatsLoaded == nil {
947 1 : l.TableStatsLoaded = func(info TableStatsInfo) {}
948 : }
949 1 : if l.TableValidated == nil {
950 1 : l.TableValidated = func(validated TableValidatedInfo) {}
951 : }
952 1 : if l.WALCreated == nil {
953 1 : l.WALCreated = func(info WALCreateInfo) {}
954 : }
955 1 : if l.WALDeleted == nil {
956 1 : l.WALDeleted = func(info WALDeleteInfo) {}
957 : }
958 1 : if l.WriteStallBegin == nil {
959 1 : l.WriteStallBegin = func(info WriteStallBeginInfo) {}
960 : }
961 1 : if l.WriteStallEnd == nil {
962 1 : l.WriteStallEnd = func() {}
963 : }
964 1 : if l.LowDiskSpace == nil {
965 1 : l.LowDiskSpace = func(info LowDiskSpaceInfo) {}
966 : }
967 1 : if l.PossibleAPIMisuse == nil {
968 1 : l.PossibleAPIMisuse = func(info PossibleAPIMisuseInfo) {}
969 : }
970 : }
971 :
972 : // MakeLoggingEventListener creates an EventListener that logs all events to the
973 : // specified logger.
974 1 : func MakeLoggingEventListener(logger Logger) EventListener {
975 1 : if logger == nil {
976 1 : logger = DefaultLogger
977 1 : }
978 :
979 1 : return EventListener{
980 1 : BackgroundError: func(err error) {
981 1 : logger.Errorf("background error: %s", err)
982 1 : },
983 1 : BlobFileCreated: func(info BlobFileCreateInfo) {
984 1 : logger.Infof("%s", info)
985 1 : },
986 1 : BlobFileDeleted: func(info BlobFileDeleteInfo) {
987 1 : logger.Infof("%s", info)
988 1 : },
989 0 : DataCorruption: func(info DataCorruptionInfo) {
990 0 : logger.Errorf("%s", info)
991 0 : },
992 1 : CompactionBegin: func(info CompactionInfo) {
993 1 : logger.Infof("%s", info)
994 1 : },
995 1 : CompactionEnd: func(info CompactionInfo) {
996 1 : logger.Infof("%s", info)
997 1 : },
998 0 : DiskSlow: func(info DiskSlowInfo) {
999 0 : logger.Infof("%s", info)
1000 0 : },
1001 1 : FlushBegin: func(info FlushInfo) {
1002 1 : logger.Infof("%s", info)
1003 1 : },
1004 1 : FlushEnd: func(info FlushInfo) {
1005 1 : logger.Infof("%s", info)
1006 1 : },
1007 1 : DownloadBegin: func(info DownloadInfo) {
1008 1 : logger.Infof("%s", info)
1009 1 : },
1010 1 : DownloadEnd: func(info DownloadInfo) {
1011 1 : logger.Infof("%s", info)
1012 1 : },
1013 1 : FormatUpgrade: func(v FormatMajorVersion) {
1014 1 : logger.Infof("upgraded to format version: %s", v)
1015 1 : },
1016 1 : ManifestCreated: func(info ManifestCreateInfo) {
1017 1 : logger.Infof("%s", info)
1018 1 : },
1019 1 : ManifestDeleted: func(info ManifestDeleteInfo) {
1020 1 : logger.Infof("%s", info)
1021 1 : },
1022 1 : TableCreated: func(info TableCreateInfo) {
1023 1 : logger.Infof("%s", info)
1024 1 : },
1025 1 : TableDeleted: func(info TableDeleteInfo) {
1026 1 : logger.Infof("%s", info)
1027 1 : },
1028 1 : TableIngested: func(info TableIngestInfo) {
1029 1 : logger.Infof("%s", info)
1030 1 : },
1031 1 : TableStatsLoaded: func(info TableStatsInfo) {
1032 1 : logger.Infof("%s", info)
1033 1 : },
1034 1 : TableValidated: func(info TableValidatedInfo) {
1035 1 : logger.Infof("%s", info)
1036 1 : },
1037 1 : WALCreated: func(info WALCreateInfo) {
1038 1 : logger.Infof("%s", info)
1039 1 : },
1040 1 : WALDeleted: func(info WALDeleteInfo) {
1041 1 : logger.Infof("%s", info)
1042 1 : },
1043 1 : WriteStallBegin: func(info WriteStallBeginInfo) {
1044 1 : logger.Infof("%s", info)
1045 1 : },
1046 1 : WriteStallEnd: func() {
1047 1 : logger.Infof("write stall ending")
1048 1 : },
1049 0 : LowDiskSpace: func(info LowDiskSpaceInfo) {
1050 0 : logger.Infof("%s", info)
1051 0 : },
1052 1 : PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
1053 1 : logger.Infof("%s", info)
1054 1 : },
1055 : }
1056 : }
1057 :
1058 : // TeeEventListener wraps two EventListeners, forwarding all events to both.
1059 1 : func TeeEventListener(a, b EventListener) EventListener {
1060 1 : a.EnsureDefaults(nil)
1061 1 : b.EnsureDefaults(nil)
1062 1 : return EventListener{
1063 1 : BackgroundError: func(err error) {
1064 0 : a.BackgroundError(err)
1065 0 : b.BackgroundError(err)
1066 0 : },
1067 0 : BlobFileCreated: func(info BlobFileCreateInfo) {
1068 0 : a.BlobFileCreated(info)
1069 0 : b.BlobFileCreated(info)
1070 0 : },
1071 1 : BlobFileDeleted: func(info BlobFileDeleteInfo) {
1072 1 : a.BlobFileDeleted(info)
1073 1 : b.BlobFileDeleted(info)
1074 1 : },
1075 0 : DataCorruption: func(info DataCorruptionInfo) {
1076 0 : a.DataCorruption(info)
1077 0 : b.DataCorruption(info)
1078 0 : },
1079 1 : CompactionBegin: func(info CompactionInfo) {
1080 1 : a.CompactionBegin(info)
1081 1 : b.CompactionBegin(info)
1082 1 : },
1083 1 : CompactionEnd: func(info CompactionInfo) {
1084 1 : a.CompactionEnd(info)
1085 1 : b.CompactionEnd(info)
1086 1 : },
1087 0 : DiskSlow: func(info DiskSlowInfo) {
1088 0 : a.DiskSlow(info)
1089 0 : b.DiskSlow(info)
1090 0 : },
1091 1 : FlushBegin: func(info FlushInfo) {
1092 1 : a.FlushBegin(info)
1093 1 : b.FlushBegin(info)
1094 1 : },
1095 1 : FlushEnd: func(info FlushInfo) {
1096 1 : a.FlushEnd(info)
1097 1 : b.FlushEnd(info)
1098 1 : },
1099 0 : DownloadBegin: func(info DownloadInfo) {
1100 0 : a.DownloadBegin(info)
1101 0 : b.DownloadBegin(info)
1102 0 : },
1103 0 : DownloadEnd: func(info DownloadInfo) {
1104 0 : a.DownloadEnd(info)
1105 0 : b.DownloadEnd(info)
1106 0 : },
1107 0 : FormatUpgrade: func(v FormatMajorVersion) {
1108 0 : a.FormatUpgrade(v)
1109 0 : b.FormatUpgrade(v)
1110 0 : },
1111 1 : ManifestCreated: func(info ManifestCreateInfo) {
1112 1 : a.ManifestCreated(info)
1113 1 : b.ManifestCreated(info)
1114 1 : },
1115 0 : ManifestDeleted: func(info ManifestDeleteInfo) {
1116 0 : a.ManifestDeleted(info)
1117 0 : b.ManifestDeleted(info)
1118 0 : },
1119 1 : TableCreated: func(info TableCreateInfo) {
1120 1 : a.TableCreated(info)
1121 1 : b.TableCreated(info)
1122 1 : },
1123 1 : TableDeleted: func(info TableDeleteInfo) {
1124 1 : a.TableDeleted(info)
1125 1 : b.TableDeleted(info)
1126 1 : },
1127 1 : TableIngested: func(info TableIngestInfo) {
1128 1 : a.TableIngested(info)
1129 1 : b.TableIngested(info)
1130 1 : },
1131 1 : TableStatsLoaded: func(info TableStatsInfo) {
1132 1 : a.TableStatsLoaded(info)
1133 1 : b.TableStatsLoaded(info)
1134 1 : },
1135 0 : TableValidated: func(info TableValidatedInfo) {
1136 0 : a.TableValidated(info)
1137 0 : b.TableValidated(info)
1138 0 : },
1139 1 : WALCreated: func(info WALCreateInfo) {
1140 1 : a.WALCreated(info)
1141 1 : b.WALCreated(info)
1142 1 : },
1143 1 : WALDeleted: func(info WALDeleteInfo) {
1144 1 : a.WALDeleted(info)
1145 1 : b.WALDeleted(info)
1146 1 : },
1147 0 : WriteStallBegin: func(info WriteStallBeginInfo) {
1148 0 : a.WriteStallBegin(info)
1149 0 : b.WriteStallBegin(info)
1150 0 : },
1151 0 : WriteStallEnd: func() {
1152 0 : a.WriteStallEnd()
1153 0 : b.WriteStallEnd()
1154 0 : },
1155 0 : LowDiskSpace: func(info LowDiskSpaceInfo) {
1156 0 : a.LowDiskSpace(info)
1157 0 : b.LowDiskSpace(info)
1158 0 : },
1159 0 : PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
1160 0 : a.PossibleAPIMisuse(info)
1161 0 : b.PossibleAPIMisuse(info)
1162 0 : },
1163 : }
1164 : }
1165 :
1166 : // lowDiskSpaceReporter contains the logic to report low disk space events.
1167 : // Report is called whenever we get the disk usage statistics.
1168 : //
1169 : // We define a few thresholds (10%, 5%, 3%, 2%, 1%) and we post an event
1170 : // whenever we reach a new threshold. We periodically repost the event every 30
1171 : // minutes until we are above all thresholds.
1172 : type lowDiskSpaceReporter struct {
1173 : mu struct {
1174 : sync.Mutex
1175 : lastNoticeThreshold int
1176 : lastNoticeTime crtime.Mono
1177 : }
1178 : }
1179 :
1180 : var lowDiskSpaceThresholds = []int{10, 5, 3, 2, 1}
1181 :
1182 : const lowDiskSpaceFrequency = 30 * time.Minute
1183 :
1184 1 : func (r *lowDiskSpaceReporter) Report(availBytes, totalBytes uint64, el *EventListener) {
1185 1 : threshold, ok := r.findThreshold(availBytes, totalBytes)
1186 1 : if !ok {
1187 1 : // Normal path.
1188 1 : return
1189 1 : }
1190 1 : if r.shouldReport(threshold, crtime.NowMono()) {
1191 1 : el.LowDiskSpace(LowDiskSpaceInfo{
1192 1 : AvailBytes: availBytes,
1193 1 : TotalBytes: totalBytes,
1194 1 : PercentThreshold: threshold,
1195 1 : })
1196 1 : }
1197 : }
1198 :
1199 : // shouldReport returns true if we should report an event. Updates
1200 : // lastNoticeTime/lastNoticeThreshold appropriately.
1201 1 : func (r *lowDiskSpaceReporter) shouldReport(threshold int, now crtime.Mono) bool {
1202 1 : r.mu.Lock()
1203 1 : defer r.mu.Unlock()
1204 1 : if threshold < r.mu.lastNoticeThreshold || r.mu.lastNoticeTime == 0 ||
1205 1 : now.Sub(r.mu.lastNoticeTime) >= lowDiskSpaceFrequency {
1206 1 : r.mu.lastNoticeThreshold = threshold
1207 1 : r.mu.lastNoticeTime = now
1208 1 : return true
1209 1 : }
1210 1 : return false
1211 : }
1212 :
1213 : // findThreshold returns the largest threshold in lowDiskSpaceThresholds which
1214 : // is >= the percentage ratio between availBytes and totalBytes (or ok=false if
1215 : // there is more free space than the highest threshold).
1216 : func (r *lowDiskSpaceReporter) findThreshold(
1217 : availBytes, totalBytes uint64,
1218 1 : ) (threshold int, ok bool) {
1219 1 : // Note: in the normal path, we exit the loop during the first iteration.
1220 1 : for i, t := range lowDiskSpaceThresholds {
1221 1 : if availBytes*100 > totalBytes*uint64(lowDiskSpaceThresholds[i]) {
1222 1 : break
1223 : }
1224 1 : threshold = t
1225 1 : ok = true
1226 : }
1227 1 : return threshold, ok
1228 : }
1229 :
1230 : // reportCorruption reports a corruption of a TableMetadata or BlobFileMetadata
1231 : // to the event listener and also adds a DataCorruptionInfo payload to the error.
1232 1 : func (d *DB) reportCorruption(meta any, err error) error {
1233 1 : switch meta := meta.(type) {
1234 1 : case *manifest.TableMetadata:
1235 1 : return d.reportFileCorruption(base.FileTypeTable, meta.TableBacking.DiskFileNum, meta.UserKeyBounds(), err)
1236 0 : case *manifest.BlobFileMetadata:
1237 0 : // TODO(jackson): Add bounds for blob files.
1238 0 : return d.reportFileCorruption(base.FileTypeBlob, meta.FileNum, base.UserKeyBounds{}, err)
1239 0 : default:
1240 0 : panic(fmt.Sprintf("unknown metadata type: %T", meta))
1241 : }
1242 : }
1243 :
1244 : func (d *DB) reportFileCorruption(
1245 : fileType base.FileType, fileNum base.DiskFileNum, userKeyBounds base.UserKeyBounds, err error,
1246 1 : ) error {
1247 1 : if invariants.Enabled && !IsCorruptionError(err) {
1248 0 : panic("not a corruption error")
1249 : }
1250 :
1251 1 : objMeta, lookupErr := d.objProvider.Lookup(fileType, fileNum)
1252 1 : if lookupErr != nil {
1253 1 : // If the object is not known to the provider, it must be a local object
1254 1 : // that was missing when we opened the store. Remote objects have their
1255 1 : // metadata in a catalog, so even if the backing object is deleted, the
1256 1 : // DiskFileNum would still be known.
1257 1 : objMeta = objstorage.ObjectMetadata{DiskFileNum: fileNum, FileType: fileType}
1258 1 : }
1259 1 : path := d.objProvider.Path(objMeta)
1260 1 : if objMeta.IsRemote() {
1261 1 : // Remote path (which include the locator and full path) might not always be
1262 1 : // safe.
1263 1 : err = errors.WithHintf(err, "path: %s", path)
1264 1 : } else {
1265 1 : // Local paths are safe: they start with the store directory and the
1266 1 : // filename is generated by Pebble.
1267 1 : err = errors.WithHintf(err, "path: %s", redact.Safe(path))
1268 1 : }
1269 1 : info := DataCorruptionInfo{
1270 1 : Path: path,
1271 1 : IsRemote: objMeta.IsRemote(),
1272 1 : Locator: objMeta.Remote.Locator,
1273 1 : Bounds: userKeyBounds,
1274 1 : Details: err,
1275 1 : }
1276 1 : d.opts.EventListener.DataCorruption(info)
1277 1 : // We don't use errors.Join() because that also annotates with this stack
1278 1 : // trace which would not be useful.
1279 1 : return errorsjoin.Join(err, &corruptionDetailError{info: info})
1280 : }
1281 :
1282 : type corruptionDetailError struct {
1283 : info DataCorruptionInfo
1284 : }
1285 :
1286 1 : func (e *corruptionDetailError) Error() string {
1287 1 : return "<corruption detail carrier>"
1288 1 : }
1289 :
1290 : // ExtractDataCorruptionInfo extracts the DataCorruptionInfo details from a
1291 : // corruption error. Returns nil if there is no such detail.
1292 1 : func ExtractDataCorruptionInfo(err error) *DataCorruptionInfo {
1293 1 : var e *corruptionDetailError
1294 1 : if errors.As(err, &e) {
1295 1 : return &e.info
1296 1 : }
1297 0 : return nil
1298 : }
|