Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "strings"
10 : "sync"
11 : "time"
12 :
13 : "github.com/cockroachdb/crlib/crtime"
14 : "github.com/cockroachdb/errors"
15 : errorsjoin "github.com/cockroachdb/errors/join"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/humanize"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/internal/manifest"
20 : "github.com/cockroachdb/pebble/objstorage"
21 : "github.com/cockroachdb/pebble/objstorage/remote"
22 : "github.com/cockroachdb/pebble/sstable/blob"
23 : "github.com/cockroachdb/pebble/vfs"
24 : "github.com/cockroachdb/redact"
25 : )
26 :
27 : // TableNum is an identifier for a table within a database.
28 : type TableNum = base.TableNum
29 :
30 : // TableInfo exports the manifest.TableInfo type.
31 : type TableInfo = manifest.TableInfo
32 :
33 1 : func tablesTotalSize(tables []TableInfo) uint64 {
34 1 : var size uint64
35 1 : for i := range tables {
36 1 : size += tables[i].Size
37 1 : }
38 1 : return size
39 : }
40 :
41 1 : func formatFileNums(tables []TableInfo) string {
42 1 : var buf strings.Builder
43 1 : for i := range tables {
44 1 : if i > 0 {
45 1 : buf.WriteString(" ")
46 1 : }
47 1 : buf.WriteString(tables[i].FileNum.String())
48 : }
49 1 : return buf.String()
50 : }
51 :
52 : // DataCorruptionInfo contains the information for a DataCorruption event.
53 : type DataCorruptionInfo struct {
54 : // Path of the file that is corrupted. For remote files the path starts with
55 : // "remote://".
56 : Path string
57 : IsRemote bool
58 : // Locator is only set when IsRemote is true (note that an empty Locator is
59 : // valid even then).
60 : Locator remote.Locator
61 : // Bounds indicates the keyspace range that is affected.
62 : Bounds base.UserKeyBounds
63 : // Details of the error. See cockroachdb/error for how to format with or
64 : // without redaction.
65 : Details error
66 : }
67 :
68 1 : func (i DataCorruptionInfo) String() string {
69 1 : return redact.StringWithoutMarkers(i)
70 1 : }
71 :
72 : // SafeFormat implements redact.SafeFormatter.
73 1 : func (i DataCorruptionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
74 1 : w.Printf("on-disk corruption: %s", redact.Safe(i.Path))
75 1 : if i.IsRemote {
76 0 : w.Printf(" (remote locator %q)", redact.Safe(i.Locator))
77 0 : }
78 1 : w.Printf("; bounds: %s; details: %+v", i.Bounds.String(), i.Details)
79 : }
80 :
81 : // LevelInfo contains info pertaining to a particular level.
82 : type LevelInfo struct {
83 : Level int
84 : Tables []TableInfo
85 : Score float64
86 : }
87 :
88 0 : func (i LevelInfo) String() string {
89 0 : return redact.StringWithoutMarkers(i)
90 0 : }
91 :
92 : // SafeFormat implements redact.SafeFormatter.
93 1 : func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
94 1 : w.Printf("L%d [%s] (%s) Score=%.2f",
95 1 : redact.Safe(i.Level),
96 1 : redact.Safe(formatFileNums(i.Tables)),
97 1 : redact.Safe(humanize.Bytes.Uint64(tablesTotalSize(i.Tables))),
98 1 : redact.Safe(i.Score))
99 1 : }
100 :
101 : // BlobFileCreateInfo contains the info for a blob file creation event.
102 : type BlobFileCreateInfo struct {
103 : JobID int
104 : // Reason is the reason for the table creation: "compacting", "flushing", or
105 : // "ingesting".
106 : Reason string
107 : Path string
108 : FileNum base.DiskFileNum
109 : }
110 :
111 1 : func (i BlobFileCreateInfo) String() string {
112 1 : return redact.StringWithoutMarkers(i)
113 1 : }
114 :
115 : // SafeFormat implements redact.SafeFormatter.
116 1 : func (i BlobFileCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
117 1 : w.Printf("[JOB %d] %s: blob file created %s",
118 1 : redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
119 1 : }
120 :
121 : // BlobFileDeleteInfo contains the info for a blob file deletion event.
122 : type BlobFileDeleteInfo struct {
123 : JobID int
124 : Path string
125 : FileNum base.DiskFileNum
126 : Err error
127 : }
128 :
129 1 : func (i BlobFileDeleteInfo) String() string {
130 1 : return redact.StringWithoutMarkers(i)
131 1 : }
132 :
133 : // SafeFormat implements redact.SafeFormatter.
134 1 : func (i BlobFileDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
135 1 : if i.Err != nil {
136 0 : w.Printf("[JOB %d] blob file delete error %s: %s",
137 0 : redact.Safe(i.JobID), i.FileNum, i.Err)
138 0 : return
139 0 : }
140 1 : w.Printf("[JOB %d] blob file deleted %s", redact.Safe(i.JobID), i.FileNum)
141 : }
142 :
143 : // CompactionInfo contains the info for a compaction event.
144 : type CompactionInfo struct {
145 : // JobID is the ID of the compaction job.
146 : JobID int
147 : // Reason is the reason for the compaction.
148 : Reason string
149 : // Input contains the input tables for the compaction organized by level.
150 : Input []LevelInfo
151 : // Output contains the output tables generated by the compaction. The output
152 : // tables are empty for the compaction begin event.
153 : Output LevelInfo
154 : // Duration is the time spent compacting, including reading and writing
155 : // sstables.
156 : Duration time.Duration
157 : // TotalDuration is the total wall-time duration of the compaction,
158 : // including applying the compaction to the database. TotalDuration is
159 : // always ≥ Duration.
160 : TotalDuration time.Duration
161 : Done bool
162 : // Err is set only if Done is true. If non-nil, indicates that the compaction
163 : // failed. Note that err can be ErrCancelledCompaction, which can happen
164 : // during normal operation.
165 : Err error
166 :
167 : SingleLevelOverlappingRatio float64
168 : MultiLevelOverlappingRatio float64
169 :
170 : // Annotations specifies additional info to appear in a compaction's event log line
171 : Annotations compactionAnnotations
172 : }
173 :
174 : type compactionAnnotations []string
175 :
176 : // SafeFormat implements redact.SafeFormatter.
177 1 : func (ca compactionAnnotations) SafeFormat(w redact.SafePrinter, _ rune) {
178 1 : if len(ca) == 0 {
179 0 : return
180 0 : }
181 1 : for i := range ca {
182 1 : if i != 0 {
183 0 : w.Print(" ")
184 0 : }
185 1 : w.Printf("%s", redact.SafeString(ca[i]))
186 : }
187 : }
188 :
189 1 : func (i CompactionInfo) String() string {
190 1 : return redact.StringWithoutMarkers(i)
191 1 : }
192 :
193 : // SafeFormat implements redact.SafeFormatter.
194 1 : func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
195 1 : if i.Err != nil {
196 1 : w.Printf("[JOB %d] compaction(%s) to L%d error: %s",
197 1 : redact.Safe(i.JobID), redact.SafeString(i.Reason), redact.Safe(i.Output.Level), i.Err)
198 1 : return
199 1 : }
200 :
201 1 : if !i.Done {
202 1 : w.Printf("[JOB %d] compacting(%s) ",
203 1 : redact.Safe(i.JobID),
204 1 : redact.SafeString(i.Reason))
205 1 : if len(i.Annotations) > 0 {
206 1 : w.Printf("%s ", i.Annotations)
207 1 : }
208 1 : w.Printf("%s; ", levelInfos(i.Input))
209 1 : w.Printf("OverlappingRatio: Single %.2f, Multi %.2f", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
210 1 : return
211 : }
212 1 : outputSize := tablesTotalSize(i.Output.Tables)
213 1 : w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
214 1 : if len(i.Annotations) > 0 {
215 1 : w.Printf("%s ", i.Annotations)
216 1 : }
217 1 : w.Print(levelInfos(i.Input))
218 1 : w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
219 1 : redact.Safe(i.Output.Level),
220 1 : redact.Safe(formatFileNums(i.Output.Tables)),
221 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
222 1 : redact.Safe(i.Duration.Seconds()),
223 1 : redact.Safe(i.TotalDuration.Seconds()),
224 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
225 : }
226 :
227 : type levelInfos []LevelInfo
228 :
229 1 : func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
230 1 : for j, levelInfo := range i {
231 1 : if j > 0 {
232 1 : w.Printf(" + ")
233 1 : }
234 1 : w.Print(levelInfo)
235 : }
236 : }
237 :
238 : // DiskSlowInfo contains the info for a disk slowness event when writing to a
239 : // file.
240 : type DiskSlowInfo = vfs.DiskSlowInfo
241 :
242 : // FlushInfo contains the info for a flush event.
243 : type FlushInfo struct {
244 : // JobID is the ID of the flush job.
245 : JobID int
246 : // Reason is the reason for the flush.
247 : Reason string
248 : // Input contains the count of input memtables that were flushed.
249 : Input int
250 : // InputBytes contains the total in-memory size of the memtable(s) that were
251 : // flushed. This size includes skiplist indexing data structures.
252 : InputBytes uint64
253 : // Output contains the ouptut table generated by the flush. The output info
254 : // is empty for the flush begin event.
255 : Output []TableInfo
256 : // Duration is the time spent flushing. This duration includes writing and
257 : // syncing all of the flushed keys to sstables.
258 : Duration time.Duration
259 : // TotalDuration is the total wall-time duration of the flush, including
260 : // applying the flush to the database. TotalDuration is always ≥ Duration.
261 : TotalDuration time.Duration
262 : // Ingest is set to true if the flush is handling tables that were added to
263 : // the flushable queue via an ingestion operation.
264 : Ingest bool
265 : // IngestLevels are the output levels for each ingested table in the flush.
266 : // This field is only populated when Ingest is true.
267 : IngestLevels []int
268 : Done bool
269 : Err error
270 : }
271 :
272 1 : func (i FlushInfo) String() string {
273 1 : return redact.StringWithoutMarkers(i)
274 1 : }
275 :
276 : // SafeFormat implements redact.SafeFormatter.
277 1 : func (i FlushInfo) SafeFormat(w redact.SafePrinter, _ rune) {
278 1 : if i.Err != nil {
279 1 : w.Printf("[JOB %d] flush error: %s", redact.Safe(i.JobID), i.Err)
280 1 : return
281 1 : }
282 :
283 1 : plural := redact.SafeString("s")
284 1 : if i.Input == 1 {
285 1 : plural = ""
286 1 : }
287 1 : if !i.Done {
288 1 : w.Printf("[JOB %d] ", redact.Safe(i.JobID))
289 1 : if !i.Ingest {
290 1 : w.Printf("flushing %d memtable", redact.Safe(i.Input))
291 1 : w.SafeString(plural)
292 1 : w.Printf(" (%s) to L0", redact.Safe(humanize.Bytes.Uint64(i.InputBytes)))
293 1 : } else {
294 1 : w.Printf("flushing %d ingested table%s", redact.Safe(i.Input), plural)
295 1 : }
296 1 : return
297 : }
298 :
299 1 : outputSize := tablesTotalSize(i.Output)
300 1 : if !i.Ingest {
301 1 : if invariants.Enabled && len(i.IngestLevels) > 0 {
302 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) == 0"))
303 : }
304 1 : w.Printf("[JOB %d] flushed %d memtable%s (%s) to L0 [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
305 1 : redact.Safe(i.JobID), redact.Safe(i.Input), plural,
306 1 : redact.Safe(humanize.Bytes.Uint64(i.InputBytes)),
307 1 : redact.Safe(formatFileNums(i.Output)),
308 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
309 1 : redact.Safe(i.Duration.Seconds()),
310 1 : redact.Safe(i.TotalDuration.Seconds()),
311 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
312 1 : } else {
313 1 : if invariants.Enabled && len(i.IngestLevels) == 0 {
314 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) > 0"))
315 : }
316 1 : w.Printf("[JOB %d] flushed %d ingested flushable%s",
317 1 : redact.Safe(i.JobID), redact.Safe(len(i.Output)), plural)
318 1 : for j, level := range i.IngestLevels {
319 1 : file := i.Output[j]
320 1 : if j > 0 {
321 1 : w.Printf(" +")
322 1 : }
323 1 : w.Printf(" L%d:%s (%s)", level, file.FileNum, humanize.Bytes.Uint64(file.Size))
324 : }
325 1 : w.Printf(" in %.1fs (%.1fs total), output rate %s/s",
326 1 : redact.Safe(i.Duration.Seconds()),
327 1 : redact.Safe(i.TotalDuration.Seconds()),
328 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
329 : }
330 : }
331 :
332 : // DownloadInfo contains the info for a DB.Download() event.
333 : type DownloadInfo struct {
334 : // JobID is the ID of the download job.
335 : JobID int
336 :
337 : Spans []DownloadSpan
338 :
339 : // Duration is the time since the operation was started.
340 : Duration time.Duration
341 : DownloadCompactionsLaunched int
342 :
343 : // RestartCount indicates that the download operation restarted because it
344 : // noticed that new external files were ingested. A DownloadBegin event with
345 : // RestartCount = 0 is the start of the operation; each time we restart it we
346 : // have another DownloadBegin event with RestartCount > 0.
347 : RestartCount int
348 : Done bool
349 : Err error
350 : }
351 :
352 1 : func (i DownloadInfo) String() string {
353 1 : return redact.StringWithoutMarkers(i)
354 1 : }
355 :
356 : // SafeFormat implements redact.SafeFormatter.
357 1 : func (i DownloadInfo) SafeFormat(w redact.SafePrinter, _ rune) {
358 1 : switch {
359 0 : case i.Err != nil:
360 0 : w.Printf("[JOB %d] download error after %1.fs: %s", redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), i.Err)
361 :
362 1 : case i.Done:
363 1 : w.Printf("[JOB %d] download finished in %.1fs (launched %d compactions)",
364 1 : redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), redact.Safe(i.DownloadCompactionsLaunched))
365 :
366 1 : default:
367 1 : if i.RestartCount == 0 {
368 1 : w.Printf("[JOB %d] starting download for %d spans", redact.Safe(i.JobID), redact.Safe(len(i.Spans)))
369 1 : } else {
370 0 : w.Printf("[JOB %d] restarting download (restart #%d, time so far %.1fs, launched %d compactions)",
371 0 : redact.Safe(i.JobID), redact.Safe(i.RestartCount), redact.Safe(i.Duration.Seconds()),
372 0 : redact.Safe(i.DownloadCompactionsLaunched))
373 0 : }
374 : }
375 : }
376 :
377 : // ManifestCreateInfo contains info about a manifest creation event.
378 : type ManifestCreateInfo struct {
379 : // JobID is the ID of the job the caused the manifest to be created.
380 : JobID int
381 : Path string
382 : // The file number of the new Manifest.
383 : FileNum base.DiskFileNum
384 : Err error
385 : }
386 :
387 1 : func (i ManifestCreateInfo) String() string {
388 1 : return redact.StringWithoutMarkers(i)
389 1 : }
390 :
391 : // SafeFormat implements redact.SafeFormatter.
392 1 : func (i ManifestCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
393 1 : if i.Err != nil {
394 0 : w.Printf("[JOB %d] MANIFEST create error: %s", redact.Safe(i.JobID), i.Err)
395 0 : return
396 0 : }
397 1 : w.Printf("[JOB %d] MANIFEST created %s", redact.Safe(i.JobID), i.FileNum)
398 : }
399 :
400 : // ManifestDeleteInfo contains the info for a Manifest deletion event.
401 : type ManifestDeleteInfo struct {
402 : // JobID is the ID of the job the caused the Manifest to be deleted.
403 : JobID int
404 : Path string
405 : FileNum base.DiskFileNum
406 : Err error
407 : }
408 :
409 1 : func (i ManifestDeleteInfo) String() string {
410 1 : return redact.StringWithoutMarkers(i)
411 1 : }
412 :
413 : // SafeFormat implements redact.SafeFormatter.
414 1 : func (i ManifestDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
415 1 : if i.Err != nil {
416 0 : w.Printf("[JOB %d] MANIFEST delete error: %s", redact.Safe(i.JobID), i.Err)
417 0 : return
418 0 : }
419 1 : w.Printf("[JOB %d] MANIFEST deleted %s", redact.Safe(i.JobID), i.FileNum)
420 : }
421 :
422 : // TableCreateInfo contains the info for a table creation event.
423 : type TableCreateInfo struct {
424 : JobID int
425 : // Reason is the reason for the table creation: "compacting", "flushing", or
426 : // "ingesting".
427 : Reason string
428 : Path string
429 : FileNum base.DiskFileNum
430 : }
431 :
432 1 : func (i TableCreateInfo) String() string {
433 1 : return redact.StringWithoutMarkers(i)
434 1 : }
435 :
436 : // SafeFormat implements redact.SafeFormatter.
437 1 : func (i TableCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
438 1 : w.Printf("[JOB %d] %s: sstable created %s",
439 1 : redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
440 1 : }
441 :
442 : // TableDeleteInfo contains the info for a table deletion event.
443 : type TableDeleteInfo struct {
444 : JobID int
445 : Path string
446 : FileNum base.DiskFileNum
447 : Err error
448 : }
449 :
450 1 : func (i TableDeleteInfo) String() string {
451 1 : return redact.StringWithoutMarkers(i)
452 1 : }
453 :
454 : // SafeFormat implements redact.SafeFormatter.
455 1 : func (i TableDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
456 1 : if i.Err != nil {
457 0 : w.Printf("[JOB %d] sstable delete error %s: %s",
458 0 : redact.Safe(i.JobID), i.FileNum, i.Err)
459 0 : return
460 0 : }
461 1 : w.Printf("[JOB %d] sstable deleted %s", redact.Safe(i.JobID), i.FileNum)
462 : }
463 :
464 : // TableIngestInfo contains the info for a table ingestion event.
465 : type TableIngestInfo struct {
466 : // JobID is the ID of the job the caused the table to be ingested.
467 : JobID int
468 : Tables []struct {
469 : TableInfo
470 : Level int
471 : }
472 : // GlobalSeqNum is the sequence number that was assigned to all entries in
473 : // the ingested table.
474 : GlobalSeqNum base.SeqNum
475 : // flushable indicates whether the ingested sstable was treated as a
476 : // flushable.
477 : flushable bool
478 : Err error
479 : }
480 :
481 1 : func (i TableIngestInfo) String() string {
482 1 : return redact.StringWithoutMarkers(i)
483 1 : }
484 :
485 : // SafeFormat implements redact.SafeFormatter.
486 1 : func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
487 1 : if i.Err != nil {
488 0 : w.Printf("[JOB %d] ingest error: %s", redact.Safe(i.JobID), i.Err)
489 0 : return
490 0 : }
491 :
492 1 : if i.flushable {
493 1 : w.Printf("[JOB %d] ingested as flushable", redact.Safe(i.JobID))
494 1 : } else {
495 1 : w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
496 1 : }
497 :
498 1 : for j := range i.Tables {
499 1 : t := &i.Tables[j]
500 1 : if j > 0 {
501 1 : w.Printf(",")
502 1 : }
503 1 : levelStr := ""
504 1 : if !i.flushable {
505 1 : levelStr = fmt.Sprintf("L%d:", t.Level)
506 1 : }
507 1 : w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
508 1 : redact.Safe(humanize.Bytes.Uint64(t.Size)))
509 : }
510 : }
511 :
512 : // TableStatsInfo contains the info for a table stats loaded event.
513 : type TableStatsInfo struct {
514 : // JobID is the ID of the job that finished loading the initial tables'
515 : // stats.
516 : JobID int
517 : }
518 :
519 1 : func (i TableStatsInfo) String() string {
520 1 : return redact.StringWithoutMarkers(i)
521 1 : }
522 :
523 : // SafeFormat implements redact.SafeFormatter.
524 1 : func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
525 1 : w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
526 1 : }
527 :
528 : // TableValidatedInfo contains information on the result of a validation run
529 : // on an sstable.
530 : type TableValidatedInfo struct {
531 : JobID int
532 : Meta *manifest.TableMetadata
533 : }
534 :
535 1 : func (i TableValidatedInfo) String() string {
536 1 : return redact.StringWithoutMarkers(i)
537 1 : }
538 :
539 : // SafeFormat implements redact.SafeFormatter.
540 1 : func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
541 1 : w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
542 1 : }
543 :
544 : // WALCreateInfo contains info about a WAL creation event.
545 : type WALCreateInfo struct {
546 : // JobID is the ID of the job the caused the WAL to be created.
547 : JobID int
548 : Path string
549 : // The file number of the new WAL.
550 : FileNum base.DiskFileNum
551 : // The file number of a previous WAL which was recycled to create this
552 : // one. Zero if recycling did not take place.
553 : RecycledFileNum base.DiskFileNum
554 : Err error
555 : }
556 :
557 1 : func (i WALCreateInfo) String() string {
558 1 : return redact.StringWithoutMarkers(i)
559 1 : }
560 :
561 : // SafeFormat implements redact.SafeFormatter.
562 1 : func (i WALCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
563 1 : if i.Err != nil {
564 0 : w.Printf("[JOB %d] WAL create error: %s", redact.Safe(i.JobID), i.Err)
565 0 : return
566 0 : }
567 :
568 1 : if i.RecycledFileNum == 0 {
569 1 : w.Printf("[JOB %d] WAL created %s", redact.Safe(i.JobID), i.FileNum)
570 1 : return
571 1 : }
572 :
573 1 : w.Printf("[JOB %d] WAL created %s (recycled %s)",
574 1 : redact.Safe(i.JobID), i.FileNum, i.RecycledFileNum)
575 : }
576 :
577 : // WALDeleteInfo contains the info for a WAL deletion event.
578 : //
579 : // TODO(sumeer): extend WALDeleteInfo for the failover case in case the path
580 : // is insufficient to infer whether primary or secondary.
581 : type WALDeleteInfo struct {
582 : // JobID is the ID of the job the caused the WAL to be deleted.
583 : JobID int
584 : Path string
585 : FileNum base.DiskFileNum
586 : Err error
587 : }
588 :
589 1 : func (i WALDeleteInfo) String() string {
590 1 : return redact.StringWithoutMarkers(i)
591 1 : }
592 :
593 : // SafeFormat implements redact.SafeFormatter.
594 1 : func (i WALDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
595 1 : if i.Err != nil {
596 1 : w.Printf("[JOB %d] WAL delete error: %s", redact.Safe(i.JobID), i.Err)
597 1 : return
598 1 : }
599 1 : w.Printf("[JOB %d] WAL deleted %s", redact.Safe(i.JobID), i.FileNum)
600 : }
601 :
602 : // WriteStallBeginInfo contains the info for a write stall begin event.
603 : type WriteStallBeginInfo struct {
604 : Reason string
605 : }
606 :
607 1 : func (i WriteStallBeginInfo) String() string {
608 1 : return redact.StringWithoutMarkers(i)
609 1 : }
610 :
611 : // SafeFormat implements redact.SafeFormatter.
612 1 : func (i WriteStallBeginInfo) SafeFormat(w redact.SafePrinter, _ rune) {
613 1 : w.Printf("write stall beginning: %s", redact.Safe(i.Reason))
614 1 : }
615 :
616 : // LowDiskSpaceInfo contains the information for a LowDiskSpace
617 : // event.
618 : type LowDiskSpaceInfo struct {
619 : // AvailBytes is the disk space available to the current process in bytes.
620 : AvailBytes uint64
621 : // TotalBytes is the total disk space in bytes.
622 : TotalBytes uint64
623 : // PercentThreshold is one of a set of fixed percentages in the
624 : // lowDiskSpaceThresholds below. This event was issued because the disk
625 : // space went below this threshold.
626 : PercentThreshold int
627 : }
628 :
629 0 : func (i LowDiskSpaceInfo) String() string {
630 0 : return redact.StringWithoutMarkers(i)
631 0 : }
632 :
633 : // SafeFormat implements redact.SafeFormatter.
634 0 : func (i LowDiskSpaceInfo) SafeFormat(w redact.SafePrinter, _ rune) {
635 0 : w.Printf(
636 0 : "available disk space under %d%% (%s of %s)",
637 0 : redact.Safe(i.PercentThreshold),
638 0 : redact.Safe(humanize.Bytes.Uint64(i.AvailBytes)),
639 0 : redact.Safe(humanize.Bytes.Uint64(i.TotalBytes)),
640 0 : )
641 0 : }
642 :
643 : // PossibleAPIMisuseInfo contains the information for a PossibleAPIMisuse event.
644 : type PossibleAPIMisuseInfo struct {
645 : Kind APIMisuseKind
646 :
647 : // UserKey is set for the following kinds:
648 : // - IneffectualSingleDelete,
649 : // - NondeterministicSingleDelete,
650 : // - MissizedDelete.
651 : UserKey []byte
652 :
653 : // ExtraInfo is set for the following kinds:
654 : // - MissizedDelete: contains "elidedSize=<size>,expectedSize=<size>"
655 : ExtraInfo string
656 : }
657 :
658 1 : func (i PossibleAPIMisuseInfo) String() string {
659 1 : return redact.StringWithoutMarkers(i)
660 1 : }
661 :
662 : // SafeFormat implements redact.SafeFormatter.
663 1 : func (i PossibleAPIMisuseInfo) SafeFormat(w redact.SafePrinter, _ rune) {
664 1 : switch i.Kind {
665 1 : case IneffectualSingleDelete, NondeterministicSingleDelete:
666 1 : w.Printf("possible API misuse: %s (key=%q)", redact.Safe(i.Kind), i.UserKey)
667 1 : case MissizedDelete:
668 1 : w.Printf("possible API misuse: %s (key=%q, %s)", redact.Safe(i.Kind), i.UserKey, redact.Safe(i.ExtraInfo))
669 0 : default:
670 0 : if invariants.Enabled {
671 0 : panic("invalid API misuse event")
672 : }
673 0 : w.Printf("invalid API misuse event")
674 : }
675 : }
676 :
// APIMisuseKind identifies the type of API misuse represented by a
// PossibleAPIMisuse event.
type APIMisuseKind int8

