Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "strings"
10 : "sync"
11 : "time"
12 :
13 : "github.com/cockroachdb/crlib/crtime"
14 : "github.com/cockroachdb/errors"
15 : errorsjoin "github.com/cockroachdb/errors/join"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/humanize"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/internal/manifest"
20 : "github.com/cockroachdb/pebble/objstorage"
21 : "github.com/cockroachdb/pebble/objstorage/remote"
22 : "github.com/cockroachdb/pebble/vfs"
23 : "github.com/cockroachdb/redact"
24 : )
25 :
26 : // TableNum is an identifier for a table within a database.
27 : type TableNum = base.TableNum
28 :
29 : // TableInfo exports the manifest.TableInfo type.
30 : type TableInfo = manifest.TableInfo
31 :
32 1 : func tablesTotalSize(tables []TableInfo) uint64 {
33 1 : var size uint64
34 1 : for i := range tables {
35 1 : size += tables[i].Size
36 1 : }
37 1 : return size
38 : }
39 :
40 1 : func formatFileNums(tables []TableInfo) string {
41 1 : var buf strings.Builder
42 1 : for i := range tables {
43 1 : if i > 0 {
44 1 : buf.WriteString(" ")
45 1 : }
46 1 : buf.WriteString(tables[i].FileNum.String())
47 : }
48 1 : return buf.String()
49 : }
50 :
51 : // DataCorruptionInfo contains the information for a DataCorruption event.
52 : type DataCorruptionInfo struct {
53 : // Path of the file that is corrupted. For remote files the path starts with
54 : // "remote://".
55 : Path string
56 : IsRemote bool
57 : // Locator is only set when IsRemote is true (note that an empty Locator is
58 : // valid even then).
59 : Locator remote.Locator
60 : // Bounds indicates the keyspace range that is affected.
61 : Bounds base.UserKeyBounds
62 : // Details of the error. See cockroachdb/error for how to format with or
63 : // without redaction.
64 : Details error
65 : }
66 :
67 0 : func (i DataCorruptionInfo) String() string {
68 0 : return redact.StringWithoutMarkers(i)
69 0 : }
70 :
71 : // SafeFormat implements redact.SafeFormatter.
72 0 : func (i DataCorruptionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
73 0 : w.Printf("on-disk corruption: %s", redact.Safe(i.Path))
74 0 : if i.IsRemote {
75 0 : w.Printf(" (remote locator %q)", redact.Safe(i.Locator))
76 0 : }
77 0 : w.Printf("; bounds: %s; details: %+v", i.Bounds.String(), i.Details)
78 : }
79 :
80 : // LevelInfo contains info pertaining to a particular level.
81 : type LevelInfo struct {
82 : Level int
83 : Tables []TableInfo
84 : Score float64
85 : }
86 :
87 0 : func (i LevelInfo) String() string {
88 0 : return redact.StringWithoutMarkers(i)
89 0 : }
90 :
91 : // SafeFormat implements redact.SafeFormatter.
92 1 : func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
93 1 : w.Printf("L%d [%s] (%s) Score=%.2f",
94 1 : redact.Safe(i.Level),
95 1 : redact.Safe(formatFileNums(i.Tables)),
96 1 : redact.Safe(humanize.Bytes.Uint64(tablesTotalSize(i.Tables))),
97 1 : redact.Safe(i.Score))
98 1 : }
99 :
100 : // BlobFileCreateInfo contains the info for a blob file creation event.
101 : type BlobFileCreateInfo struct {
102 : JobID int
103 : // Reason is the reason for the table creation: "compacting", "flushing", or
104 : // "ingesting".
105 : Reason string
106 : Path string
107 : FileNum base.DiskFileNum
108 : }
109 :
110 1 : func (i BlobFileCreateInfo) String() string {
111 1 : return redact.StringWithoutMarkers(i)
112 1 : }
113 :
114 : // SafeFormat implements redact.SafeFormatter.
115 1 : func (i BlobFileCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
116 1 : w.Printf("[JOB %d] %s: blob file created %s",
117 1 : redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
118 1 : }
119 :
120 : // BlobFileDeleteInfo contains the info for a blob file deletion event.
121 : type BlobFileDeleteInfo struct {
122 : JobID int
123 : Path string
124 : FileNum base.DiskFileNum
125 : Err error
126 : }
127 :
128 1 : func (i BlobFileDeleteInfo) String() string {
129 1 : return redact.StringWithoutMarkers(i)
130 1 : }
131 :
132 : // SafeFormat implements redact.SafeFormatter.
133 1 : func (i BlobFileDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
134 1 : if i.Err != nil {
135 0 : w.Printf("[JOB %d] blob file delete error %s: %s",
136 0 : redact.Safe(i.JobID), i.FileNum, i.Err)
137 0 : return
138 0 : }
139 1 : w.Printf("[JOB %d] blob file deleted %s", redact.Safe(i.JobID), i.FileNum)
140 : }
141 :
142 : // BlobFileRewriteInfo contains the info for a blob file rewrite event.
143 : type BlobFileRewriteInfo struct {
144 : // JobID is the ID of the job.
145 : JobID int
146 : // Input contains the input tables for the compaction organized by level.
147 : Input BlobFileInfo
148 : // Output contains the output tables generated by the compaction. The output
149 : // info is empty for the compaction begin event.
150 : Output BlobFileInfo
151 : // Duration is the time spent compacting, including reading and writing
152 : // files.
153 : Duration time.Duration
154 : // TotalDuration is the total wall-time duration of the compaction,
155 : // including applying the compaction to the database. TotalDuration is
156 : // always ≥ Duration.
157 : TotalDuration time.Duration
158 : Done bool
159 : // Err is set only if Done is true. If non-nil, indicates that the compaction
160 : // failed. Note that err can be ErrCancelledCompaction, which can happen
161 : // during normal operation.
162 : Err error
163 : }
164 :
165 1 : func (i BlobFileRewriteInfo) String() string {
166 1 : return redact.StringWithoutMarkers(i)
167 1 : }
168 :
169 : // SafeFormat implements redact.SafeFormatter.
170 1 : func (i BlobFileRewriteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
171 1 : if i.Err != nil {
172 1 : w.Printf("[JOB %d] blob file (%s, %s) rewrite error: %s",
173 1 : redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum, i.Err)
174 1 : return
175 1 : }
176 :
177 1 : if !i.Done {
178 1 : w.Printf("[JOB %d] rewriting blob file %s (physical file %s)",
179 1 : redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum)
180 1 : return
181 1 : }
182 1 : w.Printf("[JOB %d] rewrote blob file (%s, %s) (%s) -> (%s, %s) (%s), in %.1fs (%.1fs total)",
183 1 : redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum,
184 1 : redact.Safe(humanize.Bytes.Uint64(i.Input.Size)),
185 1 : i.Output.BlobFileID, i.Output.DiskFileNum,
186 1 : redact.Safe(humanize.Bytes.Uint64(i.Output.Size)),
187 1 : redact.Safe(i.Duration.Seconds()),
188 1 : redact.Safe(i.TotalDuration.Seconds()))
189 : }
190 :
// BlobFileInfo describes a blob file. It is used for the Input and Output of
// a BlobFileRewriteInfo event.
type BlobFileInfo struct {
	// BlobFileID is the logical ID of the blob file.
	BlobFileID base.BlobFileID
	// DiskFileNum is the file number of the blob file on disk.
	DiskFileNum base.DiskFileNum
	// Size is the physical size of the file in bytes.
	Size uint64
	// ValueSize is the uncompressed (pre-compression) size of the values in
	// the blob file, in bytes.
	ValueSize uint64
}
203 :
// CompactionInfo contains the info for a compaction event. The same type is
// used for the begin event (Done == false) and the end event (Done == true).
type CompactionInfo struct {
	// JobID is the ID of the compaction job.
	JobID int
	// Reason is the reason for the compaction.
	Reason string
	// Input contains the input tables for the compaction organized by level.
	Input []LevelInfo
	// Output contains the output tables generated by the compaction. The output
	// tables are empty for the compaction begin event.
	Output LevelInfo
	// Duration is the time spent compacting, including reading and writing
	// sstables.
	Duration time.Duration
	// TotalDuration is the total wall-time duration of the compaction,
	// including applying the compaction to the database. TotalDuration is
	// always ≥ Duration.
	TotalDuration time.Duration
	// Done is false for the compaction begin event and true for the end event.
	Done bool
	// Err is set only if Done is true. If non-nil, indicates that the compaction
	// failed. Note that err can be ErrCancelledCompaction, which can happen
	// during normal operation.
	Err error

	// SingleLevelOverlappingRatio and MultiLevelOverlappingRatio are reported
	// in the compaction begin event's log line. NOTE(review): presumably the
	// overlap ratios computed when choosing between a single-level and a
	// multi-level compaction; confirm against the code that populates them.
	SingleLevelOverlappingRatio float64
	MultiLevelOverlappingRatio float64

	// Annotations specifies additional info to appear in a compaction's event
	// log line.
	Annotations compactionAnnotations
}
234 :
235 : type compactionAnnotations []string
236 :
237 : // SafeFormat implements redact.SafeFormatter.
238 1 : func (ca compactionAnnotations) SafeFormat(w redact.SafePrinter, _ rune) {
239 1 : if len(ca) == 0 {
240 0 : return
241 0 : }
242 1 : for i := range ca {
243 1 : if i != 0 {
244 1 : w.Print(" ")
245 1 : }
246 1 : w.Printf("%s", redact.SafeString(ca[i]))
247 : }
248 : }
249 :
250 1 : func (i CompactionInfo) String() string {
251 1 : return redact.StringWithoutMarkers(i)
252 1 : }
253 :
254 : // SafeFormat implements redact.SafeFormatter.
255 1 : func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
256 1 : if i.Err != nil {
257 1 : w.Printf("[JOB %d] compaction(%s) to L%d error: %s",
258 1 : redact.Safe(i.JobID), redact.SafeString(i.Reason), redact.Safe(i.Output.Level), i.Err)
259 1 : return
260 1 : }
261 :
262 1 : if !i.Done {
263 1 : w.Printf("[JOB %d] compacting(%s) ",
264 1 : redact.Safe(i.JobID),
265 1 : redact.SafeString(i.Reason))
266 1 : if len(i.Annotations) > 0 {
267 1 : w.Printf("%s ", i.Annotations)
268 1 : }
269 1 : w.Printf("%s; ", levelInfos(i.Input))
270 1 : w.Printf("OverlappingRatio: Single %.2f, Multi %.2f", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
271 1 : return
272 : }
273 1 : outputSize := tablesTotalSize(i.Output.Tables)
274 1 : w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
275 1 : if len(i.Annotations) > 0 {
276 1 : w.Printf("%s ", i.Annotations)
277 1 : }
278 1 : w.Print(levelInfos(i.Input))
279 1 : w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
280 1 : redact.Safe(i.Output.Level),
281 1 : redact.Safe(formatFileNums(i.Output.Tables)),
282 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
283 1 : redact.Safe(i.Duration.Seconds()),
284 1 : redact.Safe(i.TotalDuration.Seconds()),
285 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
286 : }
287 :
288 : type levelInfos []LevelInfo
289 :
290 1 : func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
291 1 : for j, levelInfo := range i {
292 1 : if j > 0 {
293 1 : w.Printf(" + ")
294 1 : }
295 1 : w.Print(levelInfo)
296 : }
297 : }
298 :
// DiskSlowInfo contains the info for a disk slowness event when writing to a
// file. It is an alias of vfs.DiskSlowInfo.
type DiskSlowInfo = vfs.DiskSlowInfo
302 :
303 : // FlushInfo contains the info for a flush event.
304 : type FlushInfo struct {
305 : // JobID is the ID of the flush job.
306 : JobID int
307 : // Reason is the reason for the flush.
308 : Reason string
309 : // Input contains the count of input memtables that were flushed.
310 : Input int
311 : // InputBytes contains the total in-memory size of the memtable(s) that were
312 : // flushed. This size includes skiplist indexing data structures.
313 : InputBytes uint64
314 : // Output contains the ouptut table generated by the flush. The output info
315 : // is empty for the flush begin event.
316 : Output []TableInfo
317 : // Duration is the time spent flushing. This duration includes writing and
318 : // syncing all of the flushed keys to sstables.
319 : Duration time.Duration
320 : // TotalDuration is the total wall-time duration of the flush, including
321 : // applying the flush to the database. TotalDuration is always ≥ Duration.
322 : TotalDuration time.Duration
323 : // Ingest is set to true if the flush is handling tables that were added to
324 : // the flushable queue via an ingestion operation.
325 : Ingest bool
326 : // IngestLevels are the output levels for each ingested table in the flush.
327 : // This field is only populated when Ingest is true.
328 : IngestLevels []int
329 : Done bool
330 : Err error
331 : }
332 :
333 1 : func (i FlushInfo) String() string {
334 1 : return redact.StringWithoutMarkers(i)
335 1 : }
336 :
337 : // SafeFormat implements redact.SafeFormatter.
338 1 : func (i FlushInfo) SafeFormat(w redact.SafePrinter, _ rune) {
339 1 : if i.Err != nil {
340 1 : w.Printf("[JOB %d] flush error: %s", redact.Safe(i.JobID), i.Err)
341 1 : return
342 1 : }
343 :
344 1 : plural := redact.SafeString("s")
345 1 : if i.Input == 1 {
346 1 : plural = ""
347 1 : }
348 1 : if !i.Done {
349 1 : w.Printf("[JOB %d] ", redact.Safe(i.JobID))
350 1 : if !i.Ingest {
351 1 : w.Printf("flushing %d memtable", redact.Safe(i.Input))
352 1 : w.SafeString(plural)
353 1 : w.Printf(" (%s) to L0", redact.Safe(humanize.Bytes.Uint64(i.InputBytes)))
354 1 : } else {
355 1 : w.Printf("flushing %d ingested table%s", redact.Safe(i.Input), plural)
356 1 : }
357 1 : return
358 : }
359 :
360 1 : outputSize := tablesTotalSize(i.Output)
361 1 : if !i.Ingest {
362 1 : if invariants.Enabled && len(i.IngestLevels) > 0 {
363 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) == 0"))
364 : }
365 1 : w.Printf("[JOB %d] flushed %d memtable%s (%s) to L0 [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
366 1 : redact.Safe(i.JobID), redact.Safe(i.Input), plural,
367 1 : redact.Safe(humanize.Bytes.Uint64(i.InputBytes)),
368 1 : redact.Safe(formatFileNums(i.Output)),
369 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
370 1 : redact.Safe(i.Duration.Seconds()),
371 1 : redact.Safe(i.TotalDuration.Seconds()),
372 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
373 1 : } else {
374 1 : if invariants.Enabled && len(i.IngestLevels) == 0 {
375 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) > 0"))
376 : }
377 1 : w.Printf("[JOB %d] flushed %d ingested flushable%s",
378 1 : redact.Safe(i.JobID), redact.Safe(len(i.Output)), plural)
379 1 : for j, level := range i.IngestLevels {
380 1 : file := i.Output[j]
381 1 : if j > 0 {
382 1 : w.Printf(" +")
383 1 : }
384 1 : w.Printf(" L%d:%s (%s)", level, file.FileNum, humanize.Bytes.Uint64(file.Size))
385 : }
386 1 : w.Printf(" in %.1fs (%.1fs total), output rate %s/s",
387 1 : redact.Safe(i.Duration.Seconds()),
388 1 : redact.Safe(i.TotalDuration.Seconds()),
389 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
390 : }
391 : }
392 :
393 : // DownloadInfo contains the info for a DB.Download() event.
394 : type DownloadInfo struct {
395 : // JobID is the ID of the download job.
396 : JobID int
397 :
398 : Spans []DownloadSpan
399 :
400 : // Duration is the time since the operation was started.
401 : Duration time.Duration
402 : DownloadCompactionsLaunched int
403 :
404 : // RestartCount indicates that the download operation restarted because it
405 : // noticed that new external files were ingested. A DownloadBegin event with
406 : // RestartCount = 0 is the start of the operation; each time we restart it we
407 : // have another DownloadBegin event with RestartCount > 0.
408 : RestartCount int
409 : Done bool
410 : Err error
411 : }
412 :
413 1 : func (i DownloadInfo) String() string {
414 1 : return redact.StringWithoutMarkers(i)
415 1 : }
416 :
417 : // SafeFormat implements redact.SafeFormatter.
418 1 : func (i DownloadInfo) SafeFormat(w redact.SafePrinter, _ rune) {
419 1 : switch {
420 0 : case i.Err != nil:
421 0 : w.Printf("[JOB %d] download error after %1.fs: %s", redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), i.Err)
422 :
423 1 : case i.Done:
424 1 : w.Printf("[JOB %d] download finished in %.1fs (launched %d compactions)",
425 1 : redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), redact.Safe(i.DownloadCompactionsLaunched))
426 :
427 1 : default:
428 1 : if i.RestartCount == 0 {
429 1 : w.Printf("[JOB %d] starting download for %d spans", redact.Safe(i.JobID), redact.Safe(len(i.Spans)))
430 1 : } else {
431 1 : w.Printf("[JOB %d] restarting download (restart #%d, time so far %.1fs, launched %d compactions)",
432 1 : redact.Safe(i.JobID), redact.Safe(i.RestartCount), redact.Safe(i.Duration.Seconds()),
433 1 : redact.Safe(i.DownloadCompactionsLaunched))
434 1 : }
435 : }
436 : }
437 :
438 : // ManifestCreateInfo contains info about a manifest creation event.
439 : type ManifestCreateInfo struct {
440 : // JobID is the ID of the job the caused the manifest to be created.
441 : JobID int
442 : Path string
443 : // The file number of the new Manifest.
444 : FileNum base.DiskFileNum
445 : Err error
446 : }
447 :
448 1 : func (i ManifestCreateInfo) String() string {
449 1 : return redact.StringWithoutMarkers(i)
450 1 : }
451 :
452 : // SafeFormat implements redact.SafeFormatter.
453 1 : func (i ManifestCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
454 1 : if i.Err != nil {
455 0 : w.Printf("[JOB %d] MANIFEST create error: %s", redact.Safe(i.JobID), i.Err)
456 0 : return
457 0 : }
458 1 : w.Printf("[JOB %d] MANIFEST created %s", redact.Safe(i.JobID), i.FileNum)
459 : }
460 :
461 : // ManifestDeleteInfo contains the info for a Manifest deletion event.
462 : type ManifestDeleteInfo struct {
463 : // JobID is the ID of the job the caused the Manifest to be deleted.
464 : JobID int
465 : Path string
466 : FileNum base.DiskFileNum
467 : Err error
468 : }
469 :
470 1 : func (i ManifestDeleteInfo) String() string {
471 1 : return redact.StringWithoutMarkers(i)
472 1 : }
473 :
474 : // SafeFormat implements redact.SafeFormatter.
475 1 : func (i ManifestDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
476 1 : if i.Err != nil {
477 0 : w.Printf("[JOB %d] MANIFEST delete error: %s", redact.Safe(i.JobID), i.Err)
478 0 : return
479 0 : }
480 1 : w.Printf("[JOB %d] MANIFEST deleted %s", redact.Safe(i.JobID), i.FileNum)
481 : }
482 :
483 : // TableCreateInfo contains the info for a table creation event.
484 : type TableCreateInfo struct {
485 : JobID int
486 : // Reason is the reason for the table creation: "compacting", "flushing", or
487 : // "ingesting".
488 : Reason string
489 : Path string
490 : FileNum base.DiskFileNum
491 : }
492 :
493 1 : func (i TableCreateInfo) String() string {
494 1 : return redact.StringWithoutMarkers(i)
495 1 : }
496 :
497 : // SafeFormat implements redact.SafeFormatter.
498 1 : func (i TableCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
499 1 : w.Printf("[JOB %d] %s: sstable created %s",
500 1 : redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
501 1 : }
502 :
503 : // TableDeleteInfo contains the info for a table deletion event.
504 : type TableDeleteInfo struct {
505 : JobID int
506 : Path string
507 : FileNum base.DiskFileNum
508 : Err error
509 : }
510 :
511 1 : func (i TableDeleteInfo) String() string {
512 1 : return redact.StringWithoutMarkers(i)
513 1 : }
514 :
515 : // SafeFormat implements redact.SafeFormatter.
516 1 : func (i TableDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
517 1 : if i.Err != nil {
518 0 : w.Printf("[JOB %d] sstable delete error %s: %s",
519 0 : redact.Safe(i.JobID), i.FileNum, i.Err)
520 0 : return
521 0 : }
522 1 : w.Printf("[JOB %d] sstable deleted %s", redact.Safe(i.JobID), i.FileNum)
523 : }
524 :
525 : // TableIngestInfo contains the info for a table ingestion event.
526 : type TableIngestInfo struct {
527 : // JobID is the ID of the job the caused the table to be ingested.
528 : JobID int
529 : Tables []struct {
530 : TableInfo
531 : Level int
532 : }
533 : // GlobalSeqNum is the sequence number that was assigned to all entries in
534 : // the ingested table.
535 : GlobalSeqNum base.SeqNum
536 : // flushable indicates whether the ingested sstable was treated as a
537 : // flushable.
538 : flushable bool
539 : Err error
540 : // WaitFlushDuration is the time spent waiting for memtable flushes to
541 : // complete, given that an overlap between ingesting sstables and memtables
542 : // exists.
543 : WaitFlushDuration time.Duration
544 : // ManifestUpdateDuration is the time spent updating the manifest.
545 : ManifestUpdateDuration time.Duration
546 : // BlockReadDuration is the total time spent reading blocks for the ingested
547 : // sstable.
548 : BlockReadDuration time.Duration
549 : // BlockReadBytes is the total number of bytes from blocks read for the
550 : // ingested sstable. This does not include bytes read from the block cache.
551 : BlockReadBytes uint64
552 : }
553 :
554 1 : func (i TableIngestInfo) String() string {
555 1 : return redact.StringWithoutMarkers(i)
556 1 : }
557 :
558 : // SafeFormat implements redact.SafeFormatter.
559 1 : func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
560 1 : if i.Err != nil {
561 0 : w.Printf("[JOB %d] ingest error: %s", redact.Safe(i.JobID), i.Err)
562 0 : return
563 0 : }
564 :
565 1 : if i.flushable {
566 1 : w.Printf("[JOB %d] ingested as flushable, memtable flushes took %.1fs:", redact.Safe(i.JobID),
567 1 : redact.Safe(i.WaitFlushDuration.Seconds()))
568 1 : } else {
569 1 : w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
570 1 : }
571 :
572 1 : for j := range i.Tables {
573 1 : t := &i.Tables[j]
574 1 : if j > 0 {
575 1 : w.Printf(",")
576 1 : }
577 1 : levelStr := ""
578 1 : if !i.flushable {
579 1 : levelStr = fmt.Sprintf("L%d:", t.Level)
580 1 : }
581 1 : w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
582 1 : redact.Safe(humanize.Bytes.Uint64(t.Size)))
583 : }
584 1 : w.Printf("; manifest update took %.1fs; block reads took %.1fs with %s block bytes read",
585 1 : redact.Safe(i.ManifestUpdateDuration.Seconds()), redact.Safe(i.BlockReadDuration.Seconds()),
586 1 : redact.Safe(humanize.Bytes.Uint64(i.BlockReadBytes)))
587 : }
588 :
589 : // TableStatsInfo contains the info for a table stats loaded event.
590 : type TableStatsInfo struct {
591 : // JobID is the ID of the job that finished loading the initial tables'
592 : // stats.
593 : JobID int
594 : }
595 :
596 1 : func (i TableStatsInfo) String() string {
597 1 : return redact.StringWithoutMarkers(i)
598 1 : }
599 :
600 : // SafeFormat implements redact.SafeFormatter.
601 1 : func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
602 1 : w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
603 1 : }
604 :
605 : // TableValidatedInfo contains information on the result of a validation run
606 : // on an sstable.
607 : type TableValidatedInfo struct {
608 : JobID int
609 : Meta *manifest.TableMetadata
610 : }
611 :
612 1 : func (i TableValidatedInfo) String() string {
613 1 : return redact.StringWithoutMarkers(i)
614 1 : }
615 :
616 : // SafeFormat implements redact.SafeFormatter.
617 1 : func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
618 1 : w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
619 1 : }
620 :
621 : // WALCreateInfo contains info about a WAL creation event.
622 : type WALCreateInfo struct {
623 : // JobID is the ID of the job the caused the WAL to be created.
624 : JobID int
625 : Path string
626 : // The file number of the new WAL.
627 : FileNum base.DiskFileNum
628 : // The file number of a previous WAL which was recycled to create this
629 : // one. Zero if recycling did not take place.
630 : RecycledFileNum base.DiskFileNum
631 : Err error
632 : }
633 :
634 1 : func (i WALCreateInfo) String() string {
635 1 : return redact.StringWithoutMarkers(i)
636 1 : }
637 :
638 : // SafeFormat implements redact.SafeFormatter.
639 1 : func (i WALCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
640 1 : if i.Err != nil {
641 0 : w.Printf("[JOB %d] WAL create error: %s", redact.Safe(i.JobID), i.Err)
642 0 : return
643 0 : }
644 :
645 1 : if i.RecycledFileNum == 0 {
646 1 : w.Printf("[JOB %d] WAL created %s", redact.Safe(i.JobID), i.FileNum)
647 1 : return
648 1 : }
649 :
650 0 : w.Printf("[JOB %d] WAL created %s (recycled %s)",
651 0 : redact.Safe(i.JobID), i.FileNum, i.RecycledFileNum)
652 : }
653 :
654 : // WALDeleteInfo contains the info for a WAL deletion event.
655 : //
656 : // TODO(sumeer): extend WALDeleteInfo for the failover case in case the path
657 : // is insufficient to infer whether primary or secondary.
658 : type WALDeleteInfo struct {
659 : // JobID is the ID of the job the caused the WAL to be deleted.
660 : JobID int
661 : Path string
662 : FileNum base.DiskFileNum
663 : Err error
664 : }
665 :
666 1 : func (i WALDeleteInfo) String() string {
667 1 : return redact.StringWithoutMarkers(i)
668 1 : }
669 :
670 : // SafeFormat implements redact.SafeFormatter.
671 1 : func (i WALDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
672 1 : if i.Err != nil {
673 0 : w.Printf("[JOB %d] WAL delete error: %s", redact.Safe(i.JobID), i.Err)
674 0 : return
675 0 : }
676 1 : w.Printf("[JOB %d] WAL deleted %s", redact.Safe(i.JobID), i.FileNum)
677 : }
678 :
679 : // WriteStallBeginInfo contains the info for a write stall begin event.
680 : type WriteStallBeginInfo struct {
681 : Reason string
682 : }
683 :
684 1 : func (i WriteStallBeginInfo) String() string {
685 1 : return redact.StringWithoutMarkers(i)
686 1 : }
687 :
688 : // SafeFormat implements redact.SafeFormatter.
689 1 : func (i WriteStallBeginInfo) SafeFormat(w redact.SafePrinter, _ rune) {
690 1 : w.Printf("write stall beginning: %s", redact.Safe(i.Reason))
691 1 : }
692 :
693 : // LowDiskSpaceInfo contains the information for a LowDiskSpace
694 : // event.
695 : type LowDiskSpaceInfo struct {
696 : // AvailBytes is the disk space available to the current process in bytes.
697 : AvailBytes uint64
698 : // TotalBytes is the total disk space in bytes.
699 : TotalBytes uint64
700 : // PercentThreshold is one of a set of fixed percentages in the
701 : // lowDiskSpaceThresholds below. This event was issued because the disk
702 : // space went below this threshold.
703 : PercentThreshold int
704 : }
705 :
706 0 : func (i LowDiskSpaceInfo) String() string {
707 0 : return redact.StringWithoutMarkers(i)
708 0 : }
709 :
710 : // SafeFormat implements redact.SafeFormatter.
711 0 : func (i LowDiskSpaceInfo) SafeFormat(w redact.SafePrinter, _ rune) {
712 0 : w.Printf(
713 0 : "available disk space under %d%% (%s of %s)",
714 0 : redact.Safe(i.PercentThreshold),
715 0 : redact.Safe(humanize.Bytes.Uint64(i.AvailBytes)),
716 0 : redact.Safe(humanize.Bytes.Uint64(i.TotalBytes)),
717 0 : )
718 0 : }
719 :
720 : // PossibleAPIMisuseInfo contains the information for a PossibleAPIMisuse event.
721 : type PossibleAPIMisuseInfo struct {
722 : Kind APIMisuseKind
723 :
724 : // UserKey is set for the following kinds:
725 : // - IneffectualSingleDelete,
726 : // - NondeterministicSingleDelete,
727 : // - MissizedDelete,
728 : // - InvalidValue.
729 : UserKey []byte
730 :
731 : // ExtraInfo is set for the following kinds:
732 : // - MissizedDelete: contains "elidedSize=<size>,expectedSize=<size>"
733 : // - InvalidValue: contains "callback=<callbackName>,value=<value>,err=<err>"
734 : ExtraInfo redact.RedactableString
735 : }
736 :
737 1 : func (i PossibleAPIMisuseInfo) String() string {
738 1 : return redact.StringWithoutMarkers(i)
739 1 : }
740 :
741 : // SafeFormat implements redact.SafeFormatter.
742 1 : func (i PossibleAPIMisuseInfo) SafeFormat(w redact.SafePrinter, _ rune) {
743 1 : switch i.Kind {
744 1 : case IneffectualSingleDelete, NondeterministicSingleDelete:
745 1 : w.Printf("possible API misuse: %s (key=%q)", redact.Safe(i.Kind), i.UserKey)
746 1 : case MissizedDelete:
747 1 : w.Printf("possible API misuse: %s (key=%q, %s)", redact.Safe(i.Kind), i.UserKey, i.ExtraInfo)
748 0 : case InvalidValue:
749 0 : w.Printf("possible API misuse: %s (key=%q, %s)", redact.Safe(i.Kind), i.UserKey, i.ExtraInfo)
750 0 : default:
751 0 : if invariants.Enabled {
752 0 : panic("invalid API misuse event")
753 : }
754 0 : w.Printf("invalid API misuse event")
755 : }
756 : }
757 :
// APIMisuseKind identifies the type of API misuse represented by a
// PossibleAPIMisuse event.
type APIMisuseKind int8

