Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "strings"
10 : "sync"
11 : "time"
12 :
13 : "github.com/cockroachdb/crlib/crtime"
14 : "github.com/cockroachdb/errors"
15 : errorsjoin "github.com/cockroachdb/errors/join"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/humanize"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/internal/manifest"
20 : "github.com/cockroachdb/pebble/objstorage"
21 : "github.com/cockroachdb/pebble/objstorage/remote"
22 : "github.com/cockroachdb/pebble/vfs"
23 : "github.com/cockroachdb/redact"
24 : )
25 :
26 : // TableNum is an identifier for a table within a database.
27 : type TableNum = base.TableNum
28 :
29 : // TableInfo exports the manifest.TableInfo type.
30 : type TableInfo = manifest.TableInfo
31 :
32 2 : func tablesTotalSize(tables []TableInfo) uint64 {
33 2 : var size uint64
34 2 : for i := range tables {
35 2 : size += tables[i].Size
36 2 : }
37 2 : return size
38 : }
39 :
40 2 : func formatFileNums(tables []TableInfo) string {
41 2 : var buf strings.Builder
42 2 : for i := range tables {
43 2 : if i > 0 {
44 2 : buf.WriteString(" ")
45 2 : }
46 2 : buf.WriteString(tables[i].FileNum.String())
47 : }
48 2 : return buf.String()
49 : }
50 :
// DataCorruptionInfo contains the information for a DataCorruption event.
type DataCorruptionInfo struct {
	// Path of the file that is corrupted. For remote files the path starts with
	// "remote://".
	Path string
	// IsRemote indicates whether the corrupted file is a remote file. When set,
	// the formatted event also includes the Locator.
	IsRemote bool
	// Locator is only set when IsRemote is true (note that an empty Locator is
	// valid even then).
	Locator remote.Locator
	// Bounds indicates the keyspace range that is affected.
	Bounds base.UserKeyBounds
	// Details of the error. See cockroachdb/errors for how to format with or
	// without redaction.
	Details error
}
66 :
67 1 : func (i DataCorruptionInfo) String() string {
68 1 : return redact.StringWithoutMarkers(i)
69 1 : }
70 :
71 : // SafeFormat implements redact.SafeFormatter.
72 1 : func (i DataCorruptionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
73 1 : w.Printf("on-disk corruption: %s", redact.Safe(i.Path))
74 1 : if i.IsRemote {
75 0 : w.Printf(" (remote locator %q)", redact.Safe(i.Locator))
76 0 : }
77 1 : w.Printf("; bounds: %s; details: %+v", i.Bounds.String(), i.Details)
78 : }
79 :
// LevelInfo contains info pertaining to a particular level.
type LevelInfo struct {
	// Level is the level number; it is printed as "L<Level>".
	Level int
	// Tables are the tables associated with this level for the event (for
	// example, the input or output tables of a compaction).
	Tables []TableInfo
	// Score is printed as "Score=%.2f".
	// NOTE(review): presumably the level's compaction score — confirm.
	Score float64
}
86 :
87 0 : func (i LevelInfo) String() string {
88 0 : return redact.StringWithoutMarkers(i)
89 0 : }
90 :
91 : // SafeFormat implements redact.SafeFormatter.
92 2 : func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
93 2 : w.Printf("L%d [%s] (%s) Score=%.2f",
94 2 : redact.Safe(i.Level),
95 2 : redact.Safe(formatFileNums(i.Tables)),
96 2 : redact.Safe(humanize.Bytes.Uint64(tablesTotalSize(i.Tables))),
97 2 : redact.Safe(i.Score))
98 2 : }
99 :
// BlobFileCreateInfo contains the info for a blob file creation event.
type BlobFileCreateInfo struct {
	// JobID is the ID of the job that created the blob file.
	JobID int
	// Reason is the reason for the blob file creation. It is printed verbatim
	// before "blob file created" in the event line.
	Reason string
	// Path is the path of the created blob file.
	Path string
	// FileNum is the on-disk file number of the created blob file.
	FileNum base.DiskFileNum
}
109 :
110 2 : func (i BlobFileCreateInfo) String() string {
111 2 : return redact.StringWithoutMarkers(i)
112 2 : }
113 :
114 : // SafeFormat implements redact.SafeFormatter.
115 2 : func (i BlobFileCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
116 2 : w.Printf("[JOB %d] %s: blob file created %s",
117 2 : redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
118 2 : }
119 :
// BlobFileDeleteInfo contains the info for a blob file deletion event.
type BlobFileDeleteInfo struct {
	// JobID is the ID of the job that deleted the blob file.
	JobID int
	// Path is the path of the deleted blob file.
	Path string
	// FileNum is the on-disk file number of the deleted blob file.
	FileNum base.DiskFileNum
	// Err is non-nil if the deletion failed.
	Err error
}
127 :
128 2 : func (i BlobFileDeleteInfo) String() string {
129 2 : return redact.StringWithoutMarkers(i)
130 2 : }
131 :
132 : // SafeFormat implements redact.SafeFormatter.
133 2 : func (i BlobFileDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
134 2 : if i.Err != nil {
135 0 : w.Printf("[JOB %d] blob file delete error %s: %s",
136 0 : redact.Safe(i.JobID), i.FileNum, i.Err)
137 0 : return
138 0 : }
139 2 : w.Printf("[JOB %d] blob file deleted %s", redact.Safe(i.JobID), i.FileNum)
140 : }
141 :
// BlobFileRewriteInfo contains the info for a blob file rewrite event.
type BlobFileRewriteInfo struct {
	// JobID is the ID of the job.
	JobID int
	// Input describes the blob file being rewritten.
	Input BlobFileInfo
	// Output describes the blob file produced by the rewrite. The output info
	// is empty for the rewrite begin event.
	Output BlobFileInfo
	// Duration is the time spent rewriting, including reading and writing
	// files.
	Duration time.Duration
	// TotalDuration is the total wall-time duration of the rewrite compaction,
	// including applying the compaction to the database. TotalDuration is
	// always ≥ Duration.
	TotalDuration time.Duration
	// Done is false for the begin event ("rewriting ...") and true once the
	// rewrite has finished ("rewrote ...").
	Done bool
	// Err is set only if Done is true. If non-nil, indicates that the compaction
	// failed. Note that err can be ErrCancelledCompaction, which can happen
	// during normal operation.
	Err error
}
164 :
165 2 : func (i BlobFileRewriteInfo) String() string {
166 2 : return redact.StringWithoutMarkers(i)
167 2 : }
168 :
169 : // SafeFormat implements redact.SafeFormatter.
170 2 : func (i BlobFileRewriteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
171 2 : if i.Err != nil {
172 1 : w.Printf("[JOB %d] blob file (%s, %s) rewrite error: %s",
173 1 : redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum, i.Err)
174 1 : return
175 1 : }
176 :
177 2 : if !i.Done {
178 2 : w.Printf("[JOB %d] rewriting blob file %s (physical file %s)",
179 2 : redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum)
180 2 : return
181 2 : }
182 2 : w.Printf("[JOB %d] rewrote blob file (%s, %s) -> (%s, %s), in %.1fs (%.1fs total)",
183 2 : redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum,
184 2 : i.Output.BlobFileID, i.Output.DiskFileNum,
185 2 : redact.Safe(i.Duration.Seconds()),
186 2 : redact.Safe(i.TotalDuration.Seconds()))
187 : }
188 :
// BlobFileInfo describes a blob file. A blob file has both a logical ID and a
// physical on-disk file number; events print both.
type BlobFileInfo struct {
	// BlobFileID is the logical ID of the blob file.
	BlobFileID base.BlobFileID
	// DiskFileNum is the file number of the blob file on disk.
	DiskFileNum base.DiskFileNum
	// Size is the physical size of the file in bytes.
	Size uint64
	// ValueSize is the pre-compressed size of the values in the blob file in
	// bytes.
	ValueSize uint64
}
201 :
// CompactionInfo contains the info for a compaction event.
type CompactionInfo struct {
	// JobID is the ID of the compaction job.
	JobID int
	// Reason is the reason for the compaction.
	Reason string
	// Input contains the input tables for the compaction organized by level.
	Input []LevelInfo
	// Output contains the output tables generated by the compaction. The output
	// tables are empty for the compaction begin event.
	Output LevelInfo
	// Duration is the time spent compacting, including reading and writing
	// sstables.
	Duration time.Duration
	// TotalDuration is the total wall-time duration of the compaction,
	// including applying the compaction to the database. TotalDuration is
	// always ≥ Duration.
	TotalDuration time.Duration
	// Done is false for the compaction begin event ("compacting ...") and true
	// for the end event ("compacted ...").
	Done bool
	// Err is set only if Done is true. If non-nil, indicates that the compaction
	// failed. Note that err can be ErrCancelledCompaction, which can happen
	// during normal operation.
	Err error

	// SingleLevelOverlappingRatio and MultiLevelOverlappingRatio are reported,
	// with two decimals, in the compaction begin event.
	// NOTE(review): presumably the overlap ratios considered when choosing
	// between a single-level and a multi-level compaction — confirm.
	SingleLevelOverlappingRatio float64
	MultiLevelOverlappingRatio  float64

	// Annotations specifies additional info to appear in a compaction's event
	// log line.
	Annotations compactionAnnotations
}
232 :
233 : type compactionAnnotations []string
234 :
235 : // SafeFormat implements redact.SafeFormatter.
236 2 : func (ca compactionAnnotations) SafeFormat(w redact.SafePrinter, _ rune) {
237 2 : if len(ca) == 0 {
238 0 : return
239 0 : }
240 2 : for i := range ca {
241 2 : if i != 0 {
242 2 : w.Print(" ")
243 2 : }
244 2 : w.Printf("%s", redact.SafeString(ca[i]))
245 : }
246 : }
247 :
248 2 : func (i CompactionInfo) String() string {
249 2 : return redact.StringWithoutMarkers(i)
250 2 : }
251 :
252 : // SafeFormat implements redact.SafeFormatter.
253 2 : func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
254 2 : if i.Err != nil {
255 2 : w.Printf("[JOB %d] compaction(%s) to L%d error: %s",
256 2 : redact.Safe(i.JobID), redact.SafeString(i.Reason), redact.Safe(i.Output.Level), i.Err)
257 2 : return
258 2 : }
259 :
260 2 : if !i.Done {
261 2 : w.Printf("[JOB %d] compacting(%s) ",
262 2 : redact.Safe(i.JobID),
263 2 : redact.SafeString(i.Reason))
264 2 : if len(i.Annotations) > 0 {
265 2 : w.Printf("%s ", i.Annotations)
266 2 : }
267 2 : w.Printf("%s; ", levelInfos(i.Input))
268 2 : w.Printf("OverlappingRatio: Single %.2f, Multi %.2f", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
269 2 : return
270 : }
271 2 : outputSize := tablesTotalSize(i.Output.Tables)
272 2 : w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
273 2 : if len(i.Annotations) > 0 {
274 2 : w.Printf("%s ", i.Annotations)
275 2 : }
276 2 : w.Print(levelInfos(i.Input))
277 2 : w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
278 2 : redact.Safe(i.Output.Level),
279 2 : redact.Safe(formatFileNums(i.Output.Tables)),
280 2 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
281 2 : redact.Safe(i.Duration.Seconds()),
282 2 : redact.Safe(i.TotalDuration.Seconds()),
283 2 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
284 : }
285 :
286 : type levelInfos []LevelInfo
287 :
288 2 : func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
289 2 : for j, levelInfo := range i {
290 2 : if j > 0 {
291 2 : w.Printf(" + ")
292 2 : }
293 2 : w.Print(levelInfo)
294 : }
295 : }
296 :
// DiskSlowInfo contains the info for a disk slowness event when writing to a
// file. It is an alias of vfs.DiskSlowInfo.
type DiskSlowInfo = vfs.DiskSlowInfo
300 :
// FlushInfo contains the info for a flush event.
type FlushInfo struct {
	// JobID is the ID of the flush job.
	JobID int
	// Reason is the reason for the flush.
	Reason string
	// Input contains the count of input memtables that were flushed.
	Input int
	// InputBytes contains the total in-memory size of the memtable(s) that were
	// flushed. This size includes skiplist indexing data structures.
	InputBytes uint64
	// Output contains the output tables generated by the flush. The output info
	// is empty for the flush begin event.
	Output []TableInfo
	// Duration is the time spent flushing. This duration includes writing and
	// syncing all of the flushed keys to sstables.
	Duration time.Duration
	// TotalDuration is the total wall-time duration of the flush, including
	// applying the flush to the database. TotalDuration is always ≥ Duration.
	TotalDuration time.Duration
	// Ingest is set to true if the flush is handling tables that were added to
	// the flushable queue via an ingestion operation.
	Ingest bool
	// IngestLevels are the output levels for each ingested table in the flush.
	// This field is only populated when Ingest is true.
	IngestLevels []int
	// Done is false for the flush begin event and true for the end event.
	Done bool
	// Err is non-nil if the flush failed.
	Err error
}
330 :
331 2 : func (i FlushInfo) String() string {
332 2 : return redact.StringWithoutMarkers(i)
333 2 : }
334 :
335 : // SafeFormat implements redact.SafeFormatter.
336 2 : func (i FlushInfo) SafeFormat(w redact.SafePrinter, _ rune) {
337 2 : if i.Err != nil {
338 2 : w.Printf("[JOB %d] flush error: %s", redact.Safe(i.JobID), i.Err)
339 2 : return
340 2 : }
341 :
342 2 : plural := redact.SafeString("s")
343 2 : if i.Input == 1 {
344 2 : plural = ""
345 2 : }
346 2 : if !i.Done {
347 2 : w.Printf("[JOB %d] ", redact.Safe(i.JobID))
348 2 : if !i.Ingest {
349 2 : w.Printf("flushing %d memtable", redact.Safe(i.Input))
350 2 : w.SafeString(plural)
351 2 : w.Printf(" (%s) to L0", redact.Safe(humanize.Bytes.Uint64(i.InputBytes)))
352 2 : } else {
353 2 : w.Printf("flushing %d ingested table%s", redact.Safe(i.Input), plural)
354 2 : }
355 2 : return
356 : }
357 :
358 2 : outputSize := tablesTotalSize(i.Output)
359 2 : if !i.Ingest {
360 2 : if invariants.Enabled && len(i.IngestLevels) > 0 {
361 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) == 0"))
362 : }
363 2 : w.Printf("[JOB %d] flushed %d memtable%s (%s) to L0 [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
364 2 : redact.Safe(i.JobID), redact.Safe(i.Input), plural,
365 2 : redact.Safe(humanize.Bytes.Uint64(i.InputBytes)),
366 2 : redact.Safe(formatFileNums(i.Output)),
367 2 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
368 2 : redact.Safe(i.Duration.Seconds()),
369 2 : redact.Safe(i.TotalDuration.Seconds()),
370 2 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
371 2 : } else {
372 2 : if invariants.Enabled && len(i.IngestLevels) == 0 {
373 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) > 0"))
374 : }
375 2 : w.Printf("[JOB %d] flushed %d ingested flushable%s",
376 2 : redact.Safe(i.JobID), redact.Safe(len(i.Output)), plural)
377 2 : for j, level := range i.IngestLevels {
378 2 : file := i.Output[j]
379 2 : if j > 0 {
380 2 : w.Printf(" +")
381 2 : }
382 2 : w.Printf(" L%d:%s (%s)", level, file.FileNum, humanize.Bytes.Uint64(file.Size))
383 : }
384 2 : w.Printf(" in %.1fs (%.1fs total), output rate %s/s",
385 2 : redact.Safe(i.Duration.Seconds()),
386 2 : redact.Safe(i.TotalDuration.Seconds()),
387 2 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
388 : }
389 : }
390 :
// DownloadInfo contains the info for a DB.Download() event.
type DownloadInfo struct {
	// JobID is the ID of the download job.
	JobID int

	// Spans are the spans the download operation covers. The begin event
	// reports how many there are.
	Spans []DownloadSpan

	// Duration is the time since the operation was started.
	Duration time.Duration
	// DownloadCompactionsLaunched is the number of download compactions the
	// operation has launched so far.
	DownloadCompactionsLaunched int

	// RestartCount indicates that the download operation restarted because it
	// noticed that new external files were ingested. A DownloadBegin event with
	// RestartCount = 0 is the start of the operation; each time we restart it we
	// have another DownloadBegin event with RestartCount > 0.
	RestartCount int
	// Done is true for the event emitted when the download finishes.
	Done bool
	// Err is non-nil if the download operation failed.
	Err error
}
410 :
411 2 : func (i DownloadInfo) String() string {
412 2 : return redact.StringWithoutMarkers(i)
413 2 : }
414 :
415 : // SafeFormat implements redact.SafeFormatter.
416 2 : func (i DownloadInfo) SafeFormat(w redact.SafePrinter, _ rune) {
417 2 : switch {
418 0 : case i.Err != nil:
419 0 : w.Printf("[JOB %d] download error after %1.fs: %s", redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), i.Err)
420 :
421 2 : case i.Done:
422 2 : w.Printf("[JOB %d] download finished in %.1fs (launched %d compactions)",
423 2 : redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), redact.Safe(i.DownloadCompactionsLaunched))
424 :
425 2 : default:
426 2 : if i.RestartCount == 0 {
427 2 : w.Printf("[JOB %d] starting download for %d spans", redact.Safe(i.JobID), redact.Safe(len(i.Spans)))
428 2 : } else {
429 0 : w.Printf("[JOB %d] restarting download (restart #%d, time so far %.1fs, launched %d compactions)",
430 0 : redact.Safe(i.JobID), redact.Safe(i.RestartCount), redact.Safe(i.Duration.Seconds()),
431 0 : redact.Safe(i.DownloadCompactionsLaunched))
432 0 : }
433 : }
434 : }
435 :
// ManifestCreateInfo contains info about a manifest creation event.
type ManifestCreateInfo struct {
	// JobID is the ID of the job that caused the manifest to be created.
	JobID int
	// Path is the path of the new manifest.
	Path string
	// The file number of the new Manifest.
	FileNum base.DiskFileNum
	// Err is non-nil if creating the manifest failed.
	Err error
}
445 :
446 2 : func (i ManifestCreateInfo) String() string {
447 2 : return redact.StringWithoutMarkers(i)
448 2 : }
449 :
450 : // SafeFormat implements redact.SafeFormatter.
451 2 : func (i ManifestCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
452 2 : if i.Err != nil {
453 0 : w.Printf("[JOB %d] MANIFEST create error: %s", redact.Safe(i.JobID), i.Err)
454 0 : return
455 0 : }
456 2 : w.Printf("[JOB %d] MANIFEST created %s", redact.Safe(i.JobID), i.FileNum)
457 : }
458 :
// ManifestDeleteInfo contains the info for a Manifest deletion event.
type ManifestDeleteInfo struct {
	// JobID is the ID of the job that caused the Manifest to be deleted.
	JobID int
	// Path is the path of the deleted manifest.
	Path string
	// FileNum is the file number of the deleted manifest.
	FileNum base.DiskFileNum
	// Err is non-nil if deleting the manifest failed.
	Err error
}
467 :
468 2 : func (i ManifestDeleteInfo) String() string {
469 2 : return redact.StringWithoutMarkers(i)
470 2 : }
471 :
472 : // SafeFormat implements redact.SafeFormatter.
473 2 : func (i ManifestDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
474 2 : if i.Err != nil {
475 0 : w.Printf("[JOB %d] MANIFEST delete error: %s", redact.Safe(i.JobID), i.Err)
476 0 : return
477 0 : }
478 2 : w.Printf("[JOB %d] MANIFEST deleted %s", redact.Safe(i.JobID), i.FileNum)
479 : }
480 :
// TableCreateInfo contains the info for a table creation event.
type TableCreateInfo struct {
	// JobID is the ID of the job that created the table.
	JobID int
	// Reason is the reason for the table creation: "compacting", "flushing", or
	// "ingesting".
	Reason string
	// Path is the path of the created sstable.
	Path string
	// FileNum is the on-disk file number of the created sstable.
	FileNum base.DiskFileNum
}
490 :
491 2 : func (i TableCreateInfo) String() string {
492 2 : return redact.StringWithoutMarkers(i)
493 2 : }
494 :
495 : // SafeFormat implements redact.SafeFormatter.
496 2 : func (i TableCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
497 2 : w.Printf("[JOB %d] %s: sstable created %s",
498 2 : redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
499 2 : }
500 :
// TableDeleteInfo contains the info for a table deletion event.
type TableDeleteInfo struct {
	// JobID is the ID of the job that deleted the table.
	JobID int
	// Path is the path of the deleted sstable.
	Path string
	// FileNum is the on-disk file number of the deleted sstable.
	FileNum base.DiskFileNum
	// Err is non-nil if the deletion failed.
	Err error
}
508 :
509 2 : func (i TableDeleteInfo) String() string {
510 2 : return redact.StringWithoutMarkers(i)
511 2 : }
512 :
513 : // SafeFormat implements redact.SafeFormatter.
514 2 : func (i TableDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
515 2 : if i.Err != nil {
516 0 : w.Printf("[JOB %d] sstable delete error %s: %s",
517 0 : redact.Safe(i.JobID), i.FileNum, i.Err)
518 0 : return
519 0 : }
520 2 : w.Printf("[JOB %d] sstable deleted %s", redact.Safe(i.JobID), i.FileNum)
521 : }
522 :
// TableIngestInfo contains the info for a table ingestion event.
type TableIngestInfo struct {
	// JobID is the ID of the job that caused the table to be ingested.
	JobID int
	// Tables lists the ingested tables together with the level each one was
	// ingested into. The level is not printed when the ingestion was treated
	// as a flushable.
	Tables []struct {
		TableInfo
		Level int
	}
	// GlobalSeqNum is the sequence number that was assigned to all entries in
	// the ingested table.
	GlobalSeqNum base.SeqNum
	// flushable indicates whether the ingested sstable was treated as a
	// flushable.
	flushable bool
	// Err is non-nil if the ingestion failed.
	Err error
	// WaitFlushDuration is the time spent waiting for memtable flushes to
	// complete, given that an overlap between ingesting sstables and memtables
	// exists.
	WaitFlushDuration time.Duration
	// ManifestUpdateDuration is the time spent updating the manifest.
	ManifestUpdateDuration time.Duration
	// BlockReadDuration is the total time spent reading blocks for the ingested
	// sstable.
	BlockReadDuration time.Duration
	// BlockReadBytes is the total number of bytes from blocks read for the
	// ingested sstable. This does not include bytes read from the block cache.
	BlockReadBytes uint64
}
551 :
552 2 : func (i TableIngestInfo) String() string {
553 2 : return redact.StringWithoutMarkers(i)
554 2 : }
555 :
556 : // SafeFormat implements redact.SafeFormatter.
557 2 : func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
558 2 : if i.Err != nil {
559 0 : w.Printf("[JOB %d] ingest error: %s", redact.Safe(i.JobID), i.Err)
560 0 : return
561 0 : }
562 :
563 2 : if i.flushable {
564 2 : w.Printf("[JOB %d] ingested as flushable, memtable flushes took %.1fs:", redact.Safe(i.JobID),
565 2 : redact.Safe(i.WaitFlushDuration.Seconds()))
566 2 : } else {
567 2 : w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
568 2 : }
569 :
570 2 : for j := range i.Tables {
571 2 : t := &i.Tables[j]
572 2 : if j > 0 {
573 2 : w.Printf(",")
574 2 : }
575 2 : levelStr := ""
576 2 : if !i.flushable {
577 2 : levelStr = fmt.Sprintf("L%d:", t.Level)
578 2 : }
579 2 : w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
580 2 : redact.Safe(humanize.Bytes.Uint64(t.Size)))
581 : }
582 2 : w.Printf("; manifest update took %.1fs; block reads took %.1fs with %s block bytes read",
583 2 : redact.Safe(i.ManifestUpdateDuration.Seconds()), redact.Safe(i.BlockReadDuration.Seconds()),
584 2 : redact.Safe(humanize.Bytes.Uint64(i.BlockReadBytes)))
585 : }
586 :
// TableStatsInfo contains the info for a table stats loaded event. It is
// emitted once all initial table stats have been loaded.
type TableStatsInfo struct {
	// JobID is the ID of the job that finished loading the initial tables'
	// stats.
	JobID int
}
593 :
594 2 : func (i TableStatsInfo) String() string {
595 2 : return redact.StringWithoutMarkers(i)
596 2 : }
597 :
598 : // SafeFormat implements redact.SafeFormatter.
599 2 : func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
600 2 : w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
601 2 : }
602 :
// TableValidatedInfo contains information on the result of a validation run
// on an sstable.
type TableValidatedInfo struct {
	// JobID is the ID of the validation job.
	JobID int
	// Meta is the metadata of the validated table.
	Meta *manifest.TableMetadata
}
609 :
610 2 : func (i TableValidatedInfo) String() string {
611 2 : return redact.StringWithoutMarkers(i)
612 2 : }
613 :
614 : // SafeFormat implements redact.SafeFormatter.
615 2 : func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
616 2 : w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
617 2 : }
618 :
// WALCreateInfo contains info about a WAL creation event.
type WALCreateInfo struct {
	// JobID is the ID of the job that caused the WAL to be created.
	JobID int
	// Path is the path of the new WAL.
	Path string
	// The file number of the new WAL.
	FileNum base.DiskFileNum
	// The file number of a previous WAL which was recycled to create this
	// one. Zero if recycling did not take place.
	RecycledFileNum base.DiskFileNum
	// Err is non-nil if creating the WAL failed.
	Err error
}
631 :
632 2 : func (i WALCreateInfo) String() string {
633 2 : return redact.StringWithoutMarkers(i)
634 2 : }
635 :
636 : // SafeFormat implements redact.SafeFormatter.
637 2 : func (i WALCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
638 2 : if i.Err != nil {
639 0 : w.Printf("[JOB %d] WAL create error: %s", redact.Safe(i.JobID), i.Err)
640 0 : return
641 0 : }
642 :
643 2 : if i.RecycledFileNum == 0 {
644 2 : w.Printf("[JOB %d] WAL created %s", redact.Safe(i.JobID), i.FileNum)
645 2 : return
646 2 : }
647 :
648 1 : w.Printf("[JOB %d] WAL created %s (recycled %s)",
649 1 : redact.Safe(i.JobID), i.FileNum, i.RecycledFileNum)
650 : }
651 :
// WALDeleteInfo contains the info for a WAL deletion event.
//
// TODO(sumeer): extend WALDeleteInfo for the failover case in case the path
// is insufficient to infer whether primary or secondary.
type WALDeleteInfo struct {
	// JobID is the ID of the job that caused the WAL to be deleted.
	JobID int
	// Path is the path of the deleted WAL.
	Path string
	// FileNum is the file number of the deleted WAL.
	FileNum base.DiskFileNum
	// Err is non-nil if deleting the WAL failed.
	Err error
}
663 :
664 2 : func (i WALDeleteInfo) String() string {
665 2 : return redact.StringWithoutMarkers(i)
666 2 : }
667 :
668 : // SafeFormat implements redact.SafeFormatter.
669 2 : func (i WALDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
670 2 : if i.Err != nil {
671 1 : w.Printf("[JOB %d] WAL delete error: %s", redact.Safe(i.JobID), i.Err)
672 1 : return
673 1 : }
674 2 : w.Printf("[JOB %d] WAL deleted %s", redact.Safe(i.JobID), i.FileNum)
675 : }
676 :
// WriteStallBeginInfo contains the info for a write stall begin event.
type WriteStallBeginInfo struct {
	// Reason describes why writes are stalling; it is printed marked safe.
	Reason string
}
681 :
682 2 : func (i WriteStallBeginInfo) String() string {
683 2 : return redact.StringWithoutMarkers(i)
684 2 : }
685 :
686 : // SafeFormat implements redact.SafeFormatter.
687 2 : func (i WriteStallBeginInfo) SafeFormat(w redact.SafePrinter, _ rune) {
688 2 : w.Printf("write stall beginning: %s", redact.Safe(i.Reason))
689 2 : }
690 :
// LowDiskSpaceInfo contains the information for a LowDiskSpace
// event.
type LowDiskSpaceInfo struct {
	// AvailBytes is the disk space available to the current process in bytes.
	AvailBytes uint64
	// TotalBytes is the total disk space in bytes.
	TotalBytes uint64
	// PercentThreshold is one of a set of fixed percentages in the
	// lowDiskSpaceThresholds below. This event was issued because the disk
	// space went below this threshold.
	PercentThreshold int
}
703 :
704 0 : func (i LowDiskSpaceInfo) String() string {
705 0 : return redact.StringWithoutMarkers(i)
706 0 : }
707 :
708 : // SafeFormat implements redact.SafeFormatter.
709 0 : func (i LowDiskSpaceInfo) SafeFormat(w redact.SafePrinter, _ rune) {
710 0 : w.Printf(
711 0 : "available disk space under %d%% (%s of %s)",
712 0 : redact.Safe(i.PercentThreshold),
713 0 : redact.Safe(humanize.Bytes.Uint64(i.AvailBytes)),
714 0 : redact.Safe(humanize.Bytes.Uint64(i.TotalBytes)),
715 0 : )
716 0 : }
717 :
// PossibleAPIMisuseInfo contains the information for a PossibleAPIMisuse event.
type PossibleAPIMisuseInfo struct {
	// Kind identifies which kind of misuse was detected.
	Kind APIMisuseKind

	// UserKey is set for the following kinds:
	//   - IneffectualSingleDelete,
	//   - NondeterministicSingleDelete,
	//   - MissizedDelete,
	//   - InvalidValue.
	UserKey []byte

	// ExtraInfo is set for the following kinds:
	//   - MissizedDelete: contains "elidedSize=<size>,expectedSize=<size>"
	//   - InvalidValue: contains "callback=<callbackName>,value=<value>,err=<err>"
	ExtraInfo redact.RedactableString
}
734 :
735 2 : func (i PossibleAPIMisuseInfo) String() string {
736 2 : return redact.StringWithoutMarkers(i)
737 2 : }
738 :
739 : // SafeFormat implements redact.SafeFormatter.
740 2 : func (i PossibleAPIMisuseInfo) SafeFormat(w redact.SafePrinter, _ rune) {
741 2 : switch i.Kind {
742 2 : case IneffectualSingleDelete, NondeterministicSingleDelete:
743 2 : w.Printf("possible API misuse: %s (key=%q)", redact.Safe(i.Kind), i.UserKey)
744 2 : case MissizedDelete:
745 2 : w.Printf("possible API misuse: %s (key=%q, %s)", redact.Safe(i.Kind), i.UserKey, i.ExtraInfo)
746 0 : case InvalidValue:
747 0 : w.Printf("possible API misuse: %s (key=%q, %s)", redact.Safe(i.Kind), i.UserKey, i.ExtraInfo)
748 0 : default:
749 0 : if invariants.Enabled {
750 0 : panic("invalid API misuse event")
751 : }
752 0 : w.Printf("invalid API misuse event")
753 : }
754 : }
755 :
// APIMisuseKind identifies the type of API misuse represented by a
// PossibleAPIMisuse event.
type APIMisuseKind int8