const (
	// IneffectualSingleDelete is emitted in compactions/flushes if any
	// single delete is being elided without deleting a point set/merge.
	//
	// This event can sometimes be a false positive because of delete-only
	// compactions, which can cause a recent RANGEDEL to peek below an older
	// SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
	//
	// Example:
	//   RANGEDEL [a, c)#10 in L0
	//   SINGLEDEL b#5 in L1
	//   SET b#3 in L6
	//
	// If the L6 file containing the SET is narrow and the L1 file containing
	// the SINGLEDEL is wide, a delete-only compaction can remove the file in
	// L6 before the SINGLEDEL is compacted down. Then, when the SINGLEDEL is
	// compacted down, it will not find any SET to delete, resulting in the
	// ineffectual callback.
	IneffectualSingleDelete APIMisuseKind = iota

	// NondeterministicSingleDelete is emitted in compactions/flushes if any
	// single delete has consumed a Set/Merge, and there is another immediately
	// older Set/SetWithDelete/Merge. The user of Pebble has violated the
	// invariant under which SingleDelete can be used correctly.
	//
	// Consider the sequence SingleDelete#3, Set#2, Set#1. There are three
	// ways some of these keys can first meet in a compaction.
	//
	//  - All 3 keys in the same compaction: this callback will detect the
	//    violation.
	//
	//  - SingleDelete#3, Set#2 meet in a compaction first: both keys will
	//    disappear. The violation will not be detected, and the DB will have
	//    Set#1, which is likely incorrect (from the user's perspective).
	//
	//  - Set#2, Set#1 meet in a compaction first: the output will be Set#2,
	//    which will later be consumed by SingleDelete#3. The violation will
	//    not be detected and the DB will be correct.
	//
	// This event can sometimes be a false positive because of delete-only
	// compactions, which can cause a recent RANGEDEL to peek below an older
	// SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
	//
	// Example:
	//   RANGEDEL [a, z)#60 in L0
	//   SINGLEDEL g#50 in L1
	//   SET g#40 in L2
	//   RANGEDEL [g,h)#30 in L3
	//   SET g#20 in L6
	//
	// In this example, the two SETs represent the same user write, and the
	// RANGEDELs are caused by the CockroachDB range being dropped. That is,
	// the user wrote to g once, the range was dropped, then added back, which
	// caused the SET again, then at some point g was validly deleted using a
	// SINGLEDEL, and then the range was dropped again. The older RANGEDEL can
	// get fragmented due to compactions it has been part of. Say the L3 file
	// containing the RANGEDEL is very narrow, while the L1, L2, L6 files are
	// wider than the RANGEDEL in L0. Then the RANGEDEL in L3 can be dropped
	// using a delete-only compaction, resulting in an LSM with state:
	//
	//   RANGEDEL [a, z)#60 in L0
	//   SINGLEDEL g#50 in L1
	//   SET g#40 in L2
	//   SET g#20 in L6
	//
	// A multi-level compaction involving L1, L2, L6 will cause the invariant
	// violation callback. This example doesn't need multi-level compactions:
	// say there was a Pebble snapshot at g#21 preventing g#20 from being
	// dropped when it meets g#40 in a compaction. That snapshot will not save
	// RANGEDEL [g,h)#30, so we can have:
	//
	//   SINGLEDEL g#50 in L1
	//   SET g#40, SET g#20 in L6
	//
	// And say the snapshot is removed and then the L1 and L6 compaction
	// happens, resulting in the invariant violation callback.
	NondeterministicSingleDelete

	// MissizedDelete is emitted when a DELSIZED tombstone is found that did
	// not accurately record the size of the value it deleted. This can lead to
	// incorrect behavior in compactions.
	MissizedDelete
)

