Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "strings"
10 : "sync"
11 : "time"
12 :
13 : "github.com/cockroachdb/crlib/crhumanize"
14 : "github.com/cockroachdb/crlib/crtime"
15 : "github.com/cockroachdb/errors"
16 : errorsjoin "github.com/cockroachdb/errors/join"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/humanize"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/internal/manifest"
21 : "github.com/cockroachdb/pebble/objstorage"
22 : "github.com/cockroachdb/pebble/objstorage/remote"
23 : "github.com/cockroachdb/pebble/vfs"
24 : "github.com/cockroachdb/redact"
25 : )
26 :
27 : // TableNum identifies a table within a database; it is an alias for base.TableNum.
28 : type TableNum = base.TableNum
29 :
30 : // TableInfo is an alias that re-exports the manifest.TableInfo type.
31 : type TableInfo = manifest.TableInfo
32 :
33 1 : func tablesTotalSize(tables []TableInfo) uint64 {
34 1 : var size uint64
35 1 : for i := range tables {
36 1 : size += tables[i].Size
37 1 : }
38 1 : return size
39 : }
40 :
41 1 : func formatFileNums(tables []TableInfo) string {
42 1 : var buf strings.Builder
43 1 : for i := range tables {
44 1 : if i > 0 {
45 1 : buf.WriteString(" ")
46 1 : }
47 1 : buf.WriteString(tables[i].FileNum.String())
48 : }
49 1 : return buf.String()
50 : }
51 :
52 : // DataCorruptionInfo contains the information for a DataCorruption event.
53 : type DataCorruptionInfo struct {
54 : // Path of the file that is corrupted. For remote files the path starts with
55 : // "remote://".
56 : Path string
57 : IsRemote bool
58 : // Locator is only set when IsRemote is true (note that an empty Locator is
59 : // valid even then).
60 : Locator remote.Locator
61 : // Bounds indicates the keyspace range that is affected.
62 : Bounds base.UserKeyBounds
63 : // Details of the error. See cockroachdb/error for how to format with or
64 : // without redaction.
65 : Details error
66 : }
67 :
68 1 : func (i DataCorruptionInfo) String() string {
69 1 : return redact.StringWithoutMarkers(i)
70 1 : }
71 :
72 : // SafeFormat implements redact.SafeFormatter.
73 1 : func (i DataCorruptionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
74 1 : w.Printf("on-disk corruption: %s", redact.Safe(i.Path))
75 1 : if i.IsRemote {
76 1 : w.Printf(" (remote locator %q)", redact.Safe(i.Locator))
77 1 : }
78 1 : w.Printf("; bounds: %s; details: %+v", i.Bounds.String(), i.Details)
79 : }
80 :
81 : // LevelInfo contains info pertaining to a particular level.
82 : type LevelInfo struct {
83 : Level int
84 : Tables []TableInfo
85 : Blobs []BlobFileInfo
86 : Score float64
87 : }
88 :
89 0 : func (i LevelInfo) String() string {
90 0 : return redact.StringWithoutMarkers(i)
91 0 : }
92 :
93 : // SafeFormat implements redact.SafeFormatter.
94 1 : func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
95 1 : blobInfo := redact.SafeString("")
96 1 : if len(i.Blobs) > 0 {
97 0 : pluralBlob := redact.SafeString("s")
98 0 : if len(i.Blobs) == 1 {
99 0 : pluralBlob = ""
100 0 : }
101 0 : blobInfo = redact.SafeString(fmt.Sprintf(" blob%s [%s] (%s)",
102 0 : pluralBlob, formatBlobFileNums(i.Blobs), humanize.Bytes.Uint64(blobsTotalSize(i.Blobs))))
103 : }
104 1 : w.Printf("L%d [%s] (%s)%s Score=%.2f",
105 1 : redact.Safe(i.Level),
106 1 : redact.Safe(formatFileNums(i.Tables)),
107 1 : redact.Safe(humanize.Bytes.Uint64(tablesTotalSize(i.Tables))),
108 1 : blobInfo,
109 1 : redact.Safe(i.Score))
110 : }
111 :
112 : // BlobFileCreateInfo contains the info for a blob file creation event.
113 : type BlobFileCreateInfo struct {
114 : JobID int
115 : // Reason is the reason for the table creation: "compacting", "flushing", or
116 : // "ingesting".
117 : Reason string
118 : Path string
119 : FileNum base.DiskFileNum
120 : }
121 :
122 1 : func (i BlobFileCreateInfo) String() string {
123 1 : return redact.StringWithoutMarkers(i)
124 1 : }
125 :
126 : // SafeFormat implements redact.SafeFormatter.
127 1 : func (i BlobFileCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
128 1 : w.Printf("[JOB %d] %s: blob file created %s",
129 1 : redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
130 1 : }
131 :
132 : // BlobFileDeleteInfo contains the info for a blob file deletion event.
133 : type BlobFileDeleteInfo struct {
134 : JobID int
135 : Path string
136 : FileNum base.DiskFileNum
137 : Err error
138 : }
139 :
140 1 : func (i BlobFileDeleteInfo) String() string {
141 1 : return redact.StringWithoutMarkers(i)
142 1 : }
143 :
144 : // SafeFormat implements redact.SafeFormatter.
145 1 : func (i BlobFileDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
146 1 : if i.Err != nil {
147 0 : w.Printf("[JOB %d] blob file delete error %s: %s",
148 0 : redact.Safe(i.JobID), i.FileNum, i.Err)
149 0 : return
150 0 : }
151 1 : w.Printf("[JOB %d] blob file deleted %s", redact.Safe(i.JobID), i.FileNum)
152 : }
153 :
154 : // BlobFileRewriteInfo contains the info for a blob file rewrite event.
155 : type BlobFileRewriteInfo struct {
156 : // JobID is the ID of the job.
157 : JobID int
158 : // Input contains the input tables for the compaction organized by level.
159 : Input BlobFileInfo
160 : // Output contains the output tables generated by the compaction. The output
161 : // info is empty for the compaction begin event.
162 : Output BlobFileInfo
163 : // Duration is the time spent compacting, including reading and writing
164 : // files.
165 : Duration time.Duration
166 : // TotalDuration is the total wall-time duration of the compaction,
167 : // including applying the compaction to the database. TotalDuration is
168 : // always ≥ Duration.
169 : TotalDuration time.Duration
170 : Done bool
171 : // Err is set only if Done is true. If non-nil, indicates that the compaction
172 : // failed. Note that err can be ErrCancelledCompaction, which can happen
173 : // during normal operation.
174 : Err error
175 : }
176 :
177 1 : func (i BlobFileRewriteInfo) String() string {
178 1 : return redact.StringWithoutMarkers(i)
179 1 : }
180 :
181 : // SafeFormat implements redact.SafeFormatter.
182 1 : func (i BlobFileRewriteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
183 1 : if i.Err != nil {
184 0 : w.Printf("[JOB %d] blob file (%s, %s) rewrite error: %s",
185 0 : redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum, i.Err)
186 0 : return
187 0 : }
188 :
189 1 : if !i.Done {
190 1 : w.Printf("[JOB %d] rewriting blob file %s (physical file %s)",
191 1 : redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum)
192 1 : return
193 1 : }
194 1 : w.Printf("[JOB %d] rewrote blob file (%s, %s) (%s) -> (%s, %s) (%s), in %.1fs (%.1fs total)",
195 1 : redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum,
196 1 : redact.Safe(humanize.Bytes.Uint64(i.Input.Size)),
197 1 : i.Output.BlobFileID, i.Output.DiskFileNum,
198 1 : redact.Safe(humanize.Bytes.Uint64(i.Output.Size)),
199 1 : redact.Safe(i.Duration.Seconds()),
200 1 : redact.Safe(i.TotalDuration.Seconds()))
201 : }
202 :
203 : // BlobFileInfo describes a blob file.
204 : type BlobFileInfo struct {
205 : // BlobFileID is the logical ID of the blob file.
206 : BlobFileID base.BlobFileID
207 : // DiskFileNum is the file number of the blob file on disk.
208 : DiskFileNum base.DiskFileNum
209 : // Size is the physical size of the file in bytes.
210 : Size uint64
211 : // ValueSize is the pre-compressed size of the values in the blob file in
212 : // bytes.
213 : ValueSize uint64
214 : // MVCCGarbageSize is the pre-compressed size of potential MVCC garbage in
215 : // the blob file in bytes.
216 : MVCCGarbageSize uint64
217 : }
218 :
219 1 : func blobsTotalSize(blobs []BlobFileInfo) uint64 {
220 1 : var size uint64
221 1 : for i := range blobs {
222 1 : size += blobs[i].Size
223 1 : }
224 1 : return size
225 : }
226 :
227 1 : func formatBlobFileNums(blobs []BlobFileInfo) string {
228 1 : var buf strings.Builder
229 1 : for i, blob := range blobs {
230 1 : if i > 0 {
231 1 : buf.WriteString(" ")
232 1 : }
233 1 : buf.WriteString(blob.DiskFileNum.String())
234 1 : if blob.MVCCGarbageSize > 0 {
235 1 : buf.WriteString(fmt.Sprintf(" (MVCCGarbage: %s)",
236 1 : redact.Safe(crhumanize.Percent(blob.MVCCGarbageSize, blob.ValueSize))))
237 1 : }
238 : }
239 1 : return buf.String()
240 : }
241 :
242 : // CompactionInfo contains the info for a compaction event.
243 : type CompactionInfo struct {
244 : // JobID is the ID of the compaction job.
245 : JobID int
246 : // Reason is the reason for the compaction.
247 : Reason string
248 : // Input contains the input tables for the compaction organized by level.
249 : Input []LevelInfo
250 : // Output contains the output tables generated by the compaction. The output
251 : // tables are empty for the compaction begin event.
252 : Output LevelInfo
253 : // Duration is the time spent compacting, including reading and writing
254 : // sstables.
255 : Duration time.Duration
256 : // TotalDuration is the total wall-time duration of the compaction,
257 : // including applying the compaction to the database. TotalDuration is
258 : // always ≥ Duration.
259 : TotalDuration time.Duration
260 : Done bool
261 : // Err is set only if Done is true. If non-nil, indicates that the compaction
262 : // failed. Note that Err can be ErrCancelledCompaction, which can happen
263 : // during normal operation.
264 : Err error
265 :
266 : SingleLevelOverlappingRatio float64
267 : MultiLevelOverlappingRatio float64
268 :
269 : // Annotations specifies additional info to appear in a compaction's event log line.
270 : Annotations compactionAnnotations
271 : }
272 :
273 : type compactionAnnotations []string
274 :
275 : // SafeFormat implements redact.SafeFormatter.
276 1 : func (ca compactionAnnotations) SafeFormat(w redact.SafePrinter, _ rune) {
277 1 : if len(ca) == 0 {
278 0 : return
279 0 : }
280 1 : for i := range ca {
281 1 : if i != 0 {
282 1 : w.Print(" ")
283 1 : }
284 1 : w.Printf("%s", redact.SafeString(ca[i]))
285 : }
286 : }
287 :
288 1 : func (i CompactionInfo) String() string {
289 1 : return redact.StringWithoutMarkers(i)
290 1 : }
291 :
292 : // SafeFormat implements redact.SafeFormatter.
293 1 : func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
294 1 : if i.Err != nil {
295 1 : w.Printf("[JOB %d] compaction(%s) to L%d error: %s",
296 1 : redact.Safe(i.JobID), redact.SafeString(i.Reason), redact.Safe(i.Output.Level), i.Err)
297 1 : return
298 1 : }
299 :
300 1 : if !i.Done {
301 1 : w.Printf("[JOB %d] compacting(%s) ",
302 1 : redact.Safe(i.JobID),
303 1 : redact.SafeString(i.Reason))
304 1 : if len(i.Annotations) > 0 {
305 1 : w.Printf("%s ", i.Annotations)
306 1 : }
307 1 : w.Printf("%s; ", levelInfos(i.Input))
308 1 : w.Printf("OverlappingRatio: Single %.2f, Multi %.2f", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
309 1 : return
310 : }
311 1 : outputSize := tablesTotalSize(i.Output.Tables)
312 1 : w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
313 1 : if len(i.Annotations) > 0 {
314 1 : w.Printf("%s ", i.Annotations)
315 1 : }
316 1 : w.Print(levelInfos(i.Input))
317 1 : w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
318 1 : redact.Safe(i.Output.Level),
319 1 : redact.Safe(formatFileNums(i.Output.Tables)),
320 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
321 1 : redact.Safe(i.Duration.Seconds()),
322 1 : redact.Safe(i.TotalDuration.Seconds()),
323 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
324 : }
325 :
326 : type levelInfos []LevelInfo
327 :
328 1 : func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
329 1 : for j, levelInfo := range i {
330 1 : if j > 0 {
331 1 : w.Printf(" + ")
332 1 : }
333 1 : w.Print(levelInfo)
334 : }
335 : }
336 :
337 : // DiskSlowInfo contains the info for a disk slowness event when writing to a
338 : // file. It is an alias for vfs.DiskSlowInfo.
339 : type DiskSlowInfo = vfs.DiskSlowInfo
340 :
341 : // FlushInfo contains the info for a flush event.
342 : type FlushInfo struct {
343 : // JobID is the ID of the flush job.
344 : JobID int
345 : // Reason is the reason for the flush.
346 : Reason string
347 : // Input contains the count of input memtables that were flushed.
348 : Input int
349 : // InputBytes contains the total in-memory size of the memtable(s) that were
350 : // flushed. This size includes skiplist indexing data structures.
351 : InputBytes uint64
352 : // OutputTables contains the output table generated by the flush. The output info
353 : // is empty for the flush begin event.
354 : OutputTables []TableInfo
355 : // OutputBlobs contains the output table generated by the flush. The output info
356 : // is empty for the flush begin event.
357 : OutputBlobs []BlobFileInfo
358 : // Duration is the time spent flushing. This duration includes writing and
359 : // syncing all of the flushed keys to sstables.
360 : Duration time.Duration
361 : // TotalDuration is the total wall-time duration of the flush, including
362 : // applying the flush to the database. TotalDuration is always ≥ Duration.
363 : TotalDuration time.Duration
364 : // Ingest is set to true if the flush is handling tables that were added to
365 : // the flushable queue via an ingestion operation.
366 : Ingest bool
367 : // IngestLevels are the output levels for each ingested table in the flush.
368 : // This field is only populated when Ingest is true.
369 : IngestLevels []int
370 : Done bool
371 : Err error
372 : }
373 :
374 1 : func (i FlushInfo) String() string {
375 1 : return redact.StringWithoutMarkers(i)
376 1 : }
377 :
378 : // SafeFormat implements redact.SafeFormatter.
379 1 : func (i FlushInfo) SafeFormat(w redact.SafePrinter, _ rune) {
380 1 : if i.Err != nil {
381 1 : w.Printf("[JOB %d] flush error: %s", redact.Safe(i.JobID), i.Err)
382 1 : return
383 1 : }
384 :
385 1 : plural := redact.SafeString("s")
386 1 : if i.Input == 1 {
387 1 : plural = ""
388 1 : }
389 1 : blobInfo := redact.SafeString("")
390 1 : if len(i.OutputBlobs) > 0 {
391 1 : pluralBlob := redact.SafeString("s")
392 1 : if len(i.OutputBlobs) == 1 {
393 1 : pluralBlob = ""
394 1 : }
395 1 : blobInfo = redact.SafeString(fmt.Sprintf(" blob%s [%s] (%s)",
396 1 : pluralBlob, formatBlobFileNums(i.OutputBlobs), humanize.Bytes.Uint64(blobsTotalSize(i.OutputBlobs))))
397 : }
398 1 : if !i.Done {
399 1 : w.Printf("[JOB %d] ", redact.Safe(i.JobID))
400 1 : if !i.Ingest {
401 1 : w.Printf("flushing %d memtable", redact.Safe(i.Input))
402 1 : w.SafeString(plural)
403 1 : w.Printf(" (%s) to L0", redact.Safe(humanize.Bytes.Uint64(i.InputBytes)))
404 1 : } else {
405 1 : w.Printf("flushing %d ingested table%s", redact.Safe(i.Input), plural)
406 1 : }
407 1 : return
408 : }
409 :
410 1 : outputSize := tablesTotalSize(i.OutputTables)
411 1 : if !i.Ingest {
412 1 : if invariants.Enabled && len(i.IngestLevels) > 0 {
413 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) == 0"))
414 : }
415 1 : w.Printf("[JOB %d] flushed %d memtable%s (%s) to L0 [%s] (%s)%s, in %.1fs (%.1fs total), output rate %s/s",
416 1 : redact.Safe(i.JobID), redact.Safe(i.Input), plural,
417 1 : redact.Safe(humanize.Bytes.Uint64(i.InputBytes)),
418 1 : redact.Safe(formatFileNums(i.OutputTables)),
419 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
420 1 : blobInfo,
421 1 : redact.Safe(i.Duration.Seconds()),
422 1 : redact.Safe(i.TotalDuration.Seconds()),
423 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
424 1 : } else {
425 1 : if invariants.Enabled && len(i.IngestLevels) == 0 {
426 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) > 0"))
427 : }
428 1 : w.Printf("[JOB %d] flushed %d ingested flushable%s",
429 1 : redact.Safe(i.JobID), redact.Safe(len(i.OutputTables)), plural)
430 1 : for j, level := range i.IngestLevels {
431 1 : file := i.OutputTables[j]
432 1 : if j > 0 {
433 1 : w.Printf(" +")
434 1 : }
435 1 : w.Printf(" L%d:%s (%s)", level, file.FileNum, humanize.Bytes.Uint64(file.Size))
436 : }
437 1 : w.Printf(" in %.1fs (%.1fs total), output rate %s/s",
438 1 : redact.Safe(i.Duration.Seconds()),
439 1 : redact.Safe(i.TotalDuration.Seconds()),
440 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
441 : }
442 : }
443 :
444 : // DownloadInfo contains the info for a DB.Download() event.
445 : type DownloadInfo struct {
446 : // JobID is the ID of the download job.
447 : JobID int
448 :
449 : Spans []DownloadSpan
450 :
451 : // Duration is the time since the operation was started.
452 : Duration time.Duration
453 : DownloadCompactionsLaunched int
454 :
455 : // RestartCount indicates that the download operation restarted because it
456 : // noticed that new external files were ingested. A DownloadBegin event with
457 : // RestartCount = 0 is the start of the operation; each time we restart it we
458 : // have another DownloadBegin event with RestartCount > 0.
459 : RestartCount int
460 : Done bool
461 : Err error
462 : }
463 :
464 1 : func (i DownloadInfo) String() string {
465 1 : return redact.StringWithoutMarkers(i)
466 1 : }
467 :
468 : // SafeFormat implements redact.SafeFormatter.
469 1 : func (i DownloadInfo) SafeFormat(w redact.SafePrinter, _ rune) {
470 1 : switch {
471 0 : case i.Err != nil:
472 0 : w.Printf("[JOB %d] download error after %1.fs: %s", redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), i.Err)
473 :
474 1 : case i.Done:
475 1 : w.Printf("[JOB %d] download finished in %.1fs (launched %d compactions)",
476 1 : redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), redact.Safe(i.DownloadCompactionsLaunched))
477 :
478 1 : default:
479 1 : if i.RestartCount == 0 {
480 1 : w.Printf("[JOB %d] starting download for %d spans", redact.Safe(i.JobID), redact.Safe(len(i.Spans)))
481 1 : } else {
482 0 : w.Printf("[JOB %d] restarting download (restart #%d, time so far %.1fs, launched %d compactions)",
483 0 : redact.Safe(i.JobID), redact.Safe(i.RestartCount), redact.Safe(i.Duration.Seconds()),
484 0 : redact.Safe(i.DownloadCompactionsLaunched))
485 0 : }
486 : }
487 : }
488 :
489 : // ManifestCreateInfo contains info about a manifest creation event.
490 : type ManifestCreateInfo struct {
491 : // JobID is the ID of the job the caused the manifest to be created.
492 : JobID int
493 : Path string
494 : // The file number of the new Manifest.
495 : FileNum base.DiskFileNum
496 : Err error
497 : }
498 :
499 1 : func (i ManifestCreateInfo) String() string {
500 1 : return redact.StringWithoutMarkers(i)
501 1 : }
502 :
503 : // SafeFormat implements redact.SafeFormatter.
504 1 : func (i ManifestCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
505 1 : if i.Err != nil {
506 0 : w.Printf("[JOB %d] MANIFEST create error: %s", redact.Safe(i.JobID), i.Err)
507 0 : return
508 0 : }
509 1 : w.Printf("[JOB %d] MANIFEST created %s", redact.Safe(i.JobID), i.FileNum)
510 : }
511 :
512 : // ManifestDeleteInfo contains the info for a Manifest deletion event.
513 : type ManifestDeleteInfo struct {
514 : // JobID is the ID of the job the caused the Manifest to be deleted.
515 : JobID int
516 : Path string
517 : FileNum base.DiskFileNum
518 : Err error
519 : }
520 :
521 1 : func (i ManifestDeleteInfo) String() string {
522 1 : return redact.StringWithoutMarkers(i)
523 1 : }
524 :
525 : // SafeFormat implements redact.SafeFormatter.
526 1 : func (i ManifestDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
527 1 : if i.Err != nil {
528 0 : w.Printf("[JOB %d] MANIFEST delete error: %s", redact.Safe(i.JobID), i.Err)
529 0 : return
530 0 : }
531 1 : w.Printf("[JOB %d] MANIFEST deleted %s", redact.Safe(i.JobID), i.FileNum)
532 : }
533 :
534 : // TableCreateInfo contains the info for a table creation event.
535 : type TableCreateInfo struct {
536 : JobID int
537 : // Reason is the reason for the table creation: "compacting", "flushing", or
538 : // "ingesting".
539 : Reason string
540 : Path string
541 : FileNum base.DiskFileNum
542 : }
543 :
544 1 : func (i TableCreateInfo) String() string {
545 1 : return redact.StringWithoutMarkers(i)
546 1 : }
547 :
548 : // SafeFormat implements redact.SafeFormatter.
549 1 : func (i TableCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
550 1 : w.Printf("[JOB %d] %s: sstable created %s",
551 1 : redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
552 1 : }
553 :
554 : // TableDeleteInfo contains the info for a table deletion event.
555 : type TableDeleteInfo struct {
556 : JobID int
557 : Path string
558 : FileNum base.DiskFileNum
559 : Err error
560 : }
561 :
562 1 : func (i TableDeleteInfo) String() string {
563 1 : return redact.StringWithoutMarkers(i)
564 1 : }
565 :
566 : // SafeFormat implements redact.SafeFormatter.
567 1 : func (i TableDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
568 1 : if i.Err != nil {
569 0 : w.Printf("[JOB %d] sstable delete error %s: %s",
570 0 : redact.Safe(i.JobID), i.FileNum, i.Err)
571 0 : return
572 0 : }
573 1 : w.Printf("[JOB %d] sstable deleted %s", redact.Safe(i.JobID), i.FileNum)
574 : }
575 :
576 : // TableIngestInfo contains the info for a table ingestion event.
577 : type TableIngestInfo struct {
578 : // JobID is the ID of the job the caused the table to be ingested.
579 : JobID int
580 : Tables []struct {
581 : TableInfo
582 : Level int
583 : }
584 : // GlobalSeqNum is the sequence number that was assigned to all entries in
585 : // the ingested table.
586 : GlobalSeqNum base.SeqNum
587 : // flushable indicates whether the ingested sstable was treated as a
588 : // flushable.
589 : flushable bool
590 : Err error
591 : // WaitFlushDuration is the time spent waiting for memtable flushes to
592 : // complete, given that an overlap between ingesting sstables and memtables
593 : // exists.
594 : WaitFlushDuration time.Duration
595 : // ManifestUpdateDuration is the time spent updating the manifest.
596 : ManifestUpdateDuration time.Duration
597 : // BlockReadDuration is the total time spent reading blocks for the ingested
598 : // sstable.
599 : BlockReadDuration time.Duration
600 : // BlockReadBytes is the total number of bytes from blocks read for the
601 : // ingested sstable. This does not include bytes read from the block cache.
602 : BlockReadBytes uint64
603 : }
604 :
605 1 : func (i TableIngestInfo) String() string {
606 1 : return redact.StringWithoutMarkers(i)
607 1 : }
608 :
609 : // SafeFormat implements redact.SafeFormatter.
610 1 : func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
611 1 : if i.Err != nil {
612 0 : w.Printf("[JOB %d] ingest error: %s", redact.Safe(i.JobID), i.Err)
613 0 : return
614 0 : }
615 :
616 1 : if i.flushable {
617 1 : w.Printf("[JOB %d] ingested as flushable, memtable flushes took %.1fs:", redact.Safe(i.JobID),
618 1 : redact.Safe(i.WaitFlushDuration.Seconds()))
619 1 : } else {
620 1 : w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
621 1 : }
622 :
623 1 : for j := range i.Tables {
624 1 : t := &i.Tables[j]
625 1 : if j > 0 {
626 1 : w.Printf(",")
627 1 : }
628 1 : levelStr := ""
629 1 : if !i.flushable {
630 1 : levelStr = fmt.Sprintf("L%d:", t.Level)
631 1 : }
632 1 : w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
633 1 : redact.Safe(humanize.Bytes.Uint64(t.Size)))
634 : }
635 1 : w.Printf("; manifest update took %.1fs; block reads took %.1fs with %s block bytes read",
636 1 : redact.Safe(i.ManifestUpdateDuration.Seconds()), redact.Safe(i.BlockReadDuration.Seconds()),
637 1 : redact.Safe(humanize.Bytes.Uint64(i.BlockReadBytes)))
638 : }
639 :
640 : // TableStatsInfo contains the info for a table stats loaded event.
641 : type TableStatsInfo struct {
642 : // JobID is the ID of the job that finished loading the initial tables'
643 : // stats.
644 : JobID int
645 : }
646 :
647 1 : func (i TableStatsInfo) String() string {
648 1 : return redact.StringWithoutMarkers(i)
649 1 : }
650 :
651 : // SafeFormat implements redact.SafeFormatter.
652 1 : func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
653 1 : w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
654 1 : }
655 :
656 : // TableValidatedInfo contains information on the result of a validation run
657 : // on an sstable.
658 : type TableValidatedInfo struct {
659 : JobID int
660 : Meta *manifest.TableMetadata
661 : }
662 :
663 1 : func (i TableValidatedInfo) String() string {
664 1 : return redact.StringWithoutMarkers(i)
665 1 : }
666 :
667 : // SafeFormat implements redact.SafeFormatter.
668 1 : func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
669 1 : w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
670 1 : }
671 :
672 : // WALCreateInfo contains info about a WAL creation event.
673 : type WALCreateInfo struct {
674 : // JobID is the ID of the job the caused the WAL to be created.
675 : JobID int
676 : Path string
677 : // The file number of the new WAL.
678 : FileNum base.DiskFileNum
679 : // The file number of a previous WAL which was recycled to create this
680 : // one. Zero if recycling did not take place.
681 : RecycledFileNum base.DiskFileNum
682 : Err error
683 : }
684 :
685 1 : func (i WALCreateInfo) String() string {
686 1 : return redact.StringWithoutMarkers(i)
687 1 : }
688 :
689 : // SafeFormat implements redact.SafeFormatter.
690 1 : func (i WALCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
691 1 : if i.Err != nil {
692 0 : w.Printf("[JOB %d] WAL create error: %s", redact.Safe(i.JobID), i.Err)
693 0 : return
694 0 : }
695 :
696 1 : if i.RecycledFileNum == 0 {
697 1 : w.Printf("[JOB %d] WAL created %s", redact.Safe(i.JobID), i.FileNum)
698 1 : return
699 1 : }
700 :
701 1 : w.Printf("[JOB %d] WAL created %s (recycled %s)",
702 1 : redact.Safe(i.JobID), i.FileNum, i.RecycledFileNum)
703 : }
704 :
705 : // WALDeleteInfo contains the info for a WAL deletion event.
706 : //
707 : // TODO(sumeer): extend WALDeleteInfo for the failover case in case the path
708 : // is insufficient to infer whether primary or secondary.
709 : type WALDeleteInfo struct {
710 : // JobID is the ID of the job the caused the WAL to be deleted.
711 : JobID int
712 : Path string
713 : FileNum base.DiskFileNum
714 : Err error
715 : }
716 :
717 1 : func (i WALDeleteInfo) String() string {
718 1 : return redact.StringWithoutMarkers(i)
719 1 : }
720 :
721 : // SafeFormat implements redact.SafeFormatter.
722 1 : func (i WALDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
723 1 : if i.Err != nil {
724 1 : w.Printf("[JOB %d] WAL delete error: %s", redact.Safe(i.JobID), i.Err)
725 1 : return
726 1 : }
727 1 : w.Printf("[JOB %d] WAL deleted %s", redact.Safe(i.JobID), i.FileNum)
728 : }
729 :
730 : // WriteStallBeginInfo contains the info for a write stall begin event.
731 : type WriteStallBeginInfo struct {
732 : Reason string
733 : }
734 :
735 1 : func (i WriteStallBeginInfo) String() string {
736 1 : return redact.StringWithoutMarkers(i)
737 1 : }
738 :
739 : // SafeFormat implements redact.SafeFormatter.
740 1 : func (i WriteStallBeginInfo) SafeFormat(w redact.SafePrinter, _ rune) {
741 1 : w.Printf("write stall beginning: %s", redact.Safe(i.Reason))
742 1 : }
743 :
744 : // LowDiskSpaceInfo contains the information for a LowDiskSpace
745 : // event.
746 : type LowDiskSpaceInfo struct {
747 : // AvailBytes is the disk space available to the current process in bytes.
748 : AvailBytes uint64
749 : // TotalBytes is the total disk space in bytes.
750 : TotalBytes uint64
751 : // PercentThreshold is one of a set of fixed percentages in the
752 : // lowDiskSpaceThresholds below. This event was issued because the disk
753 : // space went below this threshold.
754 : PercentThreshold int
755 : }
756 :
757 0 : func (i LowDiskSpaceInfo) String() string {
758 0 : return redact.StringWithoutMarkers(i)
759 0 : }
760 :
761 : // SafeFormat implements redact.SafeFormatter.
762 0 : func (i LowDiskSpaceInfo) SafeFormat(w redact.SafePrinter, _ rune) {
763 0 : w.Printf(
764 0 : "available disk space under %d%% (%s of %s)",
765 0 : redact.Safe(i.PercentThreshold),
766 0 : redact.Safe(humanize.Bytes.Uint64(i.AvailBytes)),
767 0 : redact.Safe(humanize.Bytes.Uint64(i.TotalBytes)),
768 0 : )
769 0 : }
770 :
771 : // PossibleAPIMisuseInfo contains the information for a PossibleAPIMisuse event.
772 : type PossibleAPIMisuseInfo struct {
773 : Kind APIMisuseKind
774 :
775 : // UserKey is set for the following kinds:
776 : // - IneffectualSingleDelete,
777 : // - NondeterministicSingleDelete,
778 : // - MissizedDelete,
779 : // - InvalidValue.
780 : UserKey []byte
781 :
782 : // ExtraInfo is set for the following kinds:
783 : // - MissizedDelete: contains "elidedSize=<size>,expectedSize=<size>"
784 : // - InvalidValue: contains "callback=<callbackName>,value=<value>,err=<err>"
785 : ExtraInfo redact.RedactableString
786 : }
787 :
788 1 : func (i PossibleAPIMisuseInfo) String() string {
789 1 : return redact.StringWithoutMarkers(i)
790 1 : }
791 :
792 : // SafeFormat implements redact.SafeFormatter.
793 1 : func (i PossibleAPIMisuseInfo) SafeFormat(w redact.SafePrinter, _ rune) {
794 1 : switch i.Kind {
795 1 : case IneffectualSingleDelete, NondeterministicSingleDelete:
796 1 : w.Printf("possible API misuse: %s (key=%q)", redact.Safe(i.Kind), i.UserKey)
797 1 : case MissizedDelete:
798 1 : w.Printf("possible API misuse: %s (key=%q, %s)", redact.Safe(i.Kind), i.UserKey, i.ExtraInfo)
799 0 : case InvalidValue:
800 0 : w.Printf("possible API misuse: %s (key=%q, %s)", redact.Safe(i.Kind), i.UserKey, i.ExtraInfo)
801 0 : default:
802 0 : if invariants.Enabled {
803 0 : panic("invalid API misuse event")
804 : }
805 0 : w.Printf("invalid API misuse event")
806 : }
807 : }
808 :
809 : // APIMisuseKind identifies the kind of API misuse reported by a
810 : // PossibleAPIMisuse event (see PossibleAPIMisuseInfo.Kind).
811 : type APIMisuseKind int8
812 :
813 : const (
814 : // IneffectualSingleDelete is emitted in compactions/flushes if any
815 : // single delete is being elided without deleting a point set/merge.
816 : //
817 : // This event can sometimes be a false positive because of delete-only
818 : // compactions which can cause a recent RANGEDEL to peek below an older
819 : // SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
820 : //
821 : // Example:
822 : // RANGEDEL [a, c)#10 in L0
823 : // SINGLEDEL b#5 in L1
824 : // SET b#3 in L6
825 : //
826 : // If the L6 file containing the SET is narrow and the L1 file containing
827 : // the SINGLEDEL is wide, a delete-only compaction can remove the file in
828 : // L6 before the SINGLEDEL is compacted down. Then when the SINGLEDEL is
829 : // compacted down, it will not find any SET to delete, resulting in the
830 : // ineffectual callback.
831 : IneffectualSingleDelete APIMisuseKind = iota
832 :
833 : // NondeterministicSingleDelete is emitted in compactions/flushes if any
834 : // single delete has consumed a Set/Merge, and there is another immediately
835 : // older Set/SetWithDelete/Merge. The user of Pebble has violated the
836 : // invariant under which SingleDelete can be used correctly.
837 : //
838 : // Consider the sequence SingleDelete#3, Set#2, Set#1. There are three
839 : // ways some of these keys can first meet in a compaction.
840 : //
841 : // - All 3 keys in the same compaction: this callback will detect the
842 : // violation.
843 : //
844 : // - SingleDelete#3, Set#2 meet in a compaction first: Both keys will
845 : // disappear. The violation will not be detected, and the DB will have
846 : // Set#1 which is likely incorrect (from the user's perspective).
847 : //
848 : // - Set#2, Set#1 meet in a compaction first: The output will be Set#2,
849 : // which will later be consumed by SingleDelete#3. The violation will
850 : // not be detected and the DB will be correct.
851 : //
852 : // This event can sometimes be a false positive because of delete-only
853 : // compactions which can cause a recent RANGEDEL to peek below an older
854 : // SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
855 : //
856 : // Example:
857 : // RANGEDEL [a, z)#60 in L0
858 : // SINGLEDEL g#50 in L1
859 : // SET g#40 in L2
860 : // RANGEDEL [g,h)#30 in L3
861 : // SET g#20 in L6
862 : //
863 : // In this example, the two SETs represent the same user write, and the
864 : // RANGEDELs are caused by the CockroachDB range being dropped. That is,
865 : // the user wrote to g once, range was dropped, then added back, which
866 : // caused the SET again, then at some point g was validly deleted using a
867 : // SINGLEDEL, and then the range was dropped again. The older RANGEDEL can
868 : // get fragmented due to compactions it has been part of. Say this L3 file
869 : // containing the RANGEDEL is very narrow, while the L1, L2, L6 files are
870 : // wider than the RANGEDEL in L0. Then the RANGEDEL in L3 can be dropped
871 : // using a delete-only compaction, resulting in an LSM with state:
872 : //
873 : // RANGEDEL [a, z)#60 in L0
874 : // SINGLEDEL g#50 in L1
875 : // SET g#40 in L2
876 : // SET g#20 in L6
877 : //
878 : // A multi-level compaction involving L1, L2, L6 will cause the invariant
879 : // violation callback. This example doesn't need multi-level compactions:
880 : // say there was a Pebble snapshot at g#21 preventing g#20 from being
881 : // dropped when it meets g#40 in a compaction. That snapshot will not save
882 : // RANGEDEL [g,h)#30, so we can have:
883 : //
884 : // SINGLEDEL g#50 in L1
885 : // SET g#40, SET g#20 in L6
886 : //
887 : // And say the snapshot is removed and then the L1 and L6 compaction
888 : // happens, resulting in the invariant violation callback.
889 : NondeterministicSingleDelete
890 :
891 : // MissizedDelete is emitted when a DELSIZED tombstone is found that did
892 : // not accurately record the size of the value it deleted. This can lead to
893 : // incorrect behavior in compactions.
894 : MissizedDelete
895 :
896 : // InvalidValue is emitted when a user-implemented callback (such as
897 : // ShortAttributeExtractor) returns an error for a committed value. This
898 : // suggests that either the callback is not implemented for all possible
899 : // values or a malformed value was committed to the DB.
900 : InvalidValue
901 : )
902 :
903 1 : func (k APIMisuseKind) String() string {
904 1 : switch k {
905 1 : case IneffectualSingleDelete:
906 1 : return "ineffectual SINGLEDEL"
907 0 : case NondeterministicSingleDelete:
908 0 : return "nondeterministic SINGLEDEL"
909 1 : case MissizedDelete:
910 1 : return "missized DELSIZED"
911 0 : case InvalidValue:
912 0 : return "invalid value"
913 0 : default:
914 0 : return "unknown"
915 : }
916 : }
917 :
// EventListener contains a set of functions that will be invoked when various
// significant DB events occur. Note that the functions should not run for an
// excessive amount of time as they are invoked synchronously by the DB and may
// block continued DB work. For a similar reason it is advisable to not perform
// any synchronous calls back into the DB.
//
// Handlers that are left nil are replaced with defaults by EnsureDefaults, so
// that they can be invoked without a nil check.
type EventListener struct {
	// BackgroundError is invoked whenever an error occurs during a background
	// operation such as flush or compaction.
	BackgroundError func(error)

	// BlobFileCreated is invoked after a blob file has been created.
	BlobFileCreated func(BlobFileCreateInfo)

	// BlobFileDeleted is invoked after a blob file has been deleted.
	BlobFileDeleted func(BlobFileDeleteInfo)

	// BlobFileRewriteBegin is invoked when a blob file rewrite compaction begins.
	BlobFileRewriteBegin func(BlobFileRewriteInfo)

	// BlobFileRewriteEnd is invoked when a blob file rewrite compaction ends.
	BlobFileRewriteEnd func(BlobFileRewriteInfo)

	// DataCorruption is invoked when an on-disk corruption is detected. It should
	// not block, as it is called synchronously in read paths.
	DataCorruption func(DataCorruptionInfo)

	// CompactionBegin is invoked after the inputs to a compaction have been
	// determined, but before the compaction has produced any output.
	CompactionBegin func(CompactionInfo)

	// CompactionEnd is invoked after a compaction has completed and the result
	// has been installed.
	CompactionEnd func(CompactionInfo)

	// DiskSlow is invoked after a disk write operation on a file created with a
	// disk health checking vfs.FS (see vfs.DefaultWithDiskHealthChecks) is
	// observed to exceed the specified disk slowness threshold duration. DiskSlow
	// is called on a goroutine that is monitoring slowness/stuckness. The callee
	// MUST return without doing any IO, or blocking on anything (like a mutex)
	// that is waiting on IO. This is imperative in order to reliably monitor for
	// slowness, since if this goroutine gets stuck, the monitoring will stop
	// working.
	DiskSlow func(DiskSlowInfo)

	// FlushBegin is invoked after the inputs to a flush have been determined,
	// but before the flush has produced any output.
	FlushBegin func(FlushInfo)

	// FlushEnd is invoked after a flush has completed and the result has been
	// installed.
	FlushEnd func(FlushInfo)

	// DownloadBegin is invoked when a db.Download operation starts or restarts
	// (restarts are caused by new external tables being ingested during the
	// operation).
	DownloadBegin func(DownloadInfo)

	// DownloadEnd is invoked when a db.Download operation completes.
	DownloadEnd func(DownloadInfo)

	// FormatUpgrade is invoked after the database's FormatMajorVersion
	// is upgraded.
	FormatUpgrade func(FormatMajorVersion)

	// ManifestCreated is invoked after a manifest has been created.
	ManifestCreated func(ManifestCreateInfo)

	// ManifestDeleted is invoked after a manifest has been deleted.
	ManifestDeleted func(ManifestDeleteInfo)

	// TableCreated is invoked when a table has been created.
	TableCreated func(TableCreateInfo)

	// TableDeleted is invoked after a table has been deleted.
	TableDeleted func(TableDeleteInfo)

	// TableIngested is invoked after an externally created table has been
	// ingested via a call to DB.Ingest().
	TableIngested func(TableIngestInfo)

	// TableStatsLoaded is invoked at most once, when the table stats
	// collector has loaded statistics for all tables that existed at Open.
	TableStatsLoaded func(TableStatsInfo)

	// TableValidated is invoked after validation runs on an sstable.
	TableValidated func(TableValidatedInfo)

	// WALCreated is invoked after a WAL has been created.
	WALCreated func(WALCreateInfo)

	// WALDeleted is invoked after a WAL has been deleted.
	WALDeleted func(WALDeleteInfo)

	// WriteStallBegin is invoked when writes are intentionally delayed.
	WriteStallBegin func(WriteStallBeginInfo)

	// WriteStallEnd is invoked when delayed writes are released.
	WriteStallEnd func()

	// LowDiskSpace is invoked periodically when the disk space is running
	// low.
	LowDiskSpace func(LowDiskSpaceInfo)

	// PossibleAPIMisuse is invoked when a possible API misuse is detected.
	PossibleAPIMisuse func(PossibleAPIMisuseInfo)
}
1024 :
1025 : // EnsureDefaults ensures that background error events are logged to the
1026 : // specified logger if a handler for those events hasn't been otherwise
1027 : // specified. Ensure all handlers are non-nil so that we don't have to check
1028 : // for nil-ness before invoking.
1029 1 : func (l *EventListener) EnsureDefaults(logger Logger) {
1030 1 : if l.BackgroundError == nil {
1031 1 : if logger != nil {
1032 1 : l.BackgroundError = func(err error) {
1033 1 : logger.Errorf("background error: %s", err)
1034 1 : }
1035 1 : } else {
1036 1 : l.BackgroundError = func(error) {}
1037 : }
1038 : }
1039 1 : if l.BlobFileCreated == nil {
1040 1 : l.BlobFileCreated = func(info BlobFileCreateInfo) {}
1041 : }
1042 1 : if l.BlobFileDeleted == nil {
1043 1 : l.BlobFileDeleted = func(info BlobFileDeleteInfo) {}
1044 : }
1045 1 : if l.BlobFileRewriteBegin == nil {
1046 1 : l.BlobFileRewriteBegin = func(info BlobFileRewriteInfo) {}
1047 : }
1048 1 : if l.BlobFileRewriteEnd == nil {
1049 1 : l.BlobFileRewriteEnd = func(info BlobFileRewriteInfo) {}
1050 : }
1051 1 : if l.DataCorruption == nil {
1052 1 : if logger != nil {
1053 1 : l.DataCorruption = func(info DataCorruptionInfo) {
1054 1 : logger.Fatalf("%s", info)
1055 1 : }
1056 1 : } else {
1057 1 : l.DataCorruption = func(info DataCorruptionInfo) {}
1058 : }
1059 : }
1060 1 : if l.CompactionBegin == nil {
1061 1 : l.CompactionBegin = func(info CompactionInfo) {}
1062 : }
1063 1 : if l.CompactionEnd == nil {
1064 1 : l.CompactionEnd = func(info CompactionInfo) {}
1065 : }
1066 1 : if l.DiskSlow == nil {
1067 1 : l.DiskSlow = func(info DiskSlowInfo) {}
1068 : }
1069 1 : if l.FlushBegin == nil {
1070 1 : l.FlushBegin = func(info FlushInfo) {}
1071 : }
1072 1 : if l.FlushEnd == nil {
1073 1 : l.FlushEnd = func(info FlushInfo) {}
1074 : }
1075 1 : if l.DownloadBegin == nil {
1076 1 : l.DownloadBegin = func(info DownloadInfo) {}
1077 : }
1078 1 : if l.DownloadEnd == nil {
1079 1 : l.DownloadEnd = func(info DownloadInfo) {}
1080 : }
1081 1 : if l.FormatUpgrade == nil {
1082 1 : l.FormatUpgrade = func(v FormatMajorVersion) {}
1083 : }
1084 1 : if l.ManifestCreated == nil {
1085 1 : l.ManifestCreated = func(info ManifestCreateInfo) {}
1086 : }
1087 1 : if l.ManifestDeleted == nil {
1088 1 : l.ManifestDeleted = func(info ManifestDeleteInfo) {}
1089 : }
1090 1 : if l.TableCreated == nil {
1091 1 : l.TableCreated = func(info TableCreateInfo) {}
1092 : }
1093 1 : if l.TableDeleted == nil {
1094 1 : l.TableDeleted = func(info TableDeleteInfo) {}
1095 : }
1096 1 : if l.TableIngested == nil {
1097 1 : l.TableIngested = func(info TableIngestInfo) {}
1098 : }
1099 1 : if l.TableStatsLoaded == nil {
1100 1 : l.TableStatsLoaded = func(info TableStatsInfo) {}
1101 : }
1102 1 : if l.TableValidated == nil {
1103 1 : l.TableValidated = func(validated TableValidatedInfo) {}
1104 : }
1105 1 : if l.WALCreated == nil {
1106 1 : l.WALCreated = func(info WALCreateInfo) {}
1107 : }
1108 1 : if l.WALDeleted == nil {
1109 1 : l.WALDeleted = func(info WALDeleteInfo) {}
1110 : }
1111 1 : if l.WriteStallBegin == nil {
1112 1 : l.WriteStallBegin = func(info WriteStallBeginInfo) {}
1113 : }
1114 1 : if l.WriteStallEnd == nil {
1115 1 : l.WriteStallEnd = func() {}
1116 : }
1117 1 : if l.LowDiskSpace == nil {
1118 1 : l.LowDiskSpace = func(info LowDiskSpaceInfo) {}
1119 : }
1120 1 : if l.PossibleAPIMisuse == nil {
1121 1 : l.PossibleAPIMisuse = func(info PossibleAPIMisuseInfo) {}
1122 : }
1123 : }
1124 :
1125 : // MakeLoggingEventListener creates an EventListener that logs all events to the
1126 : // specified logger.
1127 1 : func MakeLoggingEventListener(logger Logger) EventListener {
1128 1 : if logger == nil {
1129 1 : logger = DefaultLogger
1130 1 : }
1131 :
1132 1 : return EventListener{
1133 1 : BackgroundError: func(err error) {
1134 1 : logger.Errorf("background error: %s", err)
1135 1 : },
1136 1 : BlobFileCreated: func(info BlobFileCreateInfo) {
1137 1 : logger.Infof("%s", info)
1138 1 : },
1139 1 : BlobFileDeleted: func(info BlobFileDeleteInfo) {
1140 1 : logger.Infof("%s", info)
1141 1 : },
1142 1 : BlobFileRewriteBegin: func(info BlobFileRewriteInfo) {
1143 1 : logger.Infof("%s", info)
1144 1 : },
1145 1 : BlobFileRewriteEnd: func(info BlobFileRewriteInfo) {
1146 1 : logger.Infof("%s", info)
1147 1 : },
1148 1 : DataCorruption: func(info DataCorruptionInfo) {
1149 1 : logger.Errorf("%s", info)
1150 1 : },
1151 1 : CompactionBegin: func(info CompactionInfo) {
1152 1 : logger.Infof("%s", info)
1153 1 : },
1154 1 : CompactionEnd: func(info CompactionInfo) {
1155 1 : logger.Infof("%s", info)
1156 1 : },
1157 0 : DiskSlow: func(info DiskSlowInfo) {
1158 0 : logger.Infof("%s", info)
1159 0 : },
1160 1 : FlushBegin: func(info FlushInfo) {
1161 1 : logger.Infof("%s", info)
1162 1 : },
1163 1 : FlushEnd: func(info FlushInfo) {
1164 1 : logger.Infof("%s", info)
1165 1 : },
1166 1 : DownloadBegin: func(info DownloadInfo) {
1167 1 : logger.Infof("%s", info)
1168 1 : },
1169 1 : DownloadEnd: func(info DownloadInfo) {
1170 1 : logger.Infof("%s", info)
1171 1 : },
1172 1 : FormatUpgrade: func(v FormatMajorVersion) {
1173 1 : logger.Infof("upgraded to format version: %s", v)
1174 1 : },
1175 1 : ManifestCreated: func(info ManifestCreateInfo) {
1176 1 : logger.Infof("%s", info)
1177 1 : },
1178 1 : ManifestDeleted: func(info ManifestDeleteInfo) {
1179 1 : logger.Infof("%s", info)
1180 1 : },
1181 1 : TableCreated: func(info TableCreateInfo) {
1182 1 : logger.Infof("%s", info)
1183 1 : },
1184 1 : TableDeleted: func(info TableDeleteInfo) {
1185 1 : logger.Infof("%s", info)
1186 1 : },
1187 1 : TableIngested: func(info TableIngestInfo) {
1188 1 : logger.Infof("%s", info)
1189 1 : },
1190 1 : TableStatsLoaded: func(info TableStatsInfo) {
1191 1 : logger.Infof("%s", info)
1192 1 : },
1193 1 : TableValidated: func(info TableValidatedInfo) {
1194 1 : logger.Infof("%s", info)
1195 1 : },
1196 1 : WALCreated: func(info WALCreateInfo) {
1197 1 : logger.Infof("%s", info)
1198 1 : },
1199 1 : WALDeleted: func(info WALDeleteInfo) {
1200 1 : logger.Infof("%s", info)
1201 1 : },
1202 1 : WriteStallBegin: func(info WriteStallBeginInfo) {
1203 1 : logger.Infof("%s", info)
1204 1 : },
1205 1 : WriteStallEnd: func() {
1206 1 : logger.Infof("write stall ending")
1207 1 : },
1208 0 : LowDiskSpace: func(info LowDiskSpaceInfo) {
1209 0 : logger.Infof("%s", info)
1210 0 : },
1211 1 : PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
1212 1 : logger.Infof("%s", info)
1213 1 : },
1214 : }
1215 : }
1216 :
1217 : // TeeEventListener wraps two EventListeners, forwarding all events to both.
1218 1 : func TeeEventListener(a, b EventListener) EventListener {
1219 1 : a.EnsureDefaults(nil)
1220 1 : b.EnsureDefaults(nil)
1221 1 : return EventListener{
1222 1 : BackgroundError: func(err error) {
1223 1 : a.BackgroundError(err)
1224 1 : b.BackgroundError(err)
1225 1 : },
1226 0 : BlobFileCreated: func(info BlobFileCreateInfo) {
1227 0 : a.BlobFileCreated(info)
1228 0 : b.BlobFileCreated(info)
1229 0 : },
1230 0 : BlobFileDeleted: func(info BlobFileDeleteInfo) {
1231 0 : a.BlobFileDeleted(info)
1232 0 : b.BlobFileDeleted(info)
1233 0 : },
1234 0 : BlobFileRewriteBegin: func(info BlobFileRewriteInfo) {
1235 0 : a.BlobFileRewriteBegin(info)
1236 0 : b.BlobFileRewriteBegin(info)
1237 0 : },
1238 0 : BlobFileRewriteEnd: func(info BlobFileRewriteInfo) {
1239 0 : a.BlobFileRewriteEnd(info)
1240 0 : b.BlobFileRewriteEnd(info)
1241 0 : },
1242 1 : DataCorruption: func(info DataCorruptionInfo) {
1243 1 : a.DataCorruption(info)
1244 1 : b.DataCorruption(info)
1245 1 : },
1246 1 : CompactionBegin: func(info CompactionInfo) {
1247 1 : a.CompactionBegin(info)
1248 1 : b.CompactionBegin(info)
1249 1 : },
1250 1 : CompactionEnd: func(info CompactionInfo) {
1251 1 : a.CompactionEnd(info)
1252 1 : b.CompactionEnd(info)
1253 1 : },
1254 0 : DiskSlow: func(info DiskSlowInfo) {
1255 0 : a.DiskSlow(info)
1256 0 : b.DiskSlow(info)
1257 0 : },
1258 1 : FlushBegin: func(info FlushInfo) {
1259 1 : a.FlushBegin(info)
1260 1 : b.FlushBegin(info)
1261 1 : },
1262 1 : FlushEnd: func(info FlushInfo) {
1263 1 : a.FlushEnd(info)
1264 1 : b.FlushEnd(info)
1265 1 : },
1266 0 : DownloadBegin: func(info DownloadInfo) {
1267 0 : a.DownloadBegin(info)
1268 0 : b.DownloadBegin(info)
1269 0 : },
1270 0 : DownloadEnd: func(info DownloadInfo) {
1271 0 : a.DownloadEnd(info)
1272 0 : b.DownloadEnd(info)
1273 0 : },
1274 1 : FormatUpgrade: func(v FormatMajorVersion) {
1275 1 : a.FormatUpgrade(v)
1276 1 : b.FormatUpgrade(v)
1277 1 : },
1278 1 : ManifestCreated: func(info ManifestCreateInfo) {
1279 1 : a.ManifestCreated(info)
1280 1 : b.ManifestCreated(info)
1281 1 : },
1282 0 : ManifestDeleted: func(info ManifestDeleteInfo) {
1283 0 : a.ManifestDeleted(info)
1284 0 : b.ManifestDeleted(info)
1285 0 : },
1286 1 : TableCreated: func(info TableCreateInfo) {
1287 1 : a.TableCreated(info)
1288 1 : b.TableCreated(info)
1289 1 : },
1290 1 : TableDeleted: func(info TableDeleteInfo) {
1291 1 : a.TableDeleted(info)
1292 1 : b.TableDeleted(info)
1293 1 : },
1294 1 : TableIngested: func(info TableIngestInfo) {
1295 1 : a.TableIngested(info)
1296 1 : b.TableIngested(info)
1297 1 : },
1298 1 : TableStatsLoaded: func(info TableStatsInfo) {
1299 1 : a.TableStatsLoaded(info)
1300 1 : b.TableStatsLoaded(info)
1301 1 : },
1302 0 : TableValidated: func(info TableValidatedInfo) {
1303 0 : a.TableValidated(info)
1304 0 : b.TableValidated(info)
1305 0 : },
1306 1 : WALCreated: func(info WALCreateInfo) {
1307 1 : a.WALCreated(info)
1308 1 : b.WALCreated(info)
1309 1 : },
1310 1 : WALDeleted: func(info WALDeleteInfo) {
1311 1 : a.WALDeleted(info)
1312 1 : b.WALDeleted(info)
1313 1 : },
1314 0 : WriteStallBegin: func(info WriteStallBeginInfo) {
1315 0 : a.WriteStallBegin(info)
1316 0 : b.WriteStallBegin(info)
1317 0 : },
1318 0 : WriteStallEnd: func() {
1319 0 : a.WriteStallEnd()
1320 0 : b.WriteStallEnd()
1321 0 : },
1322 0 : LowDiskSpace: func(info LowDiskSpaceInfo) {
1323 0 : a.LowDiskSpace(info)
1324 0 : b.LowDiskSpace(info)
1325 0 : },
1326 0 : PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
1327 0 : a.PossibleAPIMisuse(info)
1328 0 : b.PossibleAPIMisuse(info)
1329 0 : },
1330 : }
1331 : }
1332 :
// lowDiskSpaceReporter contains the logic to report low disk space events.
// Report is called whenever we get the disk usage statistics.
//
// We define a few thresholds (10%, 5%, 3%, 2%, 1%) and we post an event
// whenever we reach a new threshold. We periodically repost the event every 30
// minutes until we are above all thresholds.
type lowDiskSpaceReporter struct {
	// mu protects the bookkeeping about the most recently posted event.
	mu struct {
		sync.Mutex
		// lastNoticeThreshold is the threshold that was passed to shouldReport
		// the last time it decided that an event should be posted.
		lastNoticeThreshold int
		// lastNoticeTime is the time of that decision. The zero value means
		// that no event has been posted yet.
		lastNoticeTime crtime.Mono
	}
}
1346 :
// lowDiskSpaceThresholds lists, in decreasing order, the percentages of
// available disk space at which a low disk space event is posted. findThreshold
// stops scanning at the first threshold that has not been reached, so the
// decreasing order matters.
var lowDiskSpaceThresholds = []int{10, 5, 3, 2, 1}