const (
	// IneffectualSingleDelete is emitted in compactions/flushes if any
	// single delete is being elided without deleting a point set/merge.
	//
	// This event can sometimes be a false positive because of delete-only
	// compactions which can cause a recent RANGEDEL to peek below an older
	// SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
	//
	// Example:
	//	RANGEDEL [a, c)#10 in L0
	//	SINGLEDEL b#5 in L1
	//	SET b#3 in L6
	//
	// If the L6 file containing the SET is narrow and the L1 file containing
	// the SINGLEDEL is wide, a delete-only compaction can remove the file in
	// L6 before the SINGLEDEL is compacted down. Then when the SINGLEDEL is
	// compacted down, it will not find any SET to delete, resulting in the
	// ineffectual callback.
	IneffectualSingleDelete APIMisuseKind = iota

	// NondeterministicSingleDelete is emitted in compactions/flushes if any
	// single delete has consumed a Set/Merge, and there is another immediately
	// older Set/SetWithDelete/Merge. The user of Pebble has violated the
	// invariant under which SingleDelete can be used correctly.
	//
	// Consider the sequence SingleDelete#3, Set#2, Set#1. There are three
	// ways some of these keys can first meet in a compaction.
	//
	//   - All 3 keys in the same compaction: this callback will detect the
	//     violation.
	//
	//   - SingleDelete#3, Set#2 meet in a compaction first: Both keys will
	//     disappear. The violation will not be detected, and the DB will have
	//     Set#1 which is likely incorrect (from the user's perspective).
	//
	//   - Set#2, Set#1 meet in a compaction first: The output will be Set#2,
	//     which will later be consumed by SingleDelete#3. The violation will
	//     not be detected and the DB will be correct.
	//
	// This event can sometimes be a false positive because of delete-only
	// compactions which can cause a recent RANGEDEL to peek below an older
	// SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
	//
	// Example:
	//	RANGEDEL [a, z)#60 in L0
	//	SINGLEDEL g#50 in L1
	//	SET g#40 in L2
	//	RANGEDEL [g,h)#30 in L3
	//	SET g#20 in L6
	//
	// In this example, the two SETs represent the same user write, and the
	// RANGEDELs are caused by the CockroachDB range being dropped. That is,
	// the user wrote to g once, range was dropped, then added back, which
	// caused the SET again, then at some point g was validly deleted using a
	// SINGLEDEL, and then the range was dropped again. The older RANGEDEL can
	// get fragmented due to compactions it has been part of. Say this L3 file
	// containing the RANGEDEL is very narrow, while the L1, L2, L6 files are
	// wider than the RANGEDEL in L0. Then the RANGEDEL in L3 can be dropped
	// using a delete-only compaction, resulting in an LSM with state:
	//
	//	RANGEDEL [a, z)#60 in L0
	//	SINGLEDEL g#50 in L1
	//	SET g#40 in L2
	//	SET g#20 in L6
	//
	// A multi-level compaction involving L1, L2, L6 will cause the invariant
	// violation callback. This example doesn't need multi-level compactions:
	// say there was a Pebble snapshot at g#21 preventing g#20 from being
	// dropped when it meets g#40 in a compaction. That snapshot will not save
	// RANGEDEL [g,h)#30, so we can have:
	//
	//	SINGLEDEL g#50 in L1
	//	SET g#40, SET g#20 in L6
	//
	// And say the snapshot is removed and then the L1 and L6 compaction
	// happens, resulting in the invariant violation callback.
	NondeterministicSingleDelete

	// MissizedDelete is emitted when a DELSIZED tombstone is found that did
	// not accurately record the size of the value it deleted. This can lead to
	// incorrect behavior in compactions.
	MissizedDelete

	// InvalidValue is emitted when a user-implemented callback (such as
	// ShortAttributeExtractor) returns an error for a committed value. This
	// suggests that either the callback is not implemented for all possible
	// values or a malformed value was committed to the DB.
	InvalidValue
)
849 :
850 2 : func (k APIMisuseKind) String() string {
851 2 : switch k {
852 2 : case IneffectualSingleDelete:
853 2 : return "ineffectual SINGLEDEL"
854 0 : case NondeterministicSingleDelete:
855 0 : return "nondeterministic SINGLEDEL"
856 2 : case MissizedDelete:
857 2 : return "missized DELSIZED"
858 0 : case InvalidValue:
859 0 : return "invalid value"
860 0 : default:
861 0 : return "unknown"
862 : }
863 : }
864 :
// EventListener contains a set of functions that will be invoked when various
// significant DB events occur. Note that the functions should not run for an
// excessive amount of time as they are invoked synchronously by the DB and may
// block continued DB work. For a similar reason it is advisable to not perform
// any synchronous calls back into the DB.
//
// Any field may be left nil: EnsureDefaults replaces every nil handler with a
// default (a no-op for most events) so that the handlers can be invoked
// without a nil check.
type EventListener struct {
	// BackgroundError is invoked whenever an error occurs during a background
	// operation such as flush or compaction.
	BackgroundError func(error)

	// BlobFileCreated is invoked after a blob file has been created.
	BlobFileCreated func(BlobFileCreateInfo)

	// BlobFileDeleted is invoked after a blob file has been deleted.
	BlobFileDeleted func(BlobFileDeleteInfo)

	// BlobFileRewriteBegin is invoked when a blob file rewrite compaction begins.
	BlobFileRewriteBegin func(BlobFileRewriteInfo)

	// BlobFileRewriteEnd is invoked when a blob file rewrite compaction ends.
	BlobFileRewriteEnd func(BlobFileRewriteInfo)

	// DataCorruption is invoked when an on-disk corruption is detected. It should
	// not block, as it is called synchronously in read paths.
	DataCorruption func(DataCorruptionInfo)

	// CompactionBegin is invoked after the inputs to a compaction have been
	// determined, but before the compaction has produced any output.
	CompactionBegin func(CompactionInfo)

	// CompactionEnd is invoked after a compaction has completed and the result
	// has been installed.
	CompactionEnd func(CompactionInfo)

	// DiskSlow is invoked after a disk write operation on a file created with a
	// disk health checking vfs.FS (see vfs.DefaultWithDiskHealthChecks) is
	// observed to exceed the specified disk slowness threshold duration. DiskSlow
	// is called on a goroutine that is monitoring slowness/stuckness. The callee
	// MUST return without doing any IO, or blocking on anything (like a mutex)
	// that is waiting on IO. This is imperative in order to reliably monitor for
	// slowness, since if this goroutine gets stuck, the monitoring will stop
	// working.
	DiskSlow func(DiskSlowInfo)

	// FlushBegin is invoked after the inputs to a flush have been determined,
	// but before the flush has produced any output.
	FlushBegin func(FlushInfo)

	// FlushEnd is invoked after a flush has completed and the result has been
	// installed.
	FlushEnd func(FlushInfo)

	// DownloadBegin is invoked when a db.Download operation starts or restarts
	// (restarts are caused by new external tables being ingested during the
	// operation).
	DownloadBegin func(DownloadInfo)

	// DownloadEnd is invoked when a db.Download operation completes.
	DownloadEnd func(DownloadInfo)

	// FormatUpgrade is invoked after the database's FormatMajorVersion
	// is upgraded.
	FormatUpgrade func(FormatMajorVersion)

	// ManifestCreated is invoked after a manifest has been created.
	ManifestCreated func(ManifestCreateInfo)

	// ManifestDeleted is invoked after a manifest has been deleted.
	ManifestDeleted func(ManifestDeleteInfo)

	// TableCreated is invoked when a table has been created.
	TableCreated func(TableCreateInfo)

	// TableDeleted is invoked after a table has been deleted.
	TableDeleted func(TableDeleteInfo)

	// TableIngested is invoked after an externally created table has been
	// ingested via a call to DB.Ingest().
	TableIngested func(TableIngestInfo)

	// TableStatsLoaded is invoked at most once, when the table stats
	// collector has loaded statistics for all tables that existed at Open.
	TableStatsLoaded func(TableStatsInfo)

	// TableValidated is invoked after validation runs on an sstable.
	TableValidated func(TableValidatedInfo)

	// WALCreated is invoked after a WAL has been created.
	WALCreated func(WALCreateInfo)

	// WALDeleted is invoked after a WAL has been deleted.
	WALDeleted func(WALDeleteInfo)

	// WriteStallBegin is invoked when writes are intentionally delayed.
	WriteStallBegin func(WriteStallBeginInfo)

	// WriteStallEnd is invoked when delayed writes are released.
	WriteStallEnd func()

	// LowDiskSpace is invoked periodically when the disk space is running
	// low.
	LowDiskSpace func(LowDiskSpaceInfo)

	// PossibleAPIMisuse is invoked when a possible API misuse is detected.
	PossibleAPIMisuse func(PossibleAPIMisuseInfo)
}
971 :
972 : // EnsureDefaults ensures that background error events are logged to the
973 : // specified logger if a handler for those events hasn't been otherwise
974 : // specified. Ensure all handlers are non-nil so that we don't have to check
975 : // for nil-ness before invoking.
976 2 : func (l *EventListener) EnsureDefaults(logger Logger) {
977 2 : if l.BackgroundError == nil {
978 2 : if logger != nil {
979 2 : l.BackgroundError = func(err error) {
980 1 : logger.Errorf("background error: %s", err)
981 1 : }
982 1 : } else {
983 1 : l.BackgroundError = func(error) {}
984 : }
985 : }
986 2 : if l.BlobFileCreated == nil {
987 2 : l.BlobFileCreated = func(info BlobFileCreateInfo) {}
988 : }
989 2 : if l.BlobFileDeleted == nil {
990 2 : l.BlobFileDeleted = func(info BlobFileDeleteInfo) {}
991 : }
992 2 : if l.BlobFileRewriteBegin == nil {
993 2 : l.BlobFileRewriteBegin = func(info BlobFileRewriteInfo) {}
994 : }
995 2 : if l.BlobFileRewriteEnd == nil {
996 2 : l.BlobFileRewriteEnd = func(info BlobFileRewriteInfo) {}
997 : }
998 2 : if l.DataCorruption == nil {
999 2 : if logger != nil {
1000 2 : l.DataCorruption = func(info DataCorruptionInfo) {
1001 1 : logger.Fatalf("%s", info)
1002 1 : }
1003 1 : } else {
1004 1 : l.DataCorruption = func(info DataCorruptionInfo) {}
1005 : }
1006 : }
1007 2 : if l.CompactionBegin == nil {
1008 2 : l.CompactionBegin = func(info CompactionInfo) {}
1009 : }
1010 2 : if l.CompactionEnd == nil {
1011 2 : l.CompactionEnd = func(info CompactionInfo) {}
1012 : }
1013 2 : if l.DiskSlow == nil {
1014 2 : l.DiskSlow = func(info DiskSlowInfo) {}
1015 : }
1016 2 : if l.FlushBegin == nil {
1017 2 : l.FlushBegin = func(info FlushInfo) {}
1018 : }
1019 2 : if l.FlushEnd == nil {
1020 2 : l.FlushEnd = func(info FlushInfo) {}
1021 : }
1022 2 : if l.DownloadBegin == nil {
1023 2 : l.DownloadBegin = func(info DownloadInfo) {}
1024 : }
1025 2 : if l.DownloadEnd == nil {
1026 2 : l.DownloadEnd = func(info DownloadInfo) {}
1027 : }
1028 2 : if l.FormatUpgrade == nil {
1029 2 : l.FormatUpgrade = func(v FormatMajorVersion) {}
1030 : }
1031 2 : if l.ManifestCreated == nil {
1032 2 : l.ManifestCreated = func(info ManifestCreateInfo) {}
1033 : }
1034 2 : if l.ManifestDeleted == nil {
1035 2 : l.ManifestDeleted = func(info ManifestDeleteInfo) {}
1036 : }
1037 2 : if l.TableCreated == nil {
1038 2 : l.TableCreated = func(info TableCreateInfo) {}
1039 : }
1040 2 : if l.TableDeleted == nil {
1041 2 : l.TableDeleted = func(info TableDeleteInfo) {}
1042 : }
1043 2 : if l.TableIngested == nil {
1044 2 : l.TableIngested = func(info TableIngestInfo) {}
1045 : }
1046 2 : if l.TableStatsLoaded == nil {
1047 2 : l.TableStatsLoaded = func(info TableStatsInfo) {}
1048 : }
1049 2 : if l.TableValidated == nil {
1050 2 : l.TableValidated = func(validated TableValidatedInfo) {}
1051 : }
1052 2 : if l.WALCreated == nil {
1053 2 : l.WALCreated = func(info WALCreateInfo) {}
1054 : }
1055 2 : if l.WALDeleted == nil {
1056 2 : l.WALDeleted = func(info WALDeleteInfo) {}
1057 : }
1058 2 : if l.WriteStallBegin == nil {
1059 2 : l.WriteStallBegin = func(info WriteStallBeginInfo) {}
1060 : }
1061 2 : if l.WriteStallEnd == nil {
1062 2 : l.WriteStallEnd = func() {}
1063 : }
1064 2 : if l.LowDiskSpace == nil {
1065 2 : l.LowDiskSpace = func(info LowDiskSpaceInfo) {}
1066 : }
1067 2 : if l.PossibleAPIMisuse == nil {
1068 2 : l.PossibleAPIMisuse = func(info PossibleAPIMisuseInfo) {}
1069 : }
1070 : }
1071 :
1072 : // MakeLoggingEventListener creates an EventListener that logs all events to the
1073 : // specified logger.
1074 2 : func MakeLoggingEventListener(logger Logger) EventListener {
1075 2 : if logger == nil {
1076 1 : logger = DefaultLogger
1077 1 : }
1078 :
1079 2 : return EventListener{
1080 2 : BackgroundError: func(err error) {
1081 1 : logger.Errorf("background error: %s", err)
1082 1 : },
1083 2 : BlobFileCreated: func(info BlobFileCreateInfo) {
1084 2 : logger.Infof("%s", info)
1085 2 : },
1086 2 : BlobFileDeleted: func(info BlobFileDeleteInfo) {
1087 2 : logger.Infof("%s", info)
1088 2 : },
1089 2 : BlobFileRewriteBegin: func(info BlobFileRewriteInfo) {
1090 2 : logger.Infof("%s", info)
1091 2 : },
1092 2 : BlobFileRewriteEnd: func(info BlobFileRewriteInfo) {
1093 2 : logger.Infof("%s", info)
1094 2 : },
1095 0 : DataCorruption: func(info DataCorruptionInfo) {
1096 0 : logger.Errorf("%s", info)
1097 0 : },
1098 2 : CompactionBegin: func(info CompactionInfo) {
1099 2 : logger.Infof("%s", info)
1100 2 : },
1101 1 : CompactionEnd: func(info CompactionInfo) {
1102 1 : logger.Infof("%s", info)
1103 1 : },
1104 0 : DiskSlow: func(info DiskSlowInfo) {
1105 0 : logger.Infof("%s", info)
1106 0 : },
1107 2 : FlushBegin: func(info FlushInfo) {
1108 2 : logger.Infof("%s", info)
1109 2 : },
1110 1 : FlushEnd: func(info FlushInfo) {
1111 1 : logger.Infof("%s", info)
1112 1 : },
1113 2 : DownloadBegin: func(info DownloadInfo) {
1114 2 : logger.Infof("%s", info)
1115 2 : },
1116 1 : DownloadEnd: func(info DownloadInfo) {
1117 1 : logger.Infof("%s", info)
1118 1 : },
1119 2 : FormatUpgrade: func(v FormatMajorVersion) {
1120 2 : logger.Infof("upgraded to format version: %s", v)
1121 2 : },
1122 1 : ManifestCreated: func(info ManifestCreateInfo) {
1123 1 : logger.Infof("%s", info)
1124 1 : },
1125 1 : ManifestDeleted: func(info ManifestDeleteInfo) {
1126 1 : logger.Infof("%s", info)
1127 1 : },
1128 2 : TableCreated: func(info TableCreateInfo) {
1129 2 : logger.Infof("%s", info)
1130 2 : },
1131 1 : TableDeleted: func(info TableDeleteInfo) {
1132 1 : logger.Infof("%s", info)
1133 1 : },
1134 1 : TableIngested: func(info TableIngestInfo) {
1135 1 : logger.Infof("%s", info)
1136 1 : },
1137 2 : TableStatsLoaded: func(info TableStatsInfo) {
1138 2 : logger.Infof("%s", info)
1139 2 : },
1140 2 : TableValidated: func(info TableValidatedInfo) {
1141 2 : logger.Infof("%s", info)
1142 2 : },
1143 1 : WALCreated: func(info WALCreateInfo) {
1144 1 : logger.Infof("%s", info)
1145 1 : },
1146 1 : WALDeleted: func(info WALDeleteInfo) {
1147 1 : logger.Infof("%s", info)
1148 1 : },
1149 2 : WriteStallBegin: func(info WriteStallBeginInfo) {
1150 2 : logger.Infof("%s", info)
1151 2 : },
1152 2 : WriteStallEnd: func() {
1153 2 : logger.Infof("write stall ending")
1154 2 : },
1155 0 : LowDiskSpace: func(info LowDiskSpaceInfo) {
1156 0 : logger.Infof("%s", info)
1157 0 : },
1158 2 : PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
1159 2 : logger.Infof("%s", info)
1160 2 : },
1161 : }
1162 : }
1163 :
1164 : // TeeEventListener wraps two EventListeners, forwarding all events to both.
1165 1 : func TeeEventListener(a, b EventListener) EventListener {
1166 1 : a.EnsureDefaults(nil)
1167 1 : b.EnsureDefaults(nil)
1168 1 : return EventListener{
1169 1 : BackgroundError: func(err error) {
1170 0 : a.BackgroundError(err)
1171 0 : b.BackgroundError(err)
1172 0 : },
1173 0 : BlobFileCreated: func(info BlobFileCreateInfo) {
1174 0 : a.BlobFileCreated(info)
1175 0 : b.BlobFileCreated(info)
1176 0 : },
1177 0 : BlobFileDeleted: func(info BlobFileDeleteInfo) {
1178 0 : a.BlobFileDeleted(info)
1179 0 : b.BlobFileDeleted(info)
1180 0 : },
1181 0 : BlobFileRewriteBegin: func(info BlobFileRewriteInfo) {
1182 0 : a.BlobFileRewriteBegin(info)
1183 0 : b.BlobFileRewriteBegin(info)
1184 0 : },
1185 0 : BlobFileRewriteEnd: func(info BlobFileRewriteInfo) {
1186 0 : a.BlobFileRewriteEnd(info)
1187 0 : b.BlobFileRewriteEnd(info)
1188 0 : },
1189 0 : DataCorruption: func(info DataCorruptionInfo) {
1190 0 : a.DataCorruption(info)
1191 0 : b.DataCorruption(info)
1192 0 : },
1193 1 : CompactionBegin: func(info CompactionInfo) {
1194 1 : a.CompactionBegin(info)
1195 1 : b.CompactionBegin(info)
1196 1 : },
1197 1 : CompactionEnd: func(info CompactionInfo) {
1198 1 : a.CompactionEnd(info)
1199 1 : b.CompactionEnd(info)
1200 1 : },
1201 0 : DiskSlow: func(info DiskSlowInfo) {
1202 0 : a.DiskSlow(info)
1203 0 : b.DiskSlow(info)
1204 0 : },
1205 1 : FlushBegin: func(info FlushInfo) {
1206 1 : a.FlushBegin(info)
1207 1 : b.FlushBegin(info)
1208 1 : },
1209 1 : FlushEnd: func(info FlushInfo) {
1210 1 : a.FlushEnd(info)
1211 1 : b.FlushEnd(info)
1212 1 : },
1213 0 : DownloadBegin: func(info DownloadInfo) {
1214 0 : a.DownloadBegin(info)
1215 0 : b.DownloadBegin(info)
1216 0 : },
1217 0 : DownloadEnd: func(info DownloadInfo) {
1218 0 : a.DownloadEnd(info)
1219 0 : b.DownloadEnd(info)
1220 0 : },
1221 0 : FormatUpgrade: func(v FormatMajorVersion) {
1222 0 : a.FormatUpgrade(v)
1223 0 : b.FormatUpgrade(v)
1224 0 : },
1225 1 : ManifestCreated: func(info ManifestCreateInfo) {
1226 1 : a.ManifestCreated(info)
1227 1 : b.ManifestCreated(info)
1228 1 : },
1229 0 : ManifestDeleted: func(info ManifestDeleteInfo) {
1230 0 : a.ManifestDeleted(info)
1231 0 : b.ManifestDeleted(info)
1232 0 : },
1233 1 : TableCreated: func(info TableCreateInfo) {
1234 1 : a.TableCreated(info)
1235 1 : b.TableCreated(info)
1236 1 : },
1237 1 : TableDeleted: func(info TableDeleteInfo) {
1238 1 : a.TableDeleted(info)
1239 1 : b.TableDeleted(info)
1240 1 : },
1241 1 : TableIngested: func(info TableIngestInfo) {
1242 1 : a.TableIngested(info)
1243 1 : b.TableIngested(info)
1244 1 : },
1245 1 : TableStatsLoaded: func(info TableStatsInfo) {
1246 1 : a.TableStatsLoaded(info)
1247 1 : b.TableStatsLoaded(info)
1248 1 : },
1249 0 : TableValidated: func(info TableValidatedInfo) {
1250 0 : a.TableValidated(info)
1251 0 : b.TableValidated(info)
1252 0 : },
1253 1 : WALCreated: func(info WALCreateInfo) {
1254 1 : a.WALCreated(info)
1255 1 : b.WALCreated(info)
1256 1 : },
1257 1 : WALDeleted: func(info WALDeleteInfo) {
1258 1 : a.WALDeleted(info)
1259 1 : b.WALDeleted(info)
1260 1 : },
1261 0 : WriteStallBegin: func(info WriteStallBeginInfo) {
1262 0 : a.WriteStallBegin(info)
1263 0 : b.WriteStallBegin(info)
1264 0 : },
1265 0 : WriteStallEnd: func() {
1266 0 : a.WriteStallEnd()
1267 0 : b.WriteStallEnd()
1268 0 : },
1269 0 : LowDiskSpace: func(info LowDiskSpaceInfo) {
1270 0 : a.LowDiskSpace(info)
1271 0 : b.LowDiskSpace(info)
1272 0 : },
1273 0 : PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
1274 0 : a.PossibleAPIMisuse(info)
1275 0 : b.PossibleAPIMisuse(info)
1276 0 : },
1277 : }
1278 : }
1279 :
// lowDiskSpaceReporter contains the logic to report low disk space events.
// Report is called whenever we get the disk usage statistics.
//
// We define a few thresholds (10%, 5%, 3%, 2%, 1%) and we post an event
// whenever we reach a new (lower) threshold. We periodically repost the event
// every 30 minutes until we are above all thresholds.
type lowDiskSpaceReporter struct {
	mu struct {
		sync.Mutex
		// lastNoticeThreshold is the threshold (a free-space percentage from
		// lowDiskSpaceThresholds) of the most recently posted event.
		lastNoticeThreshold int
		// lastNoticeTime is when the most recent event was posted. The zero
		// value means that no event has been posted yet.
		lastNoticeTime crtime.Mono
	}
}

