Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "strings"
10 : "sync"
11 : "time"
12 :
13 : "github.com/cockroachdb/crlib/crtime"
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/humanize"
17 : "github.com/cockroachdb/pebble/internal/invariants"
18 : "github.com/cockroachdb/pebble/internal/manifest"
19 : "github.com/cockroachdb/pebble/objstorage"
20 : "github.com/cockroachdb/pebble/objstorage/remote"
21 : "github.com/cockroachdb/pebble/vfs"
22 : "github.com/cockroachdb/redact"
23 : )
24 :
// FileNum is an identifier for a file within a database. It is an alias for
// base.FileNum.
type FileNum = base.FileNum

// TableInfo exports the manifest.TableInfo type.
type TableInfo = manifest.TableInfo
30 :
31 1 : func tablesTotalSize(tables []TableInfo) uint64 {
32 1 : var size uint64
33 1 : for i := range tables {
34 1 : size += tables[i].Size
35 1 : }
36 1 : return size
37 : }
38 :
39 1 : func formatFileNums(tables []TableInfo) string {
40 1 : var buf strings.Builder
41 1 : for i := range tables {
42 1 : if i > 0 {
43 1 : buf.WriteString(" ")
44 1 : }
45 1 : buf.WriteString(tables[i].FileNum.String())
46 : }
47 1 : return buf.String()
48 : }
49 :
50 : // DataCorruptionInfo contains the information for a DataCorruption event.
51 : type DataCorruptionInfo struct {
52 : // Path of the file that is corrupted. For remote files the path starts with
53 : // "remote://".
54 : Path string
55 : IsRemote bool
56 : // Locator is only set when IsRemote is true (note that an empty Locator is
57 : // valid even then).
58 : Locator remote.Locator
59 : // Bounds indicates the keyspace range that is affected.
60 : Bounds base.UserKeyBounds
61 : // Details of the error. See cockroachdb/error for how to format with or
62 : // without redaction.
63 : Details error
64 : }
65 :
66 0 : func (i DataCorruptionInfo) String() string {
67 0 : return redact.StringWithoutMarkers(i)
68 0 : }
69 :
70 : // SafeFormat implements redact.SafeFormatter.
71 0 : func (i DataCorruptionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
72 0 : w.Printf("on-disk corruption: %s", redact.Safe(i.Path))
73 0 : if i.IsRemote {
74 0 : w.Printf(" (remote locator %q)", redact.Safe(i.Locator))
75 0 : }
76 0 : w.Printf("; bounds: %s; details: %+v", i.Bounds.String(), i.Details)
77 : }
78 :
79 : // LevelInfo contains info pertaining to a particular level.
80 : type LevelInfo struct {
81 : Level int
82 : Tables []TableInfo
83 : Score float64
84 : }
85 :
86 0 : func (i LevelInfo) String() string {
87 0 : return redact.StringWithoutMarkers(i)
88 0 : }
89 :
90 : // SafeFormat implements redact.SafeFormatter.
91 1 : func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
92 1 : w.Printf("L%d [%s] (%s) Score=%.2f",
93 1 : redact.Safe(i.Level),
94 1 : redact.Safe(formatFileNums(i.Tables)),
95 1 : redact.Safe(humanize.Bytes.Uint64(tablesTotalSize(i.Tables))),
96 1 : redact.Safe(i.Score))
97 1 : }
98 :
// CompactionInfo contains the info for a compaction event.
type CompactionInfo struct {
	// JobID is the ID of the compaction job.
	JobID int
	// Reason is the reason for the compaction.
	Reason string
	// Input contains the input tables for the compaction organized by level.
	Input []LevelInfo
	// Output contains the output tables generated by the compaction. The output
	// tables are empty for the compaction begin event.
	Output LevelInfo
	// Duration is the time spent compacting, including reading and writing
	// sstables.
	Duration time.Duration
	// TotalDuration is the total wall-time duration of the compaction,
	// including applying the compaction to the database. TotalDuration is
	// always ≥ Duration.
	TotalDuration time.Duration
	// Done selects the formatted shape of the event: when false the event is
	// rendered as an in-progress "compacting" line, when true as a completed
	// "compacted" line.
	Done bool
	// Err, when non-nil, causes the event to be rendered as a compaction
	// error regardless of Done.
	Err error

	// SingleLevelOverlappingRatio and MultiLevelOverlappingRatio are printed
	// on the "compacting" line as "OverlappingRatio: Single ..., Multi ...".
	SingleLevelOverlappingRatio float64
	MultiLevelOverlappingRatio  float64

	// Annotations specifies additional info to appear in a compaction's event
	// log line.
	Annotations compactionAnnotations
}
126 :
127 : type compactionAnnotations []string
128 :
129 : // SafeFormat implements redact.SafeFormatter.
130 1 : func (ca compactionAnnotations) SafeFormat(w redact.SafePrinter, _ rune) {
131 1 : if len(ca) == 0 {
132 0 : return
133 0 : }
134 1 : for i := range ca {
135 1 : if i != 0 {
136 0 : w.Print(" ")
137 0 : }
138 1 : w.Printf("%s", redact.SafeString(ca[i]))
139 : }
140 : }
141 :
142 1 : func (i CompactionInfo) String() string {
143 1 : return redact.StringWithoutMarkers(i)
144 1 : }
145 :
146 : // SafeFormat implements redact.SafeFormatter.
147 1 : func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
148 1 : if i.Err != nil {
149 1 : w.Printf("[JOB %d] compaction(%s) to L%d error: %s",
150 1 : redact.Safe(i.JobID), redact.SafeString(i.Reason), redact.Safe(i.Output.Level), i.Err)
151 1 : return
152 1 : }
153 :
154 1 : if !i.Done {
155 1 : w.Printf("[JOB %d] compacting(%s) ",
156 1 : redact.Safe(i.JobID),
157 1 : redact.SafeString(i.Reason))
158 1 : if len(i.Annotations) > 0 {
159 1 : w.Printf("%s ", i.Annotations)
160 1 : }
161 1 : w.Printf("%s; ", levelInfos(i.Input))
162 1 : w.Printf("OverlappingRatio: Single %.2f, Multi %.2f", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
163 1 : return
164 : }
165 1 : outputSize := tablesTotalSize(i.Output.Tables)
166 1 : w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
167 1 : if len(i.Annotations) > 0 {
168 1 : w.Printf("%s ", i.Annotations)
169 1 : }
170 1 : w.Print(levelInfos(i.Input))
171 1 : w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
172 1 : redact.Safe(i.Output.Level),
173 1 : redact.Safe(formatFileNums(i.Output.Tables)),
174 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
175 1 : redact.Safe(i.Duration.Seconds()),
176 1 : redact.Safe(i.TotalDuration.Seconds()),
177 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
178 : }
179 :
180 : type levelInfos []LevelInfo
181 :
182 1 : func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
183 1 : for j, levelInfo := range i {
184 1 : if j > 0 {
185 1 : w.Printf(" + ")
186 1 : }
187 1 : w.Print(levelInfo)
188 : }
189 : }
190 :
// DiskSlowInfo contains the info for a disk slowness event when writing to a
// file. It is an alias for vfs.DiskSlowInfo.
type DiskSlowInfo = vfs.DiskSlowInfo
194 :
195 : // FlushInfo contains the info for a flush event.
196 : type FlushInfo struct {
197 : // JobID is the ID of the flush job.
198 : JobID int
199 : // Reason is the reason for the flush.
200 : Reason string
201 : // Input contains the count of input memtables that were flushed.
202 : Input int
203 : // InputBytes contains the total in-memory size of the memtable(s) that were
204 : // flushed. This size includes skiplist indexing data structures.
205 : InputBytes uint64
206 : // Output contains the ouptut table generated by the flush. The output info
207 : // is empty for the flush begin event.
208 : Output []TableInfo
209 : // Duration is the time spent flushing. This duration includes writing and
210 : // syncing all of the flushed keys to sstables.
211 : Duration time.Duration
212 : // TotalDuration is the total wall-time duration of the flush, including
213 : // applying the flush to the database. TotalDuration is always ≥ Duration.
214 : TotalDuration time.Duration
215 : // Ingest is set to true if the flush is handling tables that were added to
216 : // the flushable queue via an ingestion operation.
217 : Ingest bool
218 : // IngestLevels are the output levels for each ingested table in the flush.
219 : // This field is only populated when Ingest is true.
220 : IngestLevels []int
221 : Done bool
222 : Err error
223 : }
224 :
225 1 : func (i FlushInfo) String() string {
226 1 : return redact.StringWithoutMarkers(i)
227 1 : }
228 :
229 : // SafeFormat implements redact.SafeFormatter.
230 1 : func (i FlushInfo) SafeFormat(w redact.SafePrinter, _ rune) {
231 1 : if i.Err != nil {
232 1 : w.Printf("[JOB %d] flush error: %s", redact.Safe(i.JobID), i.Err)
233 1 : return
234 1 : }
235 :
236 1 : plural := redact.SafeString("s")
237 1 : if i.Input == 1 {
238 1 : plural = ""
239 1 : }
240 1 : if !i.Done {
241 1 : w.Printf("[JOB %d] ", redact.Safe(i.JobID))
242 1 : if !i.Ingest {
243 1 : w.Printf("flushing %d memtable", redact.Safe(i.Input))
244 1 : w.SafeString(plural)
245 1 : w.Printf(" (%s) to L0", redact.Safe(humanize.Bytes.Uint64(i.InputBytes)))
246 1 : } else {
247 1 : w.Printf("flushing %d ingested table%s", redact.Safe(i.Input), plural)
248 1 : }
249 1 : return
250 : }
251 :
252 1 : outputSize := tablesTotalSize(i.Output)
253 1 : if !i.Ingest {
254 1 : if invariants.Enabled && len(i.IngestLevels) > 0 {
255 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) == 0"))
256 : }
257 1 : w.Printf("[JOB %d] flushed %d memtable%s (%s) to L0 [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
258 1 : redact.Safe(i.JobID), redact.Safe(i.Input), plural,
259 1 : redact.Safe(humanize.Bytes.Uint64(i.InputBytes)),
260 1 : redact.Safe(formatFileNums(i.Output)),
261 1 : redact.Safe(humanize.Bytes.Uint64(outputSize)),
262 1 : redact.Safe(i.Duration.Seconds()),
263 1 : redact.Safe(i.TotalDuration.Seconds()),
264 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
265 1 : } else {
266 1 : if invariants.Enabled && len(i.IngestLevels) == 0 {
267 0 : panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) > 0"))
268 : }
269 1 : w.Printf("[JOB %d] flushed %d ingested flushable%s",
270 1 : redact.Safe(i.JobID), redact.Safe(len(i.Output)), plural)
271 1 : for j, level := range i.IngestLevels {
272 1 : file := i.Output[j]
273 1 : if j > 0 {
274 1 : w.Printf(" +")
275 1 : }
276 1 : w.Printf(" L%d:%s (%s)", level, file.FileNum, humanize.Bytes.Uint64(file.Size))
277 : }
278 1 : w.Printf(" in %.1fs (%.1fs total), output rate %s/s",
279 1 : redact.Safe(i.Duration.Seconds()),
280 1 : redact.Safe(i.TotalDuration.Seconds()),
281 1 : redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
282 : }
283 : }
284 :
285 : // DownloadInfo contains the info for a DB.Download() event.
286 : type DownloadInfo struct {
287 : // JobID is the ID of the download job.
288 : JobID int
289 :
290 : Spans []DownloadSpan
291 :
292 : // Duration is the time since the operation was started.
293 : Duration time.Duration
294 : DownloadCompactionsLaunched int
295 :
296 : // RestartCount indicates that the download operation restarted because it
297 : // noticed that new external files were ingested. A DownloadBegin event with
298 : // RestartCount = 0 is the start of the operation; each time we restart it we
299 : // have another DownloadBegin event with RestartCount > 0.
300 : RestartCount int
301 : Done bool
302 : Err error
303 : }
304 :
305 1 : func (i DownloadInfo) String() string {
306 1 : return redact.StringWithoutMarkers(i)
307 1 : }
308 :
309 : // SafeFormat implements redact.SafeFormatter.
310 1 : func (i DownloadInfo) SafeFormat(w redact.SafePrinter, _ rune) {
311 1 : switch {
312 0 : case i.Err != nil:
313 0 : w.Printf("[JOB %d] download error after %1.fs: %s", redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), i.Err)
314 :
315 1 : case i.Done:
316 1 : w.Printf("[JOB %d] download finished in %.1fs (launched %d compactions)",
317 1 : redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), redact.Safe(i.DownloadCompactionsLaunched))
318 :
319 1 : default:
320 1 : if i.RestartCount == 0 {
321 1 : w.Printf("[JOB %d] starting download for %d spans", redact.Safe(i.JobID), redact.Safe(len(i.Spans)))
322 1 : } else {
323 0 : w.Printf("[JOB %d] restarting download (restart #%d, time so far %.1fs, launched %d compactions)",
324 0 : redact.Safe(i.JobID), redact.Safe(i.RestartCount), redact.Safe(i.Duration.Seconds()),
325 0 : redact.Safe(i.DownloadCompactionsLaunched))
326 0 : }
327 : }
328 : }
329 :
330 : // ManifestCreateInfo contains info about a manifest creation event.
331 : type ManifestCreateInfo struct {
332 : // JobID is the ID of the job the caused the manifest to be created.
333 : JobID int
334 : Path string
335 : // The file number of the new Manifest.
336 : FileNum base.DiskFileNum
337 : Err error
338 : }
339 :
340 1 : func (i ManifestCreateInfo) String() string {
341 1 : return redact.StringWithoutMarkers(i)
342 1 : }
343 :
344 : // SafeFormat implements redact.SafeFormatter.
345 1 : func (i ManifestCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
346 1 : if i.Err != nil {
347 0 : w.Printf("[JOB %d] MANIFEST create error: %s", redact.Safe(i.JobID), i.Err)
348 0 : return
349 0 : }
350 1 : w.Printf("[JOB %d] MANIFEST created %s", redact.Safe(i.JobID), i.FileNum)
351 : }
352 :
353 : // ManifestDeleteInfo contains the info for a Manifest deletion event.
354 : type ManifestDeleteInfo struct {
355 : // JobID is the ID of the job the caused the Manifest to be deleted.
356 : JobID int
357 : Path string
358 : FileNum base.DiskFileNum
359 : Err error
360 : }
361 :
362 1 : func (i ManifestDeleteInfo) String() string {
363 1 : return redact.StringWithoutMarkers(i)
364 1 : }
365 :
366 : // SafeFormat implements redact.SafeFormatter.
367 1 : func (i ManifestDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
368 1 : if i.Err != nil {
369 0 : w.Printf("[JOB %d] MANIFEST delete error: %s", redact.Safe(i.JobID), i.Err)
370 0 : return
371 0 : }
372 1 : w.Printf("[JOB %d] MANIFEST deleted %s", redact.Safe(i.JobID), i.FileNum)
373 : }
374 :
375 : // TableCreateInfo contains the info for a table creation event.
376 : type TableCreateInfo struct {
377 : JobID int
378 : // Reason is the reason for the table creation: "compacting", "flushing", or
379 : // "ingesting".
380 : Reason string
381 : Path string
382 : FileNum base.DiskFileNum
383 : }
384 :
385 1 : func (i TableCreateInfo) String() string {
386 1 : return redact.StringWithoutMarkers(i)
387 1 : }
388 :
389 : // SafeFormat implements redact.SafeFormatter.
390 1 : func (i TableCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
391 1 : w.Printf("[JOB %d] %s: sstable created %s",
392 1 : redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
393 1 : }
394 :
395 : // TableDeleteInfo contains the info for a table deletion event.
396 : type TableDeleteInfo struct {
397 : JobID int
398 : Path string
399 : FileNum base.DiskFileNum
400 : Err error
401 : }
402 :
403 1 : func (i TableDeleteInfo) String() string {
404 1 : return redact.StringWithoutMarkers(i)
405 1 : }
406 :
407 : // SafeFormat implements redact.SafeFormatter.
408 1 : func (i TableDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
409 1 : if i.Err != nil {
410 0 : w.Printf("[JOB %d] sstable delete error %s: %s",
411 0 : redact.Safe(i.JobID), i.FileNum, i.Err)
412 0 : return
413 0 : }
414 1 : w.Printf("[JOB %d] sstable deleted %s", redact.Safe(i.JobID), i.FileNum)
415 : }
416 :
417 : // TableIngestInfo contains the info for a table ingestion event.
418 : type TableIngestInfo struct {
419 : // JobID is the ID of the job the caused the table to be ingested.
420 : JobID int
421 : Tables []struct {
422 : TableInfo
423 : Level int
424 : }
425 : // GlobalSeqNum is the sequence number that was assigned to all entries in
426 : // the ingested table.
427 : GlobalSeqNum base.SeqNum
428 : // flushable indicates whether the ingested sstable was treated as a
429 : // flushable.
430 : flushable bool
431 : Err error
432 : }
433 :
434 1 : func (i TableIngestInfo) String() string {
435 1 : return redact.StringWithoutMarkers(i)
436 1 : }
437 :
438 : // SafeFormat implements redact.SafeFormatter.
439 1 : func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
440 1 : if i.Err != nil {
441 0 : w.Printf("[JOB %d] ingest error: %s", redact.Safe(i.JobID), i.Err)
442 0 : return
443 0 : }
444 :
445 1 : if i.flushable {
446 1 : w.Printf("[JOB %d] ingested as flushable", redact.Safe(i.JobID))
447 1 : } else {
448 1 : w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
449 1 : }
450 :
451 1 : for j := range i.Tables {
452 1 : t := &i.Tables[j]
453 1 : if j > 0 {
454 1 : w.Printf(",")
455 1 : }
456 1 : levelStr := ""
457 1 : if !i.flushable {
458 1 : levelStr = fmt.Sprintf("L%d:", t.Level)
459 1 : }
460 1 : w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
461 1 : redact.Safe(humanize.Bytes.Uint64(t.Size)))
462 : }
463 : }
464 :
465 : // TableStatsInfo contains the info for a table stats loaded event.
466 : type TableStatsInfo struct {
467 : // JobID is the ID of the job that finished loading the initial tables'
468 : // stats.
469 : JobID int
470 : }
471 :
472 1 : func (i TableStatsInfo) String() string {
473 1 : return redact.StringWithoutMarkers(i)
474 1 : }
475 :
476 : // SafeFormat implements redact.SafeFormatter.
477 1 : func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
478 1 : w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
479 1 : }
480 :
481 : // TableValidatedInfo contains information on the result of a validation run
482 : // on an sstable.
483 : type TableValidatedInfo struct {
484 : JobID int
485 : Meta *tableMetadata
486 : }
487 :
488 1 : func (i TableValidatedInfo) String() string {
489 1 : return redact.StringWithoutMarkers(i)
490 1 : }
491 :
492 : // SafeFormat implements redact.SafeFormatter.
493 1 : func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
494 1 : w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
495 1 : }
496 :
497 : // WALCreateInfo contains info about a WAL creation event.
498 : type WALCreateInfo struct {
499 : // JobID is the ID of the job the caused the WAL to be created.
500 : JobID int
501 : Path string
502 : // The file number of the new WAL.
503 : FileNum base.DiskFileNum
504 : // The file number of a previous WAL which was recycled to create this
505 : // one. Zero if recycling did not take place.
506 : RecycledFileNum base.DiskFileNum
507 : Err error
508 : }
509 :
510 1 : func (i WALCreateInfo) String() string {
511 1 : return redact.StringWithoutMarkers(i)
512 1 : }
513 :
514 : // SafeFormat implements redact.SafeFormatter.
515 1 : func (i WALCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
516 1 : if i.Err != nil {
517 0 : w.Printf("[JOB %d] WAL create error: %s", redact.Safe(i.JobID), i.Err)
518 0 : return
519 0 : }
520 :
521 1 : if i.RecycledFileNum == 0 {
522 1 : w.Printf("[JOB %d] WAL created %s", redact.Safe(i.JobID), i.FileNum)
523 1 : return
524 1 : }
525 :
526 0 : w.Printf("[JOB %d] WAL created %s (recycled %s)",
527 0 : redact.Safe(i.JobID), i.FileNum, i.RecycledFileNum)
528 : }
529 :
530 : // WALDeleteInfo contains the info for a WAL deletion event.
531 : //
532 : // TODO(sumeer): extend WALDeleteInfo for the failover case in case the path
533 : // is insufficient to infer whether primary or secondary.
534 : type WALDeleteInfo struct {
535 : // JobID is the ID of the job the caused the WAL to be deleted.
536 : JobID int
537 : Path string
538 : FileNum base.DiskFileNum
539 : Err error
540 : }
541 :
542 1 : func (i WALDeleteInfo) String() string {
543 1 : return redact.StringWithoutMarkers(i)
544 1 : }
545 :
546 : // SafeFormat implements redact.SafeFormatter.
547 1 : func (i WALDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
548 1 : if i.Err != nil {
549 0 : w.Printf("[JOB %d] WAL delete error: %s", redact.Safe(i.JobID), i.Err)
550 0 : return
551 0 : }
552 1 : w.Printf("[JOB %d] WAL deleted %s", redact.Safe(i.JobID), i.FileNum)
553 : }
554 :
555 : // WriteStallBeginInfo contains the info for a write stall begin event.
556 : type WriteStallBeginInfo struct {
557 : Reason string
558 : }
559 :
560 1 : func (i WriteStallBeginInfo) String() string {
561 1 : return redact.StringWithoutMarkers(i)
562 1 : }
563 :
564 : // SafeFormat implements redact.SafeFormatter.
565 1 : func (i WriteStallBeginInfo) SafeFormat(w redact.SafePrinter, _ rune) {
566 1 : w.Printf("write stall beginning: %s", redact.Safe(i.Reason))
567 1 : }
568 :
569 : // LowDiskSpaceInfo contains the information for a LowDiskSpace
570 : // event.
571 : type LowDiskSpaceInfo struct {
572 : // AvailBytes is the disk space available to the current process in bytes.
573 : AvailBytes uint64
574 : // TotalBytes is the total disk space in bytes.
575 : TotalBytes uint64
576 : // PercentThreshold is one of a set of fixed percentages in the
577 : // lowDiskSpaceThresholds below. This event was issued because the disk
578 : // space went below this threshold.
579 : PercentThreshold int
580 : }
581 :
582 0 : func (i LowDiskSpaceInfo) String() string {
583 0 : return redact.StringWithoutMarkers(i)
584 0 : }
585 :
586 : // SafeFormat implements redact.SafeFormatter.
587 0 : func (i LowDiskSpaceInfo) SafeFormat(w redact.SafePrinter, _ rune) {
588 0 : w.Printf(
589 0 : "available disk space under %d%% (%s of %s)",
590 0 : redact.Safe(i.PercentThreshold),
591 0 : redact.Safe(humanize.Bytes.Uint64(i.AvailBytes)),
592 0 : redact.Safe(humanize.Bytes.Uint64(i.TotalBytes)),
593 0 : )
594 0 : }
595 :
596 : // PossibleAPIMisuseInfo contains the information for a PossibleAPIMisuse event.
597 : type PossibleAPIMisuseInfo struct {
598 : Kind APIMisuseKind
599 :
600 : // UserKey is set for the following kinds:
601 : // - IneffectualSingleDelete,
602 : // - NondeterministicSingleDelete.
603 : UserKey []byte
604 : }
605 :
606 1 : func (i PossibleAPIMisuseInfo) String() string {
607 1 : return redact.StringWithoutMarkers(i)
608 1 : }
609 :
610 : // SafeFormat implements redact.SafeFormatter.
611 1 : func (i PossibleAPIMisuseInfo) SafeFormat(w redact.SafePrinter, _ rune) {
612 1 : switch i.Kind {
613 1 : case IneffectualSingleDelete, NondeterministicSingleDelete:
614 1 : w.Printf("possible API misuse: %s (key=%q)", redact.Safe(i.Kind), i.UserKey)
615 0 : default:
616 0 : if invariants.Enabled {
617 0 : panic("invalid API misuse event")
618 : }
619 0 : w.Printf("invalid API misuse event")
620 : }
621 : }
622 :
// APIMisuseKind identifies the type of API misuse represented by a
// PossibleAPIMisuse event.
type APIMisuseKind int8