// lowDiskSpaceFrequency is the interval after which shouldReport allows a low
// disk space event to be posted again even though no lower threshold has been
// reached.
const lowDiskSpaceFrequency = 30 * time.Minute
1350 :
1351 1 : func (r *lowDiskSpaceReporter) Report(availBytes, totalBytes uint64, el *EventListener) {
1352 1 : threshold, ok := r.findThreshold(availBytes, totalBytes)
1353 1 : if !ok {
1354 1 : // Normal path.
1355 1 : return
1356 1 : }
1357 1 : if r.shouldReport(threshold, crtime.NowMono()) {
1358 1 : el.LowDiskSpace(LowDiskSpaceInfo{
1359 1 : AvailBytes: availBytes,
1360 1 : TotalBytes: totalBytes,
1361 1 : PercentThreshold: threshold,
1362 1 : })
1363 1 : }
1364 : }
1365 :
1366 : // shouldReport returns true if we should report an event. Updates
1367 : // lastNoticeTime/lastNoticeThreshold appropriately.
1368 1 : func (r *lowDiskSpaceReporter) shouldReport(threshold int, now crtime.Mono) bool {
1369 1 : r.mu.Lock()
1370 1 : defer r.mu.Unlock()
1371 1 : if threshold < r.mu.lastNoticeThreshold || r.mu.lastNoticeTime == 0 ||
1372 1 : now.Sub(r.mu.lastNoticeTime) >= lowDiskSpaceFrequency {
1373 1 : r.mu.lastNoticeThreshold = threshold
1374 1 : r.mu.lastNoticeTime = now
1375 1 : return true
1376 1 : }
1377 1 : return false
1378 : }
1379 :
1380 : // findThreshold returns the largest threshold in lowDiskSpaceThresholds which
1381 : // is >= the percentage ratio between availBytes and totalBytes (or ok=false if
1382 : // there is more free space than the highest threshold).
1383 : func (r *lowDiskSpaceReporter) findThreshold(
1384 : availBytes, totalBytes uint64,
1385 1 : ) (threshold int, ok bool) {
1386 1 : // Note: in the normal path, we exit the loop during the first iteration.
1387 1 : for i, t := range lowDiskSpaceThresholds {
1388 1 : if availBytes*100 > totalBytes*uint64(lowDiskSpaceThresholds[i]) {
1389 1 : break
1390 : }
1391 1 : threshold = t
1392 1 : ok = true
1393 : }
1394 1 : return threshold, ok
1395 : }
1396 :
1397 : // reportCorruption reports a corruption of a TableMetadata or BlobFileMetadata
1398 : // to the event listener and also adds a DataCorruptionInfo payload to the error.
1399 1 : func (d *DB) reportCorruption(meta base.ObjectInfo, err error) error {
1400 1 : if invariants.Enabled && !IsCorruptionError(err) {
1401 0 : panic("not a corruption error")
1402 : }
1403 1 : fileType, fileNum := meta.FileInfo()
1404 1 :
1405 1 : objMeta, lookupErr := d.objProvider.Lookup(fileType, fileNum)
1406 1 : if lookupErr != nil {
1407 1 : // If the object is not known to the provider, it must be a local object
1408 1 : // that was missing when we opened the store. Remote objects have their
1409 1 : // metadata in a catalog, so even if the backing object is deleted, the
1410 1 : // DiskFileNum would still be known.
1411 1 : objMeta = objstorage.ObjectMetadata{DiskFileNum: fileNum, FileType: fileType}
1412 1 : }
1413 1 : path := d.objProvider.Path(objMeta)
1414 1 : if objMeta.IsRemote() {
1415 1 : // Remote path (which include the locator and full path) might not always be
1416 1 : // safe.
1417 1 : err = errors.WithHintf(err, "path: %s", path)
1418 1 : } else {
1419 1 : // Local paths are safe: they start with the store directory and the
1420 1 : // filename is generated by Pebble.
1421 1 : err = errors.WithHintf(err, "path: %s", redact.Safe(path))
1422 1 : }
1423 1 : info := DataCorruptionInfo{
1424 1 : Path: path,
1425 1 : IsRemote: objMeta.IsRemote(),
1426 1 : Locator: objMeta.Remote.Locator,
1427 1 : Bounds: meta.UserKeyBounds(),
1428 1 : Details: err,
1429 1 : }
1430 1 : d.opts.EventListener.DataCorruption(info)
1431 1 : // We don't use errors.Join() because that also annotates with this stack
1432 1 : // trace which would not be useful.
1433 1 : return errorsjoin.Join(err, &corruptionDetailError{info: info})
1434 : }
1435 :
// corruptionDetailError is an error whose only purpose is to carry a
// DataCorruptionInfo alongside a corruption error. reportCorruption joins it
// with the original error and ExtractDataCorruptionInfo retrieves the payload.
type corruptionDetailError struct {
	// info describes the corruption that was reported.
	info DataCorruptionInfo
}
1439 :
1440 1 : func (e *corruptionDetailError) Error() string {
1441 1 : return "<corruption detail carrier>"
1442 1 : }
1443 :
1444 : // ExtractDataCorruptionInfo extracts the DataCorruptionInfo details from a
1445 : // corruption error. Returns nil if there is no such detail.
1446 1 : func ExtractDataCorruptionInfo(err error) *DataCorruptionInfo {
1447 1 : var e *corruptionDetailError
1448 1 : if errors.As(err, &e) {
1449 1 : return &e.info
1450 1 : }
1451 0 : return nil
1452 : }
|