const (
	// IneffectualSingleDelete is emitted in compactions/flushes if any
	// single delete is being elided without deleting a point set/merge.
	//
	// This event can sometimes be a false positive because of delete-only
	// compactions which can cause a recent RANGEDEL to peek below an older
	// SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
	//
	// Example:
	//   RANGEDEL [a, c)#10 in L0
	//   SINGLEDEL b#5 in L1
	//   SET b#3 in L6
	//
	// If the L6 file containing the SET is narrow and the L1 file containing
	// the SINGLEDEL is wide, a delete-only compaction can remove the file in
	// L6 before the SINGLEDEL is compacted down. Then when the SINGLEDEL is
	// compacted down, it will not find any SET to delete, resulting in the
	// ineffectual callback.
	IneffectualSingleDelete APIMisuseKind = iota

	// NondeterministicSingleDelete is emitted in compactions/flushes if any
	// single delete has consumed a Set/Merge, and there is another immediately
	// older Set/SetWithDelete/Merge. The user of Pebble has violated the
	// invariant under which SingleDelete can be used correctly.
	//
	// Consider the sequence SingleDelete#3, Set#2, Set#1. There are three
	// ways some of these keys can first meet in a compaction.
	//
	// - All 3 keys in the same compaction: this callback will detect the
	//   violation.
	//
	// - SingleDelete#3, Set#2 meet in a compaction first: Both keys will
	//   disappear. The violation will not be detected, and the DB will have
	//   Set#1 which is likely incorrect (from the user's perspective).
	//
	// - Set#2, Set#1 meet in a compaction first: The output will be Set#2,
	//   which will later be consumed by SingleDelete#3. The violation will
	//   not be detected and the DB will be correct.
	//
	// This event can sometimes be a false positive because of delete-only
	// compactions which can cause a recent RANGEDEL to peek below an older
	// SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
	//
	// Example:
	//   RANGEDEL [a, z)#60 in L0
	//   SINGLEDEL g#50 in L1
	//   SET g#40 in L2
	//   RANGEDEL [g,h)#30 in L3
	//   SET g#20 in L6
	//
	// In this example, the two SETs represent the same user write, and the
	// RANGEDELs are caused by the CockroachDB range being dropped. That is,
	// the user wrote to g once, the range was dropped, then added back, which
	// caused the SET again, then at some point g was validly deleted using a
	// SINGLEDEL, and then the range was dropped again. The older RANGEDEL can
	// get fragmented due to compactions it has been part of. Say this L3 file
	// containing the RANGEDEL is very narrow, while the L1, L2, L6 files are
	// wider than the RANGEDEL in L0. Then the RANGEDEL in L3 can be dropped
	// using a delete-only compaction, resulting in an LSM with state:
	//
	//   RANGEDEL [a, z)#60 in L0
	//   SINGLEDEL g#50 in L1
	//   SET g#40 in L2
	//   SET g#20 in L6
	//
	// A multi-level compaction involving L1, L2, L6 will cause the invariant
	// violation callback. This example doesn't need multi-level compactions:
	// say there was a Pebble snapshot at g#21 preventing g#20 from being
	// dropped when it meets g#40 in a compaction. That snapshot will not save
	// RANGEDEL [g,h)#30, so we can have:
	//
	//   SINGLEDEL g#50 in L1
	//   SET g#40, SET g#20 in L6
	//
	// And say the snapshot is removed and then the L1 and L6 compaction
	// happens, resulting in the invariant violation callback.
	NondeterministicSingleDelete

	// MissizedDelete is emitted when a DELSIZED tombstone is found that did
	// not accurately record the size of the value it deleted. This can lead to
	// incorrect behavior in compactions.
	MissizedDelete

	// InvalidValue is emitted when a user-implemented callback (such as
	// ShortAttributeExtractor) returns an error for a committed value. This
	// suggests that either the callback is not implemented for all possible
	// values or a malformed value was committed to the DB.
	InvalidValue
)