// lowDiskSpaceThresholds lists the free-space percentages at which an event
// is posted. It must be sorted in decreasing order, because findThreshold stops
// scanning at the first threshold that the free-space percentage exceeds.
var lowDiskSpaceThresholds = []int{10, 5, 3, 2, 1}

// lowDiskSpaceFrequency is the minimum interval between two posted events
// unless the later event is for a lower (more severe) threshold.
const lowDiskSpaceFrequency = 30 * time.Minute
1297 :
1298 2 : func (r *lowDiskSpaceReporter) Report(availBytes, totalBytes uint64, el *EventListener) {
1299 2 : threshold, ok := r.findThreshold(availBytes, totalBytes)
1300 2 : if !ok {
1301 2 : // Normal path.
1302 2 : return
1303 2 : }
1304 1 : if r.shouldReport(threshold, crtime.NowMono()) {
1305 1 : el.LowDiskSpace(LowDiskSpaceInfo{
1306 1 : AvailBytes: availBytes,
1307 1 : TotalBytes: totalBytes,
1308 1 : PercentThreshold: threshold,
1309 1 : })
1310 1 : }
1311 : }
1312 :
1313 : // shouldReport returns true if we should report an event. Updates
1314 : // lastNoticeTime/lastNoticeThreshold appropriately.
1315 1 : func (r *lowDiskSpaceReporter) shouldReport(threshold int, now crtime.Mono) bool {
1316 1 : r.mu.Lock()
1317 1 : defer r.mu.Unlock()
1318 1 : if threshold < r.mu.lastNoticeThreshold || r.mu.lastNoticeTime == 0 ||
1319 1 : now.Sub(r.mu.lastNoticeTime) >= lowDiskSpaceFrequency {
1320 1 : r.mu.lastNoticeThreshold = threshold
1321 1 : r.mu.lastNoticeTime = now
1322 1 : return true
1323 1 : }
1324 1 : return false
1325 : }
1326 :
1327 : // findThreshold returns the largest threshold in lowDiskSpaceThresholds which
1328 : // is >= the percentage ratio between availBytes and totalBytes (or ok=false if
1329 : // there is more free space than the highest threshold).
1330 : func (r *lowDiskSpaceReporter) findThreshold(
1331 : availBytes, totalBytes uint64,
1332 2 : ) (threshold int, ok bool) {
1333 2 : // Note: in the normal path, we exit the loop during the first iteration.
1334 2 : for i, t := range lowDiskSpaceThresholds {
1335 2 : if availBytes*100 > totalBytes*uint64(lowDiskSpaceThresholds[i]) {
1336 2 : break
1337 : }
1338 1 : threshold = t
1339 1 : ok = true
1340 : }
1341 2 : return threshold, ok
1342 : }
1343 :
1344 : // reportCorruption reports a corruption of a TableMetadata or BlobFileMetadata
1345 : // to the event listener and also adds a DataCorruptionInfo payload to the error.
1346 1 : func (d *DB) reportCorruption(meta base.ObjectInfo, err error) error {
1347 1 : if invariants.Enabled && !IsCorruptionError(err) {
1348 0 : panic("not a corruption error")
1349 : }
1350 1 : fileType, fileNum := meta.FileInfo()
1351 1 :
1352 1 : objMeta, lookupErr := d.objProvider.Lookup(fileType, fileNum)
1353 1 : if lookupErr != nil {
1354 1 : // If the object is not known to the provider, it must be a local object
1355 1 : // that was missing when we opened the store. Remote objects have their
1356 1 : // metadata in a catalog, so even if the backing object is deleted, the
1357 1 : // DiskFileNum would still be known.
1358 1 : objMeta = objstorage.ObjectMetadata{DiskFileNum: fileNum, FileType: fileType}
1359 1 : }
1360 1 : path := d.objProvider.Path(objMeta)
1361 1 : if objMeta.IsRemote() {
1362 1 : // Remote path (which include the locator and full path) might not always be
1363 1 : // safe.
1364 1 : err = errors.WithHintf(err, "path: %s", path)
1365 1 : } else {
1366 1 : // Local paths are safe: they start with the store directory and the
1367 1 : // filename is generated by Pebble.
1368 1 : err = errors.WithHintf(err, "path: %s", redact.Safe(path))
1369 1 : }
1370 1 : info := DataCorruptionInfo{
1371 1 : Path: path,
1372 1 : IsRemote: objMeta.IsRemote(),
1373 1 : Locator: objMeta.Remote.Locator,
1374 1 : Bounds: meta.UserKeyBounds(),
1375 1 : Details: err,
1376 1 : }
1377 1 : d.opts.EventListener.DataCorruption(info)
1378 1 : // We don't use errors.Join() because that also annotates with this stack
1379 1 : // trace which would not be useful.
1380 1 : return errorsjoin.Join(err, &corruptionDetailError{info: info})
1381 : }
1382 :
// corruptionDetailError carries the DataCorruptionInfo of a corruption
// reported through reportCorruption. reportCorruption joins it onto the error
// it returns, and ExtractDataCorruptionInfo retrieves it with errors.As.
type corruptionDetailError struct {
	// info is the same DataCorruptionInfo that was passed to the
	// EventListener.DataCorruption handler.
	info DataCorruptionInfo
}
1386 :
1387 1 : func (e *corruptionDetailError) Error() string {
1388 1 : return "<corruption detail carrier>"
1389 1 : }
1390 :
1391 : // ExtractDataCorruptionInfo extracts the DataCorruptionInfo details from a
1392 : // corruption error. Returns nil if there is no such detail.
1393 1 : func ExtractDataCorruptionInfo(err error) *DataCorruptionInfo {
1394 1 : var e *corruptionDetailError
1395 1 : if errors.As(err, &e) {
1396 1 : return &e.info
1397 1 : }
1398 0 : return nil
1399 : }
|