const (
	// IneffectualSingleDelete is emitted in compactions/flushes if any
	// single delete is being elided without deleting a point set/merge.
	//
	// This event can sometimes be a false positive because of delete-only
	// compactions which can cause a recent RANGEDEL to peek below an older
	// SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
	//
	// Example:
	//   RANGEDEL [a, c)#10 in L0
	//   SINGLEDEL b#5 in L1
	//   SET b#3 in L6
	//
	// If the L6 file containing the SET is narrow and the L1 file containing
	// the SINGLEDEL is wide, a delete-only compaction can remove the file in
	// L6 before the SINGLEDEL is compacted down. Then when the SINGLEDEL is
	// compacted down, it will not find any SET to delete, resulting in the
	// ineffectual callback.
	IneffectualSingleDelete APIMisuseKind = iota

	// NondeterministicSingleDelete is emitted in compactions/flushes if any
	// single delete has consumed a Set/Merge, and there is another immediately
	// older Set/SetWithDelete/Merge. The user of Pebble has violated the
	// invariant under which SingleDelete can be used correctly.
	//
	// Consider the sequence SingleDelete#3, Set#2, Set#1. There are three
	// ways some of these keys can first meet in a compaction.
	//
	// - All 3 keys in the same compaction: this callback will detect the
	//   violation.
	//
	// - SingleDelete#3, Set#2 meet in a compaction first: Both keys will
	//   disappear. The violation will not be detected, and the DB will have
	//   Set#1 which is likely incorrect (from the user's perspective).
	//
	// - Set#2, Set#1 meet in a compaction first: The output will be Set#2,
	//   which will later be consumed by SingleDelete#3. The violation will
	//   not be detected and the DB will be correct.
	//
	// This event can sometimes be a false positive because of delete-only
	// compactions which can cause a recent RANGEDEL to peek below an older
	// SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
	//
	// Example:
	//   RANGEDEL [a, z)#60 in L0
	//   SINGLEDEL g#50 in L1
	//   SET g#40 in L2
	//   RANGEDEL [g,h)#30 in L3
	//   SET g#20 in L6
	//
	// In this example, the two SETs represent the same user write, and the
	// RANGEDELs are caused by the CockroachDB range being dropped. That is,
	// the user wrote to g once, range was dropped, then added back, which
	// caused the SET again, then at some point g was validly deleted using a
	// SINGLEDEL, and then the range was dropped again. The older RANGEDEL can
	// get fragmented due to compactions it has been part of. Say this L3 file
	// containing the RANGEDEL is very narrow, while the L1, L2, L6 files are
	// wider than the RANGEDEL in L0. Then the RANGEDEL in L3 can be dropped
	// using a delete-only compaction, resulting in an LSM with state:
	//
	//   RANGEDEL [a, z)#60 in L0
	//   SINGLEDEL g#50 in L1
	//   SET g#40 in L2
	//   SET g#20 in L6
	//
	// A multi-level compaction involving L1, L2, L6 will cause the invariant
	// violation callback. This example doesn't need multi-level compactions:
	// say there was a Pebble snapshot at g#21 preventing g#20 from being
	// dropped when it meets g#40 in a compaction. That snapshot will not save
	// RANGEDEL [g,h)#30, so we can have:
	//
	//   SINGLEDEL g#50 in L1
	//   SET g#40, SET g#20 in L6
	//
	// And say the snapshot is removed and then the L1 and L6 compaction
	// happens, resulting in the invariant violation callback.
	NondeterministicSingleDelete
)