// String returns a short human-readable name for the kind, or "unknown" for
// any value outside the defined constants (including negative values).
func (k APIMisuseKind) String() string {
	// names is indexed by the kind's numeric value.
	names := [...]string{
		IneffectualSingleDelete:      "ineffectual SINGLEDEL",
		NondeterministicSingleDelete: "nondeterministic SINGLEDEL",
		MissizedDelete:               "missized DELSIZED",
		InvalidValue:                 "invalid value",
	}
	if k < 0 || int(k) >= len(names) {
		return "unknown"
	}
	return names[k]
}
866 :
867 : // EventListener contains a set of functions that will be invoked when various
868 : // significant DB events occur. Note that the functions should not run for an
869 : // excessive amount of time as they are invoked synchronously by the DB and may
870 : // block continued DB work. For a similar reason it is advisable to not perform
871 : // any synchronous calls back into the DB.
872 : type EventListener struct {
873 : // BackgroundError is invoked whenever an error occurs during a background
874 : // operation such as flush or compaction.
875 : BackgroundError func(error)
876 :
877 : // BlobFileCreated is invoked after a blob file has been created.
878 : BlobFileCreated func(BlobFileCreateInfo)
879 :
880 : // BlobFileDeleted is invoked after a blob file has been deleted.
881 : BlobFileDeleted func(BlobFileDeleteInfo)
882 :
883 : // BlobFileRewriteBegin is invoked when a blob file rewrite compaction begins.
884 : BlobFileRewriteBegin func(BlobFileRewriteInfo)
885 :
886 : // BlobFileRewriteEnd is invoked when a blob file rewrite compaction ends.
887 : BlobFileRewriteEnd func(BlobFileRewriteInfo)
888 :
889 : // DataCorruption is invoked when an on-disk corruption is detected. It should
890 : // not block, as it is called synchronously in read paths.
891 : DataCorruption func(DataCorruptionInfo)
892 :
893 : // CompactionBegin is invoked after the inputs to a compaction have been
894 : // determined, but before the compaction has produced any output.
895 : CompactionBegin func(CompactionInfo)
896 :
897 : // CompactionEnd is invoked after a compaction has completed and the result
898 : // has been installed.
899 : CompactionEnd func(CompactionInfo)
900 :
901 : // DiskSlow is invoked after a disk write operation on a file created with a
902 : // disk health checking vfs.FS (see vfs.DefaultWithDiskHealthChecks) is
903 : // observed to exceed the specified disk slowness threshold duration. DiskSlow
904 : // is called on a goroutine that is monitoring slowness/stuckness. The callee
905 : // MUST return without doing any IO, or blocking on anything (like a mutex)
906 : // that is waiting on IO. This is imperative in order to reliably monitor for
907 : // slowness, since if this goroutine gets stuck, the monitoring will stop
908 : // working.
909 : DiskSlow func(DiskSlowInfo)
910 :
911 : // FlushBegin is invoked after the inputs to a flush have been determined,
912 : // but before the flush has produced any output.
913 : FlushBegin func(FlushInfo)
914 :
915 : // FlushEnd is invoked after a flush has complated and the result has been
916 : // installed.
917 : FlushEnd func(FlushInfo)
918 :
919 : // DownloadBegin is invoked when a db.Download operation starts or restarts
920 : // (restarts are caused by new external tables being ingested during the
921 : // operation).
922 : DownloadBegin func(DownloadInfo)
923 :
924 : // DownloadEnd is invoked when a db.Download operation completes.
925 : DownloadEnd func(DownloadInfo)
926 :
927 : // FormatUpgrade is invoked after the database's FormatMajorVersion
928 : // is upgraded.
929 : FormatUpgrade func(FormatMajorVersion)
930 :
931 : // ManifestCreated is invoked after a manifest has been created.
932 : ManifestCreated func(ManifestCreateInfo)
933 :
934 : // ManifestDeleted is invoked after a manifest has been deleted.
935 : ManifestDeleted func(ManifestDeleteInfo)
936 :
937 : // TableCreated is invoked when a table has been created.
938 : TableCreated func(TableCreateInfo)
939 :
940 : // TableDeleted is invoked after a table has been deleted.
941 : TableDeleted func(TableDeleteInfo)
942 :
943 : // TableIngested is invoked after an externally created table has been
944 : // ingested via a call to DB.Ingest().
945 : TableIngested func(TableIngestInfo)
946 :
947 : // TableStatsLoaded is invoked at most once, when the table stats
948 : // collector has loaded statistics for all tables that existed at Open.
949 : TableStatsLoaded func(TableStatsInfo)
950 :
951 : // TableValidated is invoked after validation runs on an sstable.
952 : TableValidated func(TableValidatedInfo)
953 :
954 : // WALCreated is invoked after a WAL has been created.
955 : WALCreated func(WALCreateInfo)
956 :
957 : // WALDeleted is invoked after a WAL has been deleted.
958 : WALDeleted func(WALDeleteInfo)
959 :
960 : // WriteStallBegin is invoked when writes are intentionally delayed.
961 : WriteStallBegin func(WriteStallBeginInfo)
962 :
963 : // WriteStallEnd is invoked when delayed writes are released.
964 : WriteStallEnd func()
965 :
966 : // LowDiskSpace is invoked periodically when the disk space is running
967 : // low.
968 : LowDiskSpace func(LowDiskSpaceInfo)
969 :
970 : // PossibleAPIMisuse is invoked when a possible API misuse is detected.
971 : PossibleAPIMisuse func(PossibleAPIMisuseInfo)
972 : }
973 :
974 : // EnsureDefaults ensures that background error events are logged to the
975 : // specified logger if a handler for those events hasn't been otherwise
976 : // specified. Ensure all handlers are non-nil so that we don't have to check
977 : // for nil-ness before invoking.
978 1 : func (l *EventListener) EnsureDefaults(logger Logger) {
979 1 : if l.BackgroundError == nil {
980 1 : if logger != nil {
981 1 : l.BackgroundError = func(err error) {
982 0 : logger.Errorf("background error: %s", err)
983 0 : }
984 0 : } else {
985 0 : l.BackgroundError = func(error) {}
986 : }
987 : }
988 1 : if l.BlobFileCreated == nil {
989 1 : l.BlobFileCreated = func(info BlobFileCreateInfo) {}
990 : }
991 1 : if l.BlobFileDeleted == nil {
992 1 : l.BlobFileDeleted = func(info BlobFileDeleteInfo) {}
993 : }
994 1 : if l.BlobFileRewriteBegin == nil {
995 1 : l.BlobFileRewriteBegin = func(info BlobFileRewriteInfo) {}
996 : }
997 1 : if l.BlobFileRewriteEnd == nil {
998 1 : l.BlobFileRewriteEnd = func(info BlobFileRewriteInfo) {}
999 : }
1000 1 : if l.DataCorruption == nil {
1001 1 : if logger != nil {
1002 1 : l.DataCorruption = func(info DataCorruptionInfo) {
1003 0 : logger.Fatalf("%s", info)
1004 0 : }
1005 0 : } else {
1006 0 : l.DataCorruption = func(info DataCorruptionInfo) {}
1007 : }
1008 : }
1009 1 : if l.CompactionBegin == nil {
1010 1 : l.CompactionBegin = func(info CompactionInfo) {}
1011 : }
1012 1 : if l.CompactionEnd == nil {
1013 1 : l.CompactionEnd = func(info CompactionInfo) {}
1014 : }
1015 1 : if l.DiskSlow == nil {
1016 1 : l.DiskSlow = func(info DiskSlowInfo) {}
1017 : }
1018 1 : if l.FlushBegin == nil {
1019 1 : l.FlushBegin = func(info FlushInfo) {}
1020 : }
1021 1 : if l.FlushEnd == nil {
1022 1 : l.FlushEnd = func(info FlushInfo) {}
1023 : }
1024 1 : if l.DownloadBegin == nil {
1025 1 : l.DownloadBegin = func(info DownloadInfo) {}
1026 : }
1027 1 : if l.DownloadEnd == nil {
1028 1 : l.DownloadEnd = func(info DownloadInfo) {}
1029 : }
1030 1 : if l.FormatUpgrade == nil {
1031 1 : l.FormatUpgrade = func(v FormatMajorVersion) {}
1032 : }
1033 1 : if l.ManifestCreated == nil {
1034 1 : l.ManifestCreated = func(info ManifestCreateInfo) {}
1035 : }
1036 1 : if l.ManifestDeleted == nil {
1037 1 : l.ManifestDeleted = func(info ManifestDeleteInfo) {}
1038 : }
1039 1 : if l.TableCreated == nil {
1040 1 : l.TableCreated = func(info TableCreateInfo) {}
1041 : }
1042 1 : if l.TableDeleted == nil {
1043 1 : l.TableDeleted = func(info TableDeleteInfo) {}
1044 : }
1045 1 : if l.TableIngested == nil {
1046 1 : l.TableIngested = func(info TableIngestInfo) {}
1047 : }
1048 1 : if l.TableStatsLoaded == nil {
1049 1 : l.TableStatsLoaded = func(info TableStatsInfo) {}
1050 : }
1051 1 : if l.TableValidated == nil {
1052 1 : l.TableValidated = func(validated TableValidatedInfo) {}
1053 : }
1054 1 : if l.WALCreated == nil {
1055 1 : l.WALCreated = func(info WALCreateInfo) {}
1056 : }
1057 1 : if l.WALDeleted == nil {
1058 1 : l.WALDeleted = func(info WALDeleteInfo) {}
1059 : }
1060 1 : if l.WriteStallBegin == nil {
1061 1 : l.WriteStallBegin = func(info WriteStallBeginInfo) {}
1062 : }
1063 1 : if l.WriteStallEnd == nil {
1064 1 : l.WriteStallEnd = func() {}
1065 : }
1066 1 : if l.LowDiskSpace == nil {
1067 1 : l.LowDiskSpace = func(info LowDiskSpaceInfo) {}
1068 : }
1069 1 : if l.PossibleAPIMisuse == nil {
1070 1 : l.PossibleAPIMisuse = func(info PossibleAPIMisuseInfo) {}
1071 : }
1072 : }
1073 :
1074 : // MakeLoggingEventListener creates an EventListener that logs all events to the
1075 : // specified logger.
1076 1 : func MakeLoggingEventListener(logger Logger) EventListener {
1077 1 : if logger == nil {
1078 0 : logger = DefaultLogger
1079 0 : }
1080 :
1081 1 : return EventListener{
1082 1 : BackgroundError: func(err error) {
1083 0 : logger.Errorf("background error: %s", err)
1084 0 : },
1085 1 : BlobFileCreated: func(info BlobFileCreateInfo) {
1086 1 : logger.Infof("%s", info)
1087 1 : },
1088 1 : BlobFileDeleted: func(info BlobFileDeleteInfo) {
1089 1 : logger.Infof("%s", info)
1090 1 : },
1091 1 : BlobFileRewriteBegin: func(info BlobFileRewriteInfo) {
1092 1 : logger.Infof("%s", info)
1093 1 : },
1094 1 : BlobFileRewriteEnd: func(info BlobFileRewriteInfo) {
1095 1 : logger.Infof("%s", info)
1096 1 : },
1097 0 : DataCorruption: func(info DataCorruptionInfo) {
1098 0 : logger.Errorf("%s", info)
1099 0 : },
1100 1 : CompactionBegin: func(info CompactionInfo) {
1101 1 : logger.Infof("%s", info)
1102 1 : },
1103 0 : CompactionEnd: func(info CompactionInfo) {
1104 0 : logger.Infof("%s", info)
1105 0 : },
1106 0 : DiskSlow: func(info DiskSlowInfo) {
1107 0 : logger.Infof("%s", info)
1108 0 : },
1109 1 : FlushBegin: func(info FlushInfo) {
1110 1 : logger.Infof("%s", info)
1111 1 : },
1112 0 : FlushEnd: func(info FlushInfo) {
1113 0 : logger.Infof("%s", info)
1114 0 : },
1115 1 : DownloadBegin: func(info DownloadInfo) {
1116 1 : logger.Infof("%s", info)
1117 1 : },
1118 0 : DownloadEnd: func(info DownloadInfo) {
1119 0 : logger.Infof("%s", info)
1120 0 : },
1121 1 : FormatUpgrade: func(v FormatMajorVersion) {
1122 1 : logger.Infof("upgraded to format version: %s", v)
1123 1 : },
1124 0 : ManifestCreated: func(info ManifestCreateInfo) {
1125 0 : logger.Infof("%s", info)
1126 0 : },
1127 0 : ManifestDeleted: func(info ManifestDeleteInfo) {
1128 0 : logger.Infof("%s", info)
1129 0 : },
1130 1 : TableCreated: func(info TableCreateInfo) {
1131 1 : logger.Infof("%s", info)
1132 1 : },
1133 0 : TableDeleted: func(info TableDeleteInfo) {
1134 0 : logger.Infof("%s", info)
1135 0 : },
1136 0 : TableIngested: func(info TableIngestInfo) {
1137 0 : logger.Infof("%s", info)
1138 0 : },
1139 1 : TableStatsLoaded: func(info TableStatsInfo) {
1140 1 : logger.Infof("%s", info)
1141 1 : },
1142 1 : TableValidated: func(info TableValidatedInfo) {
1143 1 : logger.Infof("%s", info)
1144 1 : },
1145 0 : WALCreated: func(info WALCreateInfo) {
1146 0 : logger.Infof("%s", info)
1147 0 : },
1148 0 : WALDeleted: func(info WALDeleteInfo) {
1149 0 : logger.Infof("%s", info)
1150 0 : },
1151 1 : WriteStallBegin: func(info WriteStallBeginInfo) {
1152 1 : logger.Infof("%s", info)
1153 1 : },
1154 1 : WriteStallEnd: func() {
1155 1 : logger.Infof("write stall ending")
1156 1 : },
1157 0 : LowDiskSpace: func(info LowDiskSpaceInfo) {
1158 0 : logger.Infof("%s", info)
1159 0 : },
1160 1 : PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
1161 1 : logger.Infof("%s", info)
1162 1 : },
1163 : }
1164 : }
1165 :
1166 : // TeeEventListener wraps two EventListeners, forwarding all events to both.
1167 0 : func TeeEventListener(a, b EventListener) EventListener {
1168 0 : a.EnsureDefaults(nil)
1169 0 : b.EnsureDefaults(nil)
1170 0 : return EventListener{
1171 0 : BackgroundError: func(err error) {
1172 0 : a.BackgroundError(err)
1173 0 : b.BackgroundError(err)
1174 0 : },
1175 0 : BlobFileCreated: func(info BlobFileCreateInfo) {
1176 0 : a.BlobFileCreated(info)
1177 0 : b.BlobFileCreated(info)
1178 0 : },
1179 0 : BlobFileDeleted: func(info BlobFileDeleteInfo) {
1180 0 : a.BlobFileDeleted(info)
1181 0 : b.BlobFileDeleted(info)
1182 0 : },
1183 0 : BlobFileRewriteBegin: func(info BlobFileRewriteInfo) {
1184 0 : a.BlobFileRewriteBegin(info)
1185 0 : b.BlobFileRewriteBegin(info)
1186 0 : },
1187 0 : BlobFileRewriteEnd: func(info BlobFileRewriteInfo) {
1188 0 : a.BlobFileRewriteEnd(info)
1189 0 : b.BlobFileRewriteEnd(info)
1190 0 : },
1191 0 : DataCorruption: func(info DataCorruptionInfo) {
1192 0 : a.DataCorruption(info)
1193 0 : b.DataCorruption(info)
1194 0 : },
1195 0 : CompactionBegin: func(info CompactionInfo) {
1196 0 : a.CompactionBegin(info)
1197 0 : b.CompactionBegin(info)
1198 0 : },
1199 0 : CompactionEnd: func(info CompactionInfo) {
1200 0 : a.CompactionEnd(info)
1201 0 : b.CompactionEnd(info)
1202 0 : },
1203 0 : DiskSlow: func(info DiskSlowInfo) {
1204 0 : a.DiskSlow(info)
1205 0 : b.DiskSlow(info)
1206 0 : },
1207 0 : FlushBegin: func(info FlushInfo) {
1208 0 : a.FlushBegin(info)
1209 0 : b.FlushBegin(info)
1210 0 : },
1211 0 : FlushEnd: func(info FlushInfo) {
1212 0 : a.FlushEnd(info)
1213 0 : b.FlushEnd(info)
1214 0 : },
1215 0 : DownloadBegin: func(info DownloadInfo) {
1216 0 : a.DownloadBegin(info)
1217 0 : b.DownloadBegin(info)
1218 0 : },
1219 0 : DownloadEnd: func(info DownloadInfo) {
1220 0 : a.DownloadEnd(info)
1221 0 : b.DownloadEnd(info)
1222 0 : },
1223 0 : FormatUpgrade: func(v FormatMajorVersion) {
1224 0 : a.FormatUpgrade(v)
1225 0 : b.FormatUpgrade(v)
1226 0 : },
1227 0 : ManifestCreated: func(info ManifestCreateInfo) {
1228 0 : a.ManifestCreated(info)
1229 0 : b.ManifestCreated(info)
1230 0 : },
1231 0 : ManifestDeleted: func(info ManifestDeleteInfo) {
1232 0 : a.ManifestDeleted(info)
1233 0 : b.ManifestDeleted(info)
1234 0 : },
1235 0 : TableCreated: func(info TableCreateInfo) {
1236 0 : a.TableCreated(info)
1237 0 : b.TableCreated(info)
1238 0 : },
1239 0 : TableDeleted: func(info TableDeleteInfo) {
1240 0 : a.TableDeleted(info)
1241 0 : b.TableDeleted(info)
1242 0 : },
1243 0 : TableIngested: func(info TableIngestInfo) {
1244 0 : a.TableIngested(info)
1245 0 : b.TableIngested(info)
1246 0 : },
1247 0 : TableStatsLoaded: func(info TableStatsInfo) {
1248 0 : a.TableStatsLoaded(info)
1249 0 : b.TableStatsLoaded(info)
1250 0 : },
1251 0 : TableValidated: func(info TableValidatedInfo) {
1252 0 : a.TableValidated(info)
1253 0 : b.TableValidated(info)
1254 0 : },
1255 0 : WALCreated: func(info WALCreateInfo) {
1256 0 : a.WALCreated(info)
1257 0 : b.WALCreated(info)
1258 0 : },
1259 0 : WALDeleted: func(info WALDeleteInfo) {
1260 0 : a.WALDeleted(info)
1261 0 : b.WALDeleted(info)
1262 0 : },
1263 0 : WriteStallBegin: func(info WriteStallBeginInfo) {
1264 0 : a.WriteStallBegin(info)
1265 0 : b.WriteStallBegin(info)
1266 0 : },
1267 0 : WriteStallEnd: func() {
1268 0 : a.WriteStallEnd()
1269 0 : b.WriteStallEnd()
1270 0 : },
1271 0 : LowDiskSpace: func(info LowDiskSpaceInfo) {
1272 0 : a.LowDiskSpace(info)
1273 0 : b.LowDiskSpace(info)
1274 0 : },
1275 0 : PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
1276 0 : a.PossibleAPIMisuse(info)
1277 0 : b.PossibleAPIMisuse(info)
1278 0 : },
1279 : }
1280 : }
1281 :
1282 : // lowDiskSpaceReporter contains the logic to report low disk space events.
1283 : // Report is called whenever we get the disk usage statistics.
1284 : //
1285 : // We define a few thresholds (10%, 5%, 3%, 2%, 1%) and we post an event
1286 : // whenever we reach a new threshold. We periodically repost the event every 30
1287 : // minutes until we are above all thresholds.
1288 : type lowDiskSpaceReporter struct {
1289 : mu struct {
1290 : sync.Mutex
1291 : lastNoticeThreshold int
1292 : lastNoticeTime crtime.Mono
1293 : }
1294 : }
1295 :
1296 : var lowDiskSpaceThresholds = []int{10, 5, 3, 2, 1}
1297 :
1298 : const lowDiskSpaceFrequency = 30 * time.Minute
1299 :
1300 1 : func (r *lowDiskSpaceReporter) Report(availBytes, totalBytes uint64, el *EventListener) {
1301 1 : threshold, ok := r.findThreshold(availBytes, totalBytes)
1302 1 : if !ok {
1303 1 : // Normal path.
1304 1 : return
1305 1 : }
1306 0 : if r.shouldReport(threshold, crtime.NowMono()) {
1307 0 : el.LowDiskSpace(LowDiskSpaceInfo{
1308 0 : AvailBytes: availBytes,
1309 0 : TotalBytes: totalBytes,
1310 0 : PercentThreshold: threshold,
1311 0 : })
1312 0 : }
1313 : }
1314 :
1315 : // shouldReport returns true if we should report an event. Updates
1316 : // lastNoticeTime/lastNoticeThreshold appropriately.
1317 0 : func (r *lowDiskSpaceReporter) shouldReport(threshold int, now crtime.Mono) bool {
1318 0 : r.mu.Lock()
1319 0 : defer r.mu.Unlock()
1320 0 : if threshold < r.mu.lastNoticeThreshold || r.mu.lastNoticeTime == 0 ||
1321 0 : now.Sub(r.mu.lastNoticeTime) >= lowDiskSpaceFrequency {
1322 0 : r.mu.lastNoticeThreshold = threshold
1323 0 : r.mu.lastNoticeTime = now
1324 0 : return true
1325 0 : }
1326 0 : return false
1327 : }
1328 :
1329 : // findThreshold returns the largest threshold in lowDiskSpaceThresholds which
1330 : // is >= the percentage ratio between availBytes and totalBytes (or ok=false if
1331 : // there is more free space than the highest threshold).
1332 : func (r *lowDiskSpaceReporter) findThreshold(
1333 : availBytes, totalBytes uint64,
1334 1 : ) (threshold int, ok bool) {
1335 1 : // Note: in the normal path, we exit the loop during the first iteration.
1336 1 : for i, t := range lowDiskSpaceThresholds {
1337 1 : if availBytes*100 > totalBytes*uint64(lowDiskSpaceThresholds[i]) {
1338 1 : break
1339 : }
1340 0 : threshold = t
1341 0 : ok = true
1342 : }
1343 1 : return threshold, ok
1344 : }
1345 :
1346 : // reportCorruption reports a corruption of a TableMetadata or BlobFileMetadata
1347 : // to the event listener and also adds a DataCorruptionInfo payload to the error.
1348 0 : func (d *DB) reportCorruption(meta base.ObjectInfo, err error) error {
1349 0 : if invariants.Enabled && !IsCorruptionError(err) {
1350 0 : panic("not a corruption error")
1351 : }
1352 0 : fileType, fileNum := meta.FileInfo()
1353 0 :
1354 0 : objMeta, lookupErr := d.objProvider.Lookup(fileType, fileNum)
1355 0 : if lookupErr != nil {
1356 0 : // If the object is not known to the provider, it must be a local object
1357 0 : // that was missing when we opened the store. Remote objects have their
1358 0 : // metadata in a catalog, so even if the backing object is deleted, the
1359 0 : // DiskFileNum would still be known.
1360 0 : objMeta = objstorage.ObjectMetadata{DiskFileNum: fileNum, FileType: fileType}
1361 0 : }
1362 0 : path := d.objProvider.Path(objMeta)
1363 0 : if objMeta.IsRemote() {
1364 0 : // Remote path (which include the locator and full path) might not always be
1365 0 : // safe.
1366 0 : err = errors.WithHintf(err, "path: %s", path)
1367 0 : } else {
1368 0 : // Local paths are safe: they start with the store directory and the
1369 0 : // filename is generated by Pebble.
1370 0 : err = errors.WithHintf(err, "path: %s", redact.Safe(path))
1371 0 : }
1372 0 : info := DataCorruptionInfo{
1373 0 : Path: path,
1374 0 : IsRemote: objMeta.IsRemote(),
1375 0 : Locator: objMeta.Remote.Locator,
1376 0 : Bounds: meta.UserKeyBounds(),
1377 0 : Details: err,
1378 0 : }
1379 0 : d.opts.EventListener.DataCorruption(info)
1380 0 : // We don't use errors.Join() because that also annotates with this stack
1381 0 : // trace which would not be useful.
1382 0 : return errorsjoin.Join(err, &corruptionDetailError{info: info})
1383 : }
1384 :
1385 : type corruptionDetailError struct {
1386 : info DataCorruptionInfo
1387 : }
1388 :
1389 0 : func (e *corruptionDetailError) Error() string {
1390 0 : return "<corruption detail carrier>"
1391 0 : }
1392 :
1393 : // ExtractDataCorruptionInfo extracts the DataCorruptionInfo details from a
1394 : // corruption error. Returns nil if there is no such detail.
1395 0 : func ExtractDataCorruptionInfo(err error) *DataCorruptionInfo {
1396 0 : var e *corruptionDetailError
1397 0 : if errors.As(err, &e) {
1398 0 : return &e.info
1399 0 : }
1400 0 : return nil
1401 : }
|