// String returns a short human-readable name for the kind, or "unknown" for
// any value that is not one of the declared constants.
func (k APIMisuseKind) String() string {
	names := [...]string{
		IneffectualSingleDelete:      "ineffectual SINGLEDEL",
		NondeterministicSingleDelete: "nondeterministic SINGLEDEL",
		MissizedDelete:               "missized DELSIZED",
	}
	// APIMisuseKind is signed, so reject negative values as well as values
	// past the last declared constant.
	if k < 0 || int(k) >= len(names) {
		return "unknown"
	}
	return names[k]
}
777 :
778 : // EventListener contains a set of functions that will be invoked when various
779 : // significant DB events occur. Note that the functions should not run for an
780 : // excessive amount of time as they are invoked synchronously by the DB and may
781 : // block continued DB work. For a similar reason it is advisable to not perform
782 : // any synchronous calls back into the DB.
783 : type EventListener struct {
784 : // BackgroundError is invoked whenever an error occurs during a background
785 : // operation such as flush or compaction.
786 : BackgroundError func(error)
787 :
788 : // BlobFileCreated is invoked after a blob file has been created.
789 : BlobFileCreated func(BlobFileCreateInfo)
790 :
791 : // BlobFileDeleted is invoked after a blob file has been deleted.
792 : BlobFileDeleted func(BlobFileDeleteInfo)
793 :
794 : // DataCorruption is invoked when an on-disk corruption is detected. It should
795 : // not block, as it is called synchronously in read paths.
796 : DataCorruption func(DataCorruptionInfo)
797 :
798 : // CompactionBegin is invoked after the inputs to a compaction have been
799 : // determined, but before the compaction has produced any output.
800 : CompactionBegin func(CompactionInfo)
801 :
802 : // CompactionEnd is invoked after a compaction has completed and the result
803 : // has been installed.
804 : CompactionEnd func(CompactionInfo)
805 :
806 : // DiskSlow is invoked after a disk write operation on a file created with a
807 : // disk health checking vfs.FS (see vfs.DefaultWithDiskHealthChecks) is
808 : // observed to exceed the specified disk slowness threshold duration. DiskSlow
809 : // is called on a goroutine that is monitoring slowness/stuckness. The callee
810 : // MUST return without doing any IO, or blocking on anything (like a mutex)
811 : // that is waiting on IO. This is imperative in order to reliably monitor for
812 : // slowness, since if this goroutine gets stuck, the monitoring will stop
813 : // working.
814 : DiskSlow func(DiskSlowInfo)
815 :
816 : // FlushBegin is invoked after the inputs to a flush have been determined,
817 : // but before the flush has produced any output.
818 : FlushBegin func(FlushInfo)
819 :
820 : // FlushEnd is invoked after a flush has complated and the result has been
821 : // installed.
822 : FlushEnd func(FlushInfo)
823 :
824 : // DownloadBegin is invoked when a db.Download operation starts or restarts
825 : // (restarts are caused by new external tables being ingested during the
826 : // operation).
827 : DownloadBegin func(DownloadInfo)
828 :
829 : // DownloadEnd is invoked when a db.Download operation completes.
830 : DownloadEnd func(DownloadInfo)
831 :
832 : // FormatUpgrade is invoked after the database's FormatMajorVersion
833 : // is upgraded.
834 : FormatUpgrade func(FormatMajorVersion)
835 :
836 : // ManifestCreated is invoked after a manifest has been created.
837 : ManifestCreated func(ManifestCreateInfo)
838 :
839 : // ManifestDeleted is invoked after a manifest has been deleted.
840 : ManifestDeleted func(ManifestDeleteInfo)
841 :
842 : // TableCreated is invoked when a table has been created.
843 : TableCreated func(TableCreateInfo)
844 :
845 : // TableDeleted is invoked after a table has been deleted.
846 : TableDeleted func(TableDeleteInfo)
847 :
848 : // TableIngested is invoked after an externally created table has been
849 : // ingested via a call to DB.Ingest().
850 : TableIngested func(TableIngestInfo)
851 :
852 : // TableStatsLoaded is invoked at most once, when the table stats
853 : // collector has loaded statistics for all tables that existed at Open.
854 : TableStatsLoaded func(TableStatsInfo)
855 :
856 : // TableValidated is invoked after validation runs on an sstable.
857 : TableValidated func(TableValidatedInfo)
858 :
859 : // WALCreated is invoked after a WAL has been created.
860 : WALCreated func(WALCreateInfo)
861 :
862 : // WALDeleted is invoked after a WAL has been deleted.
863 : WALDeleted func(WALDeleteInfo)
864 :
865 : // WriteStallBegin is invoked when writes are intentionally delayed.
866 : WriteStallBegin func(WriteStallBeginInfo)
867 :
868 : // WriteStallEnd is invoked when delayed writes are released.
869 : WriteStallEnd func()
870 :
871 : // LowDiskSpace is invoked periodically when the disk space is running
872 : // low.
873 : LowDiskSpace func(LowDiskSpaceInfo)
874 :
875 : // PossibleAPIMisuse is invoked when a possible API misuse is detected.
876 : PossibleAPIMisuse func(PossibleAPIMisuseInfo)
877 : }
878 :
879 : // EnsureDefaults ensures that background error events are logged to the
880 : // specified logger if a handler for those events hasn't been otherwise
881 : // specified. Ensure all handlers are non-nil so that we don't have to check
882 : // for nil-ness before invoking.
883 1 : func (l *EventListener) EnsureDefaults(logger Logger) {
884 1 : if l.BackgroundError == nil {
885 1 : if logger != nil {
886 1 : l.BackgroundError = func(err error) {
887 1 : logger.Errorf("background error: %s", err)
888 1 : }
889 1 : } else {
890 1 : l.BackgroundError = func(error) {}
891 : }
892 : }
893 1 : if l.BlobFileCreated == nil {
894 1 : l.BlobFileCreated = func(info BlobFileCreateInfo) {}
895 : }
896 1 : if l.BlobFileDeleted == nil {
897 1 : l.BlobFileDeleted = func(info BlobFileDeleteInfo) {}
898 : }
899 1 : if l.DataCorruption == nil {
900 1 : if logger != nil {
901 1 : l.DataCorruption = func(info DataCorruptionInfo) {
902 1 : logger.Fatalf("%s", info)
903 1 : }
904 1 : } else {
905 1 : l.DataCorruption = func(info DataCorruptionInfo) {}
906 : }
907 : }
908 1 : if l.CompactionBegin == nil {
909 1 : l.CompactionBegin = func(info CompactionInfo) {}
910 : }
911 1 : if l.CompactionEnd == nil {
912 1 : l.CompactionEnd = func(info CompactionInfo) {}
913 : }
914 1 : if l.DiskSlow == nil {
915 1 : l.DiskSlow = func(info DiskSlowInfo) {}
916 : }
917 1 : if l.FlushBegin == nil {
918 1 : l.FlushBegin = func(info FlushInfo) {}
919 : }
920 1 : if l.FlushEnd == nil {
921 1 : l.FlushEnd = func(info FlushInfo) {}
922 : }
923 1 : if l.DownloadBegin == nil {
924 1 : l.DownloadBegin = func(info DownloadInfo) {}
925 : }
926 1 : if l.DownloadEnd == nil {
927 1 : l.DownloadEnd = func(info DownloadInfo) {}
928 : }
929 1 : if l.FormatUpgrade == nil {
930 1 : l.FormatUpgrade = func(v FormatMajorVersion) {}
931 : }
932 1 : if l.ManifestCreated == nil {
933 1 : l.ManifestCreated = func(info ManifestCreateInfo) {}
934 : }
935 1 : if l.ManifestDeleted == nil {
936 1 : l.ManifestDeleted = func(info ManifestDeleteInfo) {}
937 : }
938 1 : if l.TableCreated == nil {
939 1 : l.TableCreated = func(info TableCreateInfo) {}
940 : }
941 1 : if l.TableDeleted == nil {
942 1 : l.TableDeleted = func(info TableDeleteInfo) {}
943 : }
944 1 : if l.TableIngested == nil {
945 1 : l.TableIngested = func(info TableIngestInfo) {}
946 : }
947 1 : if l.TableStatsLoaded == nil {
948 1 : l.TableStatsLoaded = func(info TableStatsInfo) {}
949 : }
950 1 : if l.TableValidated == nil {
951 1 : l.TableValidated = func(validated TableValidatedInfo) {}
952 : }
953 1 : if l.WALCreated == nil {
954 1 : l.WALCreated = func(info WALCreateInfo) {}
955 : }
956 1 : if l.WALDeleted == nil {
957 1 : l.WALDeleted = func(info WALDeleteInfo) {}
958 : }
959 1 : if l.WriteStallBegin == nil {
960 1 : l.WriteStallBegin = func(info WriteStallBeginInfo) {}
961 : }
962 1 : if l.WriteStallEnd == nil {
963 1 : l.WriteStallEnd = func() {}
964 : }
965 1 : if l.LowDiskSpace == nil {
966 1 : l.LowDiskSpace = func(info LowDiskSpaceInfo) {}
967 : }
968 1 : if l.PossibleAPIMisuse == nil {
969 1 : l.PossibleAPIMisuse = func(info PossibleAPIMisuseInfo) {}
970 : }
971 : }
972 :
973 : // MakeLoggingEventListener creates an EventListener that logs all events to the
974 : // specified logger.
975 1 : func MakeLoggingEventListener(logger Logger) EventListener {
976 1 : if logger == nil {
977 1 : logger = DefaultLogger
978 1 : }
979 :
980 1 : return EventListener{
981 1 : BackgroundError: func(err error) {
982 1 : logger.Errorf("background error: %s", err)
983 1 : },
984 1 : BlobFileCreated: func(info BlobFileCreateInfo) {
985 1 : logger.Infof("%s", info)
986 1 : },
987 1 : BlobFileDeleted: func(info BlobFileDeleteInfo) {
988 1 : logger.Infof("%s", info)
989 1 : },
990 0 : DataCorruption: func(info DataCorruptionInfo) {
991 0 : logger.Errorf("%s", info)
992 0 : },
993 1 : CompactionBegin: func(info CompactionInfo) {
994 1 : logger.Infof("%s", info)
995 1 : },
996 1 : CompactionEnd: func(info CompactionInfo) {
997 1 : logger.Infof("%s", info)
998 1 : },
999 0 : DiskSlow: func(info DiskSlowInfo) {
1000 0 : logger.Infof("%s", info)
1001 0 : },
1002 1 : FlushBegin: func(info FlushInfo) {
1003 1 : logger.Infof("%s", info)
1004 1 : },
1005 1 : FlushEnd: func(info FlushInfo) {
1006 1 : logger.Infof("%s", info)
1007 1 : },
1008 1 : DownloadBegin: func(info DownloadInfo) {
1009 1 : logger.Infof("%s", info)
1010 1 : },
1011 1 : DownloadEnd: func(info DownloadInfo) {
1012 1 : logger.Infof("%s", info)
1013 1 : },
1014 1 : FormatUpgrade: func(v FormatMajorVersion) {
1015 1 : logger.Infof("upgraded to format version: %s", v)
1016 1 : },
1017 1 : ManifestCreated: func(info ManifestCreateInfo) {
1018 1 : logger.Infof("%s", info)
1019 1 : },
1020 1 : ManifestDeleted: func(info ManifestDeleteInfo) {
1021 1 : logger.Infof("%s", info)
1022 1 : },
1023 1 : TableCreated: func(info TableCreateInfo) {
1024 1 : logger.Infof("%s", info)
1025 1 : },
1026 1 : TableDeleted: func(info TableDeleteInfo) {
1027 1 : logger.Infof("%s", info)
1028 1 : },
1029 1 : TableIngested: func(info TableIngestInfo) {
1030 1 : logger.Infof("%s", info)
1031 1 : },
1032 1 : TableStatsLoaded: func(info TableStatsInfo) {
1033 1 : logger.Infof("%s", info)
1034 1 : },
1035 1 : TableValidated: func(info TableValidatedInfo) {
1036 1 : logger.Infof("%s", info)
1037 1 : },
1038 1 : WALCreated: func(info WALCreateInfo) {
1039 1 : logger.Infof("%s", info)
1040 1 : },
1041 1 : WALDeleted: func(info WALDeleteInfo) {
1042 1 : logger.Infof("%s", info)
1043 1 : },
1044 1 : WriteStallBegin: func(info WriteStallBeginInfo) {
1045 1 : logger.Infof("%s", info)
1046 1 : },
1047 1 : WriteStallEnd: func() {
1048 1 : logger.Infof("write stall ending")
1049 1 : },
1050 0 : LowDiskSpace: func(info LowDiskSpaceInfo) {
1051 0 : logger.Infof("%s", info)
1052 0 : },
1053 1 : PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
1054 1 : logger.Infof("%s", info)
1055 1 : },
1056 : }
1057 : }
1058 :
1059 : // TeeEventListener wraps two EventListeners, forwarding all events to both.
1060 1 : func TeeEventListener(a, b EventListener) EventListener {
1061 1 : a.EnsureDefaults(nil)
1062 1 : b.EnsureDefaults(nil)
1063 1 : return EventListener{
1064 1 : BackgroundError: func(err error) {
1065 0 : a.BackgroundError(err)
1066 0 : b.BackgroundError(err)
1067 0 : },
1068 0 : BlobFileCreated: func(info BlobFileCreateInfo) {
1069 0 : a.BlobFileCreated(info)
1070 0 : b.BlobFileCreated(info)
1071 0 : },
1072 1 : BlobFileDeleted: func(info BlobFileDeleteInfo) {
1073 1 : a.BlobFileDeleted(info)
1074 1 : b.BlobFileDeleted(info)
1075 1 : },
1076 0 : DataCorruption: func(info DataCorruptionInfo) {
1077 0 : a.DataCorruption(info)
1078 0 : b.DataCorruption(info)
1079 0 : },
1080 1 : CompactionBegin: func(info CompactionInfo) {
1081 1 : a.CompactionBegin(info)
1082 1 : b.CompactionBegin(info)
1083 1 : },
1084 1 : CompactionEnd: func(info CompactionInfo) {
1085 1 : a.CompactionEnd(info)
1086 1 : b.CompactionEnd(info)
1087 1 : },
1088 0 : DiskSlow: func(info DiskSlowInfo) {
1089 0 : a.DiskSlow(info)
1090 0 : b.DiskSlow(info)
1091 0 : },
1092 1 : FlushBegin: func(info FlushInfo) {
1093 1 : a.FlushBegin(info)
1094 1 : b.FlushBegin(info)
1095 1 : },
1096 1 : FlushEnd: func(info FlushInfo) {
1097 1 : a.FlushEnd(info)
1098 1 : b.FlushEnd(info)
1099 1 : },
1100 0 : DownloadBegin: func(info DownloadInfo) {
1101 0 : a.DownloadBegin(info)
1102 0 : b.DownloadBegin(info)
1103 0 : },
1104 0 : DownloadEnd: func(info DownloadInfo) {
1105 0 : a.DownloadEnd(info)
1106 0 : b.DownloadEnd(info)
1107 0 : },
1108 0 : FormatUpgrade: func(v FormatMajorVersion) {
1109 0 : a.FormatUpgrade(v)
1110 0 : b.FormatUpgrade(v)
1111 0 : },
1112 1 : ManifestCreated: func(info ManifestCreateInfo) {
1113 1 : a.ManifestCreated(info)
1114 1 : b.ManifestCreated(info)
1115 1 : },
1116 0 : ManifestDeleted: func(info ManifestDeleteInfo) {
1117 0 : a.ManifestDeleted(info)
1118 0 : b.ManifestDeleted(info)
1119 0 : },
1120 1 : TableCreated: func(info TableCreateInfo) {
1121 1 : a.TableCreated(info)
1122 1 : b.TableCreated(info)
1123 1 : },
1124 1 : TableDeleted: func(info TableDeleteInfo) {
1125 1 : a.TableDeleted(info)
1126 1 : b.TableDeleted(info)
1127 1 : },
1128 1 : TableIngested: func(info TableIngestInfo) {
1129 1 : a.TableIngested(info)
1130 1 : b.TableIngested(info)
1131 1 : },
1132 1 : TableStatsLoaded: func(info TableStatsInfo) {
1133 1 : a.TableStatsLoaded(info)
1134 1 : b.TableStatsLoaded(info)
1135 1 : },
1136 0 : TableValidated: func(info TableValidatedInfo) {
1137 0 : a.TableValidated(info)
1138 0 : b.TableValidated(info)
1139 0 : },
1140 1 : WALCreated: func(info WALCreateInfo) {
1141 1 : a.WALCreated(info)
1142 1 : b.WALCreated(info)
1143 1 : },
1144 1 : WALDeleted: func(info WALDeleteInfo) {
1145 1 : a.WALDeleted(info)
1146 1 : b.WALDeleted(info)
1147 1 : },
1148 0 : WriteStallBegin: func(info WriteStallBeginInfo) {
1149 0 : a.WriteStallBegin(info)
1150 0 : b.WriteStallBegin(info)
1151 0 : },
1152 0 : WriteStallEnd: func() {
1153 0 : a.WriteStallEnd()
1154 0 : b.WriteStallEnd()
1155 0 : },
1156 0 : LowDiskSpace: func(info LowDiskSpaceInfo) {
1157 0 : a.LowDiskSpace(info)
1158 0 : b.LowDiskSpace(info)
1159 0 : },
1160 0 : PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
1161 0 : a.PossibleAPIMisuse(info)
1162 0 : b.PossibleAPIMisuse(info)
1163 0 : },
1164 : }
1165 : }
1166 :
1167 : // lowDiskSpaceReporter contains the logic to report low disk space events.
1168 : // Report is called whenever we get the disk usage statistics.
1169 : //
1170 : // We define a few thresholds (10%, 5%, 3%, 2%, 1%) and we post an event
1171 : // whenever we reach a new threshold. We periodically repost the event every 30
1172 : // minutes until we are above all thresholds.
1173 : type lowDiskSpaceReporter struct {
1174 : mu struct {
1175 : sync.Mutex
1176 : lastNoticeThreshold int
1177 : lastNoticeTime crtime.Mono
1178 : }
1179 : }
1180 :
1181 : var lowDiskSpaceThresholds = []int{10, 5, 3, 2, 1}
1182 :
1183 : const lowDiskSpaceFrequency = 30 * time.Minute
1184 :
1185 1 : func (r *lowDiskSpaceReporter) Report(availBytes, totalBytes uint64, el *EventListener) {
1186 1 : threshold, ok := r.findThreshold(availBytes, totalBytes)
1187 1 : if !ok {
1188 1 : // Normal path.
1189 1 : return
1190 1 : }
1191 1 : if r.shouldReport(threshold, crtime.NowMono()) {
1192 1 : el.LowDiskSpace(LowDiskSpaceInfo{
1193 1 : AvailBytes: availBytes,
1194 1 : TotalBytes: totalBytes,
1195 1 : PercentThreshold: threshold,
1196 1 : })
1197 1 : }
1198 : }
1199 :
1200 : // shouldReport returns true if we should report an event. Updates
1201 : // lastNoticeTime/lastNoticeThreshold appropriately.
1202 1 : func (r *lowDiskSpaceReporter) shouldReport(threshold int, now crtime.Mono) bool {
1203 1 : r.mu.Lock()
1204 1 : defer r.mu.Unlock()
1205 1 : if threshold < r.mu.lastNoticeThreshold || r.mu.lastNoticeTime == 0 ||
1206 1 : now.Sub(r.mu.lastNoticeTime) >= lowDiskSpaceFrequency {
1207 1 : r.mu.lastNoticeThreshold = threshold
1208 1 : r.mu.lastNoticeTime = now
1209 1 : return true
1210 1 : }
1211 1 : return false
1212 : }
1213 :
1214 : // findThreshold returns the largest threshold in lowDiskSpaceThresholds which
1215 : // is >= the percentage ratio between availBytes and totalBytes (or ok=false if
1216 : // there is more free space than the highest threshold).
1217 : func (r *lowDiskSpaceReporter) findThreshold(
1218 : availBytes, totalBytes uint64,
1219 1 : ) (threshold int, ok bool) {
1220 1 : // Note: in the normal path, we exit the loop during the first iteration.
1221 1 : for i, t := range lowDiskSpaceThresholds {
1222 1 : if availBytes*100 > totalBytes*uint64(lowDiskSpaceThresholds[i]) {
1223 1 : break
1224 : }
1225 1 : threshold = t
1226 1 : ok = true
1227 : }
1228 1 : return threshold, ok
1229 : }
1230 :
1231 : // reportCorruption reports a corruption of a TableMetadata or BlobFileMetadata
1232 : // to the event listener and also adds a DataCorruptionInfo payload to the error.
1233 1 : func (d *DB) reportCorruption(meta any, err error) error {
1234 1 : switch meta := meta.(type) {
1235 1 : case *manifest.TableMetadata:
1236 1 : return d.reportFileCorruption(base.FileTypeTable, meta.TableBacking.DiskFileNum, meta.UserKeyBounds(), err)
1237 0 : case *manifest.BlobFileMetadata:
1238 0 : diskFileNum := blob.DiskFileNumTODO(meta.FileID)
1239 0 : // TODO(jackson): Add bounds for blob files.
1240 0 : return d.reportFileCorruption(base.FileTypeBlob, diskFileNum, base.UserKeyBounds{}, err)
1241 0 : default:
1242 0 : panic(fmt.Sprintf("unknown metadata type: %T", meta))
1243 : }
1244 : }
1245 :
1246 : func (d *DB) reportFileCorruption(
1247 : fileType base.FileType, fileNum base.DiskFileNum, userKeyBounds base.UserKeyBounds, err error,
1248 1 : ) error {
1249 1 : if invariants.Enabled && !IsCorruptionError(err) {
1250 0 : panic("not a corruption error")
1251 : }
1252 :
1253 1 : objMeta, lookupErr := d.objProvider.Lookup(fileType, fileNum)
1254 1 : if lookupErr != nil {
1255 1 : // If the object is not known to the provider, it must be a local object
1256 1 : // that was missing when we opened the store. Remote objects have their
1257 1 : // metadata in a catalog, so even if the backing object is deleted, the
1258 1 : // DiskFileNum would still be known.
1259 1 : objMeta = objstorage.ObjectMetadata{DiskFileNum: fileNum, FileType: fileType}
1260 1 : }
1261 1 : path := d.objProvider.Path(objMeta)
1262 1 : if objMeta.IsRemote() {
1263 1 : // Remote path (which include the locator and full path) might not always be
1264 1 : // safe.
1265 1 : err = errors.WithHintf(err, "path: %s", path)
1266 1 : } else {
1267 1 : // Local paths are safe: they start with the store directory and the
1268 1 : // filename is generated by Pebble.
1269 1 : err = errors.WithHintf(err, "path: %s", redact.Safe(path))
1270 1 : }
1271 1 : info := DataCorruptionInfo{
1272 1 : Path: path,
1273 1 : IsRemote: objMeta.IsRemote(),
1274 1 : Locator: objMeta.Remote.Locator,
1275 1 : Bounds: userKeyBounds,
1276 1 : Details: err,
1277 1 : }
1278 1 : d.opts.EventListener.DataCorruption(info)
1279 1 : // We don't use errors.Join() because that also annotates with this stack
1280 1 : // trace which would not be useful.
1281 1 : return errorsjoin.Join(err, &corruptionDetailError{info: info})
1282 : }
1283 :
1284 : type corruptionDetailError struct {
1285 : info DataCorruptionInfo
1286 : }
1287 :
1288 1 : func (e *corruptionDetailError) Error() string {
1289 1 : return "<corruption detail carrier>"
1290 1 : }
1291 :
1292 : // ExtractDataCorruptionInfo extracts the DataCorruptionInfo details from a
1293 : // corruption error. Returns nil if there is no such detail.
1294 1 : func ExtractDataCorruptionInfo(err error) *DataCorruptionInfo {
1295 1 : var e *corruptionDetailError
1296 1 : if errors.As(err, &e) {
1297 1 : return &e.info
1298 1 : }
1299 0 : return nil
1300 : }
|