// String returns a short human-readable name for the kind, or "unknown" for
// a value that is not one of the declared constants.
func (k APIMisuseKind) String() string {
	if k == IneffectualSingleDelete {
		return "ineffectual SINGLEDEL"
	}
	if k == NondeterministicSingleDelete {
		return "nondeterministic SINGLEDEL"
	}
	return "unknown"
}
716 :
// EventListener contains a set of functions that will be invoked when various
// significant DB events occur. Note that the functions should not run for an
// excessive amount of time as they are invoked synchronously by the DB and may
// block continued DB work. For a similar reason it is advisable to not perform
// any synchronous calls back into the DB.
type EventListener struct {
	// BackgroundError is invoked whenever an error occurs during a background
	// operation such as flush or compaction.
	BackgroundError func(error)

	// DataCorruption is invoked when an on-disk corruption is detected. It should
	// not block, as it is called synchronously in read paths.
	DataCorruption func(DataCorruptionInfo)

	// CompactionBegin is invoked after the inputs to a compaction have been
	// determined, but before the compaction has produced any output.
	CompactionBegin func(CompactionInfo)

	// CompactionEnd is invoked after a compaction has completed and the result
	// has been installed.
	CompactionEnd func(CompactionInfo)

	// DiskSlow is invoked after a disk write operation on a file created with a
	// disk health checking vfs.FS (see vfs.DefaultWithDiskHealthChecks) is
	// observed to exceed the specified disk slowness threshold duration. DiskSlow
	// is called on a goroutine that is monitoring slowness/stuckness. The callee
	// MUST return without doing any IO, or blocking on anything (like a mutex)
	// that is waiting on IO. This is imperative in order to reliably monitor for
	// slowness, since if this goroutine gets stuck, the monitoring will stop
	// working.
	DiskSlow func(DiskSlowInfo)

	// FlushBegin is invoked after the inputs to a flush have been determined,
	// but before the flush has produced any output.
	FlushBegin func(FlushInfo)

	// FlushEnd is invoked after a flush has completed and the result has been
	// installed.
	FlushEnd func(FlushInfo)

	// DownloadBegin is invoked when a db.Download operation starts or restarts
	// (restarts are caused by new external tables being ingested during the
	// operation).
	DownloadBegin func(DownloadInfo)

	// DownloadEnd is invoked when a db.Download operation completes.
	DownloadEnd func(DownloadInfo)

	// FormatUpgrade is invoked after the database's FormatMajorVersion
	// is upgraded.
	FormatUpgrade func(FormatMajorVersion)

	// ManifestCreated is invoked after a manifest has been created.
	ManifestCreated func(ManifestCreateInfo)

	// ManifestDeleted is invoked after a manifest has been deleted.
	ManifestDeleted func(ManifestDeleteInfo)

	// TableCreated is invoked when a table has been created.
	TableCreated func(TableCreateInfo)

	// TableDeleted is invoked after a table has been deleted.
	TableDeleted func(TableDeleteInfo)

	// TableIngested is invoked after an externally created table has been
	// ingested via a call to DB.Ingest().
	TableIngested func(TableIngestInfo)

	// TableStatsLoaded is invoked at most once, when the table stats
	// collector has loaded statistics for all tables that existed at Open.
	TableStatsLoaded func(TableStatsInfo)

	// TableValidated is invoked after validation runs on an sstable.
	TableValidated func(TableValidatedInfo)

	// WALCreated is invoked after a WAL has been created.
	WALCreated func(WALCreateInfo)

	// WALDeleted is invoked after a WAL has been deleted.
	WALDeleted func(WALDeleteInfo)

	// WriteStallBegin is invoked when writes are intentionally delayed.
	WriteStallBegin func(WriteStallBeginInfo)

	// WriteStallEnd is invoked when delayed writes are released.
	WriteStallEnd func()

	// LowDiskSpace is invoked periodically when the disk space is running
	// low.
	LowDiskSpace func(LowDiskSpaceInfo)

	// PossibleAPIMisuse is invoked when a possible API misuse is detected.
	PossibleAPIMisuse func(PossibleAPIMisuseInfo)
}
811 :
812 0 : func backgroundError(logger Logger, err error) {
813 0 : if errors.Is(err, ErrCancelledCompaction) {
814 0 : // ErrCancelledCompaction is not an unexpected error, hence severity INFO.
815 0 : logger.Infof("background error: %s", err)
816 0 : } else {
817 0 : logger.Errorf("background error: %s", err)
818 0 : }
819 : }
820 :
821 : // EnsureDefaults ensures that background error events are logged to the
822 : // specified logger if a handler for those events hasn't been otherwise
823 : // specified. Ensure all handlers are non-nil so that we don't have to check
824 : // for nil-ness before invoking.
825 1 : func (l *EventListener) EnsureDefaults(logger Logger) {
826 1 : if l.BackgroundError == nil {
827 1 : if logger != nil {
828 1 : l.BackgroundError = func(err error) {
829 0 : backgroundError(logger, err)
830 0 : }
831 0 : } else {
832 0 : l.BackgroundError = func(error) {}
833 : }
834 : }
835 1 : if l.DataCorruption == nil {
836 1 : if logger != nil {
837 1 : l.DataCorruption = func(info DataCorruptionInfo) {
838 0 : logger.Fatalf("%s", info)
839 0 : }
840 0 : } else {
841 0 : l.DataCorruption = func(info DataCorruptionInfo) {}
842 : }
843 : }
844 1 : if l.CompactionBegin == nil {
845 1 : l.CompactionBegin = func(info CompactionInfo) {}
846 : }
847 1 : if l.CompactionEnd == nil {
848 1 : l.CompactionEnd = func(info CompactionInfo) {}
849 : }
850 1 : if l.DiskSlow == nil {
851 1 : l.DiskSlow = func(info DiskSlowInfo) {}
852 : }
853 1 : if l.FlushBegin == nil {
854 1 : l.FlushBegin = func(info FlushInfo) {}
855 : }
856 1 : if l.FlushEnd == nil {
857 1 : l.FlushEnd = func(info FlushInfo) {}
858 : }
859 1 : if l.DownloadBegin == nil {
860 1 : l.DownloadBegin = func(info DownloadInfo) {}
861 : }
862 1 : if l.DownloadEnd == nil {
863 1 : l.DownloadEnd = func(info DownloadInfo) {}
864 : }
865 1 : if l.FormatUpgrade == nil {
866 1 : l.FormatUpgrade = func(v FormatMajorVersion) {}
867 : }
868 1 : if l.ManifestCreated == nil {
869 1 : l.ManifestCreated = func(info ManifestCreateInfo) {}
870 : }
871 1 : if l.ManifestDeleted == nil {
872 1 : l.ManifestDeleted = func(info ManifestDeleteInfo) {}
873 : }
874 1 : if l.TableCreated == nil {
875 1 : l.TableCreated = func(info TableCreateInfo) {}
876 : }
877 1 : if l.TableDeleted == nil {
878 1 : l.TableDeleted = func(info TableDeleteInfo) {}
879 : }
880 1 : if l.TableIngested == nil {
881 1 : l.TableIngested = func(info TableIngestInfo) {}
882 : }
883 1 : if l.TableStatsLoaded == nil {
884 1 : l.TableStatsLoaded = func(info TableStatsInfo) {}
885 : }
886 1 : if l.TableValidated == nil {
887 1 : l.TableValidated = func(validated TableValidatedInfo) {}
888 : }
889 1 : if l.WALCreated == nil {
890 1 : l.WALCreated = func(info WALCreateInfo) {}
891 : }
892 1 : if l.WALDeleted == nil {
893 1 : l.WALDeleted = func(info WALDeleteInfo) {}
894 : }
895 1 : if l.WriteStallBegin == nil {
896 1 : l.WriteStallBegin = func(info WriteStallBeginInfo) {}
897 : }
898 1 : if l.WriteStallEnd == nil {
899 1 : l.WriteStallEnd = func() {}
900 : }
901 1 : if l.LowDiskSpace == nil {
902 1 : l.LowDiskSpace = func(info LowDiskSpaceInfo) {}
903 : }
904 1 : if l.PossibleAPIMisuse == nil {
905 1 : l.PossibleAPIMisuse = func(info PossibleAPIMisuseInfo) {}
906 : }
907 : }
908 :
909 : // MakeLoggingEventListener creates an EventListener that logs all events to the
910 : // specified logger.
911 1 : func MakeLoggingEventListener(logger Logger) EventListener {
912 1 : if logger == nil {
913 0 : logger = DefaultLogger
914 0 : }
915 :
916 1 : return EventListener{
917 1 : BackgroundError: func(err error) {
918 0 : backgroundError(logger, err)
919 0 : },
920 0 : DataCorruption: func(info DataCorruptionInfo) {
921 0 : logger.Errorf("%s", info)
922 0 : },
923 1 : CompactionBegin: func(info CompactionInfo) {
924 1 : logger.Infof("%s", info)
925 1 : },
926 0 : CompactionEnd: func(info CompactionInfo) {
927 0 : logger.Infof("%s", info)
928 0 : },
929 0 : DiskSlow: func(info DiskSlowInfo) {
930 0 : logger.Infof("%s", info)
931 0 : },
932 1 : FlushBegin: func(info FlushInfo) {
933 1 : logger.Infof("%s", info)
934 1 : },
935 0 : FlushEnd: func(info FlushInfo) {
936 0 : logger.Infof("%s", info)
937 0 : },
938 1 : DownloadBegin: func(info DownloadInfo) {
939 1 : logger.Infof("%s", info)
940 1 : },
941 0 : DownloadEnd: func(info DownloadInfo) {
942 0 : logger.Infof("%s", info)
943 0 : },
944 1 : FormatUpgrade: func(v FormatMajorVersion) {
945 1 : logger.Infof("upgraded to format version: %s", v)
946 1 : },
947 0 : ManifestCreated: func(info ManifestCreateInfo) {
948 0 : logger.Infof("%s", info)
949 0 : },
950 0 : ManifestDeleted: func(info ManifestDeleteInfo) {
951 0 : logger.Infof("%s", info)
952 0 : },
953 1 : TableCreated: func(info TableCreateInfo) {
954 1 : logger.Infof("%s", info)
955 1 : },
956 0 : TableDeleted: func(info TableDeleteInfo) {
957 0 : logger.Infof("%s", info)
958 0 : },
959 0 : TableIngested: func(info TableIngestInfo) {
960 0 : logger.Infof("%s", info)
961 0 : },
962 1 : TableStatsLoaded: func(info TableStatsInfo) {
963 1 : logger.Infof("%s", info)
964 1 : },
965 1 : TableValidated: func(info TableValidatedInfo) {
966 1 : logger.Infof("%s", info)
967 1 : },
968 0 : WALCreated: func(info WALCreateInfo) {
969 0 : logger.Infof("%s", info)
970 0 : },
971 0 : WALDeleted: func(info WALDeleteInfo) {
972 0 : logger.Infof("%s", info)
973 0 : },
974 1 : WriteStallBegin: func(info WriteStallBeginInfo) {
975 1 : logger.Infof("%s", info)
976 1 : },
977 1 : WriteStallEnd: func() {
978 1 : logger.Infof("write stall ending")
979 1 : },
980 0 : LowDiskSpace: func(info LowDiskSpaceInfo) {
981 0 : logger.Infof("%s", info)
982 0 : },
983 1 : PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
984 1 : logger.Infof("%s", info)
985 1 : },
986 : }
987 : }
988 :
989 : // TeeEventListener wraps two EventListeners, forwarding all events to both.
990 0 : func TeeEventListener(a, b EventListener) EventListener {
991 0 : a.EnsureDefaults(nil)
992 0 : b.EnsureDefaults(nil)
993 0 : return EventListener{
994 0 : BackgroundError: func(err error) {
995 0 : a.BackgroundError(err)
996 0 : b.BackgroundError(err)
997 0 : },
998 0 : DataCorruption: func(info DataCorruptionInfo) {
999 0 : a.DataCorruption(info)
1000 0 : b.DataCorruption(info)
1001 0 : },
1002 0 : CompactionBegin: func(info CompactionInfo) {
1003 0 : a.CompactionBegin(info)
1004 0 : b.CompactionBegin(info)
1005 0 : },
1006 0 : CompactionEnd: func(info CompactionInfo) {
1007 0 : a.CompactionEnd(info)
1008 0 : b.CompactionEnd(info)
1009 0 : },
1010 0 : DiskSlow: func(info DiskSlowInfo) {
1011 0 : a.DiskSlow(info)
1012 0 : b.DiskSlow(info)
1013 0 : },
1014 0 : FlushBegin: func(info FlushInfo) {
1015 0 : a.FlushBegin(info)
1016 0 : b.FlushBegin(info)
1017 0 : },
1018 0 : FlushEnd: func(info FlushInfo) {
1019 0 : a.FlushEnd(info)
1020 0 : b.FlushEnd(info)
1021 0 : },
1022 0 : DownloadBegin: func(info DownloadInfo) {
1023 0 : a.DownloadBegin(info)
1024 0 : b.DownloadBegin(info)
1025 0 : },
1026 0 : DownloadEnd: func(info DownloadInfo) {
1027 0 : a.DownloadEnd(info)
1028 0 : b.DownloadEnd(info)
1029 0 : },
1030 0 : FormatUpgrade: func(v FormatMajorVersion) {
1031 0 : a.FormatUpgrade(v)
1032 0 : b.FormatUpgrade(v)
1033 0 : },
1034 0 : ManifestCreated: func(info ManifestCreateInfo) {
1035 0 : a.ManifestCreated(info)
1036 0 : b.ManifestCreated(info)
1037 0 : },
1038 0 : ManifestDeleted: func(info ManifestDeleteInfo) {
1039 0 : a.ManifestDeleted(info)
1040 0 : b.ManifestDeleted(info)
1041 0 : },
1042 0 : TableCreated: func(info TableCreateInfo) {
1043 0 : a.TableCreated(info)
1044 0 : b.TableCreated(info)
1045 0 : },
1046 0 : TableDeleted: func(info TableDeleteInfo) {
1047 0 : a.TableDeleted(info)
1048 0 : b.TableDeleted(info)
1049 0 : },
1050 0 : TableIngested: func(info TableIngestInfo) {
1051 0 : a.TableIngested(info)
1052 0 : b.TableIngested(info)
1053 0 : },
1054 0 : TableStatsLoaded: func(info TableStatsInfo) {
1055 0 : a.TableStatsLoaded(info)
1056 0 : b.TableStatsLoaded(info)
1057 0 : },
1058 0 : TableValidated: func(info TableValidatedInfo) {
1059 0 : a.TableValidated(info)
1060 0 : b.TableValidated(info)
1061 0 : },
1062 0 : WALCreated: func(info WALCreateInfo) {
1063 0 : a.WALCreated(info)
1064 0 : b.WALCreated(info)
1065 0 : },
1066 0 : WALDeleted: func(info WALDeleteInfo) {
1067 0 : a.WALDeleted(info)
1068 0 : b.WALDeleted(info)
1069 0 : },
1070 0 : WriteStallBegin: func(info WriteStallBeginInfo) {
1071 0 : a.WriteStallBegin(info)
1072 0 : b.WriteStallBegin(info)
1073 0 : },
1074 0 : WriteStallEnd: func() {
1075 0 : a.WriteStallEnd()
1076 0 : b.WriteStallEnd()
1077 0 : },
1078 0 : LowDiskSpace: func(info LowDiskSpaceInfo) {
1079 0 : a.LowDiskSpace(info)
1080 0 : b.LowDiskSpace(info)
1081 0 : },
1082 0 : PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
1083 0 : a.PossibleAPIMisuse(info)
1084 0 : b.PossibleAPIMisuse(info)
1085 0 : },
1086 : }
1087 : }
1088 :
1089 : // lowDiskSpaceReporter contains the logic to report low disk space events.
1090 : // Report is called whenever we get the disk usage statistics.
1091 : //
1092 : // We define a few thresholds (10%, 5%, 3%, 2%, 1%) and we post an event
1093 : // whenever we reach a new threshold. We periodically repost the event every 30
1094 : // minutes until we are above all thresholds.
1095 : type lowDiskSpaceReporter struct {
1096 : mu struct {
1097 : sync.Mutex
1098 : lastNoticeThreshold int
1099 : lastNoticeTime crtime.Mono
1100 : }
1101 : }
1102 :
1103 : var lowDiskSpaceThresholds = []int{10, 5, 3, 2, 1}
1104 :
1105 : const lowDiskSpaceFrequency = 30 * time.Minute
1106 :
1107 1 : func (r *lowDiskSpaceReporter) Report(availBytes, totalBytes uint64, el *EventListener) {
1108 1 : threshold, ok := r.findThreshold(availBytes, totalBytes)
1109 1 : if !ok {
1110 1 : // Normal path.
1111 1 : return
1112 1 : }
1113 0 : if r.shouldReport(threshold, crtime.NowMono()) {
1114 0 : el.LowDiskSpace(LowDiskSpaceInfo{
1115 0 : AvailBytes: availBytes,
1116 0 : TotalBytes: totalBytes,
1117 0 : PercentThreshold: threshold,
1118 0 : })
1119 0 : }
1120 : }
1121 :
1122 : // shouldReport returns true if we should report an event. Updates
1123 : // lastNoticeTime/lastNoticeThreshold appropriately.
1124 0 : func (r *lowDiskSpaceReporter) shouldReport(threshold int, now crtime.Mono) bool {
1125 0 : r.mu.Lock()
1126 0 : defer r.mu.Unlock()
1127 0 : if threshold < r.mu.lastNoticeThreshold || r.mu.lastNoticeTime == 0 ||
1128 0 : now.Sub(r.mu.lastNoticeTime) >= lowDiskSpaceFrequency {
1129 0 : r.mu.lastNoticeThreshold = threshold
1130 0 : r.mu.lastNoticeTime = now
1131 0 : return true
1132 0 : }
1133 0 : return false
1134 : }
1135 :
1136 : // findThreshold returns the largest threshold in lowDiskSpaceThresholds which
1137 : // is >= the percentage ratio between availBytes and totalBytes (or ok=false if
1138 : // there is more free space than the highest threshold).
1139 : func (r *lowDiskSpaceReporter) findThreshold(
1140 : availBytes, totalBytes uint64,
1141 1 : ) (threshold int, ok bool) {
1142 1 : // Note: in the normal path, we exit the loop during the first iteration.
1143 1 : for i, t := range lowDiskSpaceThresholds {
1144 1 : if availBytes*100 > totalBytes*uint64(lowDiskSpaceThresholds[i]) {
1145 1 : break
1146 : }
1147 0 : threshold = t
1148 0 : ok = true
1149 : }
1150 1 : return threshold, ok
1151 : }
1152 :
1153 0 : func (d *DB) reportSSTableCorruption(meta *manifest.TableMetadata, err error) {
1154 0 : if invariants.Enabled && err == nil {
1155 0 : panic("nil error")
1156 : }
1157 0 : objMeta, lookupErr := d.objProvider.Lookup(base.FileTypeTable, meta.FileBacking.DiskFileNum)
1158 0 : if lookupErr != nil {
1159 0 : // If the object is not known to the provider, it must be a local object
1160 0 : // that was missing when we opened the store. Remote objects have their
1161 0 : // metadata in a catalog, so even if the backing object is deleted, the
1162 0 : // DiskFileNum would still be known.
1163 0 : objMeta = objstorage.ObjectMetadata{DiskFileNum: meta.FileBacking.DiskFileNum, FileType: base.FileTypeTable}
1164 0 : }
1165 0 : path := d.objProvider.Path(objMeta)
1166 0 : if objMeta.IsRemote() {
1167 0 : // Remote path (which include the locator and full path) might not always be
1168 0 : // safe.
1169 0 : err = errors.WithHintf(err, "path: %s", path)
1170 0 : } else {
1171 0 : // Local paths are safe: they start with the store directory and the
1172 0 : // filename is generated by Pebble.
1173 0 : err = errors.WithHintf(err, "path: %s", redact.Safe(path))
1174 0 : }
1175 0 : info := DataCorruptionInfo{
1176 0 : Path: path,
1177 0 : IsRemote: objMeta.IsRemote(),
1178 0 : Locator: objMeta.Remote.Locator,
1179 0 : Bounds: meta.UserKeyBounds(),
1180 0 : Details: err,
1181 0 : }
1182 0 : d.opts.EventListener.DataCorruption(info)
1183 : }
|