LCOV - code coverage report
Current view: top level - pebble - event.go (source / functions) Coverage Total Hit
Test: 2025-09-21 08:18Z 489b133b - tests only.lcov Lines: 82.0 % 699 573
Test Date: 2025-09-21 08:19:57 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "fmt"
       9              :         "strings"
      10              :         "sync"
      11              :         "time"
      12              : 
      13              :         "github.com/cockroachdb/crlib/crhumanize"
      14              :         "github.com/cockroachdb/crlib/crtime"
      15              :         "github.com/cockroachdb/errors"
      16              :         errorsjoin "github.com/cockroachdb/errors/join"
      17              :         "github.com/cockroachdb/pebble/internal/base"
      18              :         "github.com/cockroachdb/pebble/internal/humanize"
      19              :         "github.com/cockroachdb/pebble/internal/invariants"
      20              :         "github.com/cockroachdb/pebble/internal/manifest"
      21              :         "github.com/cockroachdb/pebble/objstorage"
      22              :         "github.com/cockroachdb/pebble/objstorage/remote"
      23              :         "github.com/cockroachdb/pebble/vfs"
      24              :         "github.com/cockroachdb/redact"
      25              : )
      26              : 
      27              : // TableNum is an identifier for a table within a database.
      28              : type TableNum = base.TableNum
      29              : 
      30              : // TableInfo exports the manifest.TableInfo type.
      31              : type TableInfo = manifest.TableInfo
      32              : 
      33            1 : func tablesTotalSize(tables []TableInfo) uint64 {
      34            1 :         var size uint64
      35            1 :         for i := range tables {
      36            1 :                 size += tables[i].Size
      37            1 :         }
      38            1 :         return size
      39              : }
      40              : 
      41            1 : func formatFileNums(tables []TableInfo) string {
      42            1 :         var buf strings.Builder
      43            1 :         for i := range tables {
      44            1 :                 if i > 0 {
      45            1 :                         buf.WriteString(" ")
      46            1 :                 }
      47            1 :                 buf.WriteString(tables[i].FileNum.String())
      48              :         }
      49            1 :         return buf.String()
      50              : }
      51              : 
      52              : // DataCorruptionInfo contains the information for a DataCorruption event.
      53              : type DataCorruptionInfo struct {
      54              :         // Path of the file that is corrupted. For remote files the path starts with
      55              :         // "remote://".
      56              :         Path     string
      57              :         IsRemote bool
      58              :         // Locator is only set when IsRemote is true (note that an empty Locator is
      59              :         // valid even then).
      60              :         Locator remote.Locator
      61              :         // Bounds indicates the keyspace range that is affected.
      62              :         Bounds base.UserKeyBounds
      63              :         // Details of the error. See cockroachdb/error for how to format with or
      64              :         // without redaction.
      65              :         Details error
      66              : }
      67              : 
      68            1 : func (i DataCorruptionInfo) String() string {
      69            1 :         return redact.StringWithoutMarkers(i)
      70            1 : }
      71              : 
      72              : // SafeFormat implements redact.SafeFormatter.
      73            1 : func (i DataCorruptionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
      74            1 :         w.Printf("on-disk corruption: %s", redact.Safe(i.Path))
      75            1 :         if i.IsRemote {
      76            1 :                 w.Printf(" (remote locator %q)", redact.Safe(i.Locator))
      77            1 :         }
      78            1 :         w.Printf("; bounds: %s; details: %+v", i.Bounds.String(), i.Details)
      79              : }
      80              : 
      81              : // LevelInfo contains info pertaining to a particular level.
      82              : type LevelInfo struct {
      83              :         Level  int
      84              :         Tables []TableInfo
      85              :         Blobs  []BlobFileInfo
      86              :         Score  float64
      87              : }
      88              : 
      89            0 : func (i LevelInfo) String() string {
      90            0 :         return redact.StringWithoutMarkers(i)
      91            0 : }
      92              : 
      93              : // SafeFormat implements redact.SafeFormatter.
      94            1 : func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
      95            1 :         blobInfo := redact.SafeString("")
      96            1 :         if len(i.Blobs) > 0 {
      97            0 :                 pluralBlob := redact.SafeString("s")
      98            0 :                 if len(i.Blobs) == 1 {
      99            0 :                         pluralBlob = ""
     100            0 :                 }
     101            0 :                 blobInfo = redact.SafeString(fmt.Sprintf(" blob%s [%s] (%s)",
     102            0 :                         pluralBlob, formatBlobFileNums(i.Blobs), humanize.Bytes.Uint64(blobsTotalSize(i.Blobs))))
     103              :         }
     104            1 :         w.Printf("L%d [%s] (%s)%s Score=%.2f",
     105            1 :                 redact.Safe(i.Level),
     106            1 :                 redact.Safe(formatFileNums(i.Tables)),
     107            1 :                 redact.Safe(humanize.Bytes.Uint64(tablesTotalSize(i.Tables))),
     108            1 :                 blobInfo,
     109            1 :                 redact.Safe(i.Score))
     110              : }
     111              : 
     112              : // BlobFileCreateInfo contains the info for a blob file creation event.
     113              : type BlobFileCreateInfo struct {
     114              :         JobID int
     115              :         // Reason is the reason for the table creation: "compacting", "flushing", or
     116              :         // "ingesting".
     117              :         Reason  string
     118              :         Path    string
     119              :         FileNum base.DiskFileNum
     120              : }
     121              : 
     122            1 : func (i BlobFileCreateInfo) String() string {
     123            1 :         return redact.StringWithoutMarkers(i)
     124            1 : }
     125              : 
     126              : // SafeFormat implements redact.SafeFormatter.
     127            1 : func (i BlobFileCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     128            1 :         w.Printf("[JOB %d] %s: blob file created %s",
     129            1 :                 redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
     130            1 : }
     131              : 
     132              : // BlobFileDeleteInfo contains the info for a blob file deletion event.
     133              : type BlobFileDeleteInfo struct {
     134              :         JobID   int
     135              :         Path    string
     136              :         FileNum base.DiskFileNum
     137              :         Err     error
     138              : }
     139              : 
     140            1 : func (i BlobFileDeleteInfo) String() string {
     141            1 :         return redact.StringWithoutMarkers(i)
     142            1 : }
     143              : 
     144              : // SafeFormat implements redact.SafeFormatter.
     145            1 : func (i BlobFileDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     146            1 :         if i.Err != nil {
     147            0 :                 w.Printf("[JOB %d] blob file delete error %s: %s",
     148            0 :                         redact.Safe(i.JobID), i.FileNum, i.Err)
     149            0 :                 return
     150            0 :         }
     151            1 :         w.Printf("[JOB %d] blob file deleted %s", redact.Safe(i.JobID), i.FileNum)
     152              : }
     153              : 
     154              : // BlobFileRewriteInfo contains the info for a blob file rewrite event.
     155              : type BlobFileRewriteInfo struct {
     156              :         // JobID is the ID of the job.
     157              :         JobID int
     158              :         // Input contains the input tables for the compaction organized by level.
     159              :         Input BlobFileInfo
     160              :         // Output contains the output tables generated by the compaction. The output
     161              :         // info is empty for the compaction begin event.
     162              :         Output BlobFileInfo
     163              :         // Duration is the time spent compacting, including reading and writing
     164              :         // files.
     165              :         Duration time.Duration
     166              :         // TotalDuration is the total wall-time duration of the compaction,
     167              :         // including applying the compaction to the database. TotalDuration is
     168              :         // always ≥ Duration.
     169              :         TotalDuration time.Duration
     170              :         Done          bool
     171              :         // Err is set only if Done is true. If non-nil, indicates that the compaction
     172              :         // failed. Note that err can be ErrCancelledCompaction, which can happen
     173              :         // during normal operation.
     174              :         Err error
     175              : }
     176              : 
     177            1 : func (i BlobFileRewriteInfo) String() string {
     178            1 :         return redact.StringWithoutMarkers(i)
     179            1 : }
     180              : 
     181              : // SafeFormat implements redact.SafeFormatter.
     182            1 : func (i BlobFileRewriteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     183            1 :         if i.Err != nil {
     184            0 :                 w.Printf("[JOB %d] blob file (%s, %s) rewrite error: %s",
     185            0 :                         redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum, i.Err)
     186            0 :                 return
     187            0 :         }
     188              : 
     189            1 :         if !i.Done {
     190            1 :                 w.Printf("[JOB %d] rewriting blob file %s (physical file %s)",
     191            1 :                         redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum)
     192            1 :                 return
     193            1 :         }
     194            1 :         w.Printf("[JOB %d] rewrote blob file (%s, %s) (%s) -> (%s, %s) (%s), in %.1fs (%.1fs total)",
     195            1 :                 redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum,
     196            1 :                 redact.Safe(humanize.Bytes.Uint64(i.Input.Size)),
     197            1 :                 i.Output.BlobFileID, i.Output.DiskFileNum,
     198            1 :                 redact.Safe(humanize.Bytes.Uint64(i.Output.Size)),
     199            1 :                 redact.Safe(i.Duration.Seconds()),
     200            1 :                 redact.Safe(i.TotalDuration.Seconds()))
     201              : }
     202              : 
     203              : // BlobFileInfo describes a blob file.
     204              : type BlobFileInfo struct {
     205              :         // BlobFileID is the logical ID of the blob file.
     206              :         BlobFileID base.BlobFileID
     207              :         // DiskFileNum is the file number of the blob file on disk.
     208              :         DiskFileNum base.DiskFileNum
     209              :         // Size is the physical size of the file in bytes.
     210              :         Size uint64
     211              :         // ValueSize is the pre-compressed size of the values in the blob file in
     212              :         // bytes.
     213              :         ValueSize uint64
     214              :         // MVCCGarbageSize is the pre-compressed size of potential MVCC garbage in '
     215              :         // the blob file in bytes.
     216              :         MVCCGarbageSize uint64
     217              : }
     218              : 
     219            1 : func blobsTotalSize(blobs []BlobFileInfo) uint64 {
     220            1 :         var size uint64
     221            1 :         for i := range blobs {
     222            1 :                 size += blobs[i].Size
     223            1 :         }
     224            1 :         return size
     225              : }
     226              : 
     227            1 : func formatBlobFileNums(blobs []BlobFileInfo) string {
     228            1 :         var buf strings.Builder
     229            1 :         for i, blob := range blobs {
     230            1 :                 if i > 0 {
     231            1 :                         buf.WriteString(" ")
     232            1 :                 }
     233            1 :                 buf.WriteString(blob.DiskFileNum.String())
     234            1 :                 if blob.MVCCGarbageSize > 0 {
     235            1 :                         buf.WriteString(fmt.Sprintf(" (MVCCGarbage: %s)",
     236            1 :                                 redact.Safe(crhumanize.Percent(blob.MVCCGarbageSize, blob.ValueSize))))
     237            1 :                 }
     238              :         }
     239            1 :         return buf.String()
     240              : }
     241              : 
     242              : // CompactionInfo contains the info for a compaction event.
     243              : type CompactionInfo struct {
     244              :         // JobID is the ID of the compaction job.
     245              :         JobID int
     246              :         // Reason is the reason for the compaction.
     247              :         Reason string
     248              :         // Input contains the input tables for the compaction organized by level.
     249              :         Input []LevelInfo
     250              :         // Output contains the output tables generated by the compaction. The output
     251              :         // tables are empty for the compaction begin event.
     252              :         Output LevelInfo
     253              :         // Duration is the time spent compacting, including reading and writing
     254              :         // sstables.
     255              :         Duration time.Duration
     256              :         // TotalDuration is the total wall-time duration of the compaction,
     257              :         // including applying the compaction to the database. TotalDuration is
     258              :         // always ≥ Duration.
     259              :         TotalDuration time.Duration
     260              :         Done          bool
     261              :         // Err is set only if Done is true. If non-nil, indicates that the compaction
     262              :         // failed. Note that err can be ErrCancelledCompaction, which can happen
     263              :         // during normal operation.
     264              :         Err error
     265              : 
     266              :         SingleLevelOverlappingRatio float64
     267              :         MultiLevelOverlappingRatio  float64
     268              : 
     269              :         // Annotations specifies additional info to appear in a compaction's event log line
     270              :         Annotations compactionAnnotations
     271              : }
     272              : 
     273              : type compactionAnnotations []string
     274              : 
     275              : // SafeFormat implements redact.SafeFormatter.
     276            1 : func (ca compactionAnnotations) SafeFormat(w redact.SafePrinter, _ rune) {
     277            1 :         if len(ca) == 0 {
     278            0 :                 return
     279            0 :         }
     280            1 :         for i := range ca {
     281            1 :                 if i != 0 {
     282            1 :                         w.Print(" ")
     283            1 :                 }
     284            1 :                 w.Printf("%s", redact.SafeString(ca[i]))
     285              :         }
     286              : }
     287              : 
     288            1 : func (i CompactionInfo) String() string {
     289            1 :         return redact.StringWithoutMarkers(i)
     290            1 : }
     291              : 
     292              : // SafeFormat implements redact.SafeFormatter.
     293            1 : func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     294            1 :         if i.Err != nil {
     295            1 :                 w.Printf("[JOB %d] compaction(%s) to L%d error: %s",
     296            1 :                         redact.Safe(i.JobID), redact.SafeString(i.Reason), redact.Safe(i.Output.Level), i.Err)
     297            1 :                 return
     298            1 :         }
     299              : 
     300            1 :         if !i.Done {
     301            1 :                 w.Printf("[JOB %d] compacting(%s) ",
     302            1 :                         redact.Safe(i.JobID),
     303            1 :                         redact.SafeString(i.Reason))
     304            1 :                 if len(i.Annotations) > 0 {
     305            1 :                         w.Printf("%s ", i.Annotations)
     306            1 :                 }
     307            1 :                 w.Printf("%s; ", levelInfos(i.Input))
     308            1 :                 w.Printf("OverlappingRatio: Single %.2f, Multi %.2f", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
     309            1 :                 return
     310              :         }
     311            1 :         outputSize := tablesTotalSize(i.Output.Tables)
     312            1 :         w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
     313            1 :         if len(i.Annotations) > 0 {
     314            1 :                 w.Printf("%s ", i.Annotations)
     315            1 :         }
     316            1 :         w.Print(levelInfos(i.Input))
     317            1 :         w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
     318            1 :                 redact.Safe(i.Output.Level),
     319            1 :                 redact.Safe(formatFileNums(i.Output.Tables)),
     320            1 :                 redact.Safe(humanize.Bytes.Uint64(outputSize)),
     321            1 :                 redact.Safe(i.Duration.Seconds()),
     322            1 :                 redact.Safe(i.TotalDuration.Seconds()),
     323            1 :                 redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
     324              : }
     325              : 
     326              : type levelInfos []LevelInfo
     327              : 
     328            1 : func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
     329            1 :         for j, levelInfo := range i {
     330            1 :                 if j > 0 {
     331            1 :                         w.Printf(" + ")
     332            1 :                 }
     333            1 :                 w.Print(levelInfo)
     334              :         }
     335              : }
     336              : 
     337              : // DiskSlowInfo contains the info for a disk slowness event when writing to a
     338              : // file.
     339              : type DiskSlowInfo = vfs.DiskSlowInfo
     340              : 
     341              : // FlushInfo contains the info for a flush event.
     342              : type FlushInfo struct {
     343              :         // JobID is the ID of the flush job.
     344              :         JobID int
     345              :         // Reason is the reason for the flush.
     346              :         Reason string
     347              :         // Input contains the count of input memtables that were flushed.
     348              :         Input int
     349              :         // InputBytes contains the total in-memory size of the memtable(s) that were
     350              :         // flushed. This size includes skiplist indexing data structures.
     351              :         InputBytes uint64
     352              :         // OutputTables contains the output table generated by the flush. The output info
     353              :         // is empty for the flush begin event.
     354              :         OutputTables []TableInfo
     355              :         // OutputBlobs contains the output table generated by the flush. The output info
     356              :         // is empty for the flush begin event.
     357              :         OutputBlobs []BlobFileInfo
     358              :         // Duration is the time spent flushing. This duration includes writing and
     359              :         // syncing all of the flushed keys to sstables.
     360              :         Duration time.Duration
     361              :         // TotalDuration is the total wall-time duration of the flush, including
     362              :         // applying the flush to the database. TotalDuration is always ≥ Duration.
     363              :         TotalDuration time.Duration
     364              :         // Ingest is set to true if the flush is handling tables that were added to
     365              :         // the flushable queue via an ingestion operation.
     366              :         Ingest bool
     367              :         // IngestLevels are the output levels for each ingested table in the flush.
     368              :         // This field is only populated when Ingest is true.
     369              :         IngestLevels []int
     370              :         Done         bool
     371              :         Err          error
     372              : }
     373              : 
     374            1 : func (i FlushInfo) String() string {
     375            1 :         return redact.StringWithoutMarkers(i)
     376            1 : }
     377              : 
     378              : // SafeFormat implements redact.SafeFormatter.
     379            1 : func (i FlushInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     380            1 :         if i.Err != nil {
     381            1 :                 w.Printf("[JOB %d] flush error: %s", redact.Safe(i.JobID), i.Err)
     382            1 :                 return
     383            1 :         }
     384              : 
     385            1 :         plural := redact.SafeString("s")
     386            1 :         if i.Input == 1 {
     387            1 :                 plural = ""
     388            1 :         }
     389            1 :         blobInfo := redact.SafeString("")
     390            1 :         if len(i.OutputBlobs) > 0 {
     391            1 :                 pluralBlob := redact.SafeString("s")
     392            1 :                 if len(i.OutputBlobs) == 1 {
     393            1 :                         pluralBlob = ""
     394            1 :                 }
     395            1 :                 blobInfo = redact.SafeString(fmt.Sprintf(" blob%s [%s] (%s)",
     396            1 :                         pluralBlob, formatBlobFileNums(i.OutputBlobs), humanize.Bytes.Uint64(blobsTotalSize(i.OutputBlobs))))
     397              :         }
     398            1 :         if !i.Done {
     399            1 :                 w.Printf("[JOB %d] ", redact.Safe(i.JobID))
     400            1 :                 if !i.Ingest {
     401            1 :                         w.Printf("flushing %d memtable", redact.Safe(i.Input))
     402            1 :                         w.SafeString(plural)
     403            1 :                         w.Printf(" (%s) to L0", redact.Safe(humanize.Bytes.Uint64(i.InputBytes)))
     404            1 :                 } else {
     405            1 :                         w.Printf("flushing %d ingested table%s", redact.Safe(i.Input), plural)
     406            1 :                 }
     407            1 :                 return
     408              :         }
     409              : 
     410            1 :         outputSize := tablesTotalSize(i.OutputTables)
     411            1 :         if !i.Ingest {
     412            1 :                 if invariants.Enabled && len(i.IngestLevels) > 0 {
     413            0 :                         panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) == 0"))
     414              :                 }
     415            1 :                 w.Printf("[JOB %d] flushed %d memtable%s (%s) to L0 [%s] (%s)%s, in %.1fs (%.1fs total), output rate %s/s",
     416            1 :                         redact.Safe(i.JobID), redact.Safe(i.Input), plural,
     417            1 :                         redact.Safe(humanize.Bytes.Uint64(i.InputBytes)),
     418            1 :                         redact.Safe(formatFileNums(i.OutputTables)),
     419            1 :                         redact.Safe(humanize.Bytes.Uint64(outputSize)),
     420            1 :                         blobInfo,
     421            1 :                         redact.Safe(i.Duration.Seconds()),
     422            1 :                         redact.Safe(i.TotalDuration.Seconds()),
     423            1 :                         redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
     424            1 :         } else {
     425            1 :                 if invariants.Enabled && len(i.IngestLevels) == 0 {
     426            0 :                         panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) > 0"))
     427              :                 }
     428            1 :                 w.Printf("[JOB %d] flushed %d ingested flushable%s",
     429            1 :                         redact.Safe(i.JobID), redact.Safe(len(i.OutputTables)), plural)
     430            1 :                 for j, level := range i.IngestLevels {
     431            1 :                         file := i.OutputTables[j]
     432            1 :                         if j > 0 {
     433            1 :                                 w.Printf(" +")
     434            1 :                         }
     435            1 :                         w.Printf(" L%d:%s (%s)", level, file.FileNum, humanize.Bytes.Uint64(file.Size))
     436              :                 }
     437            1 :                 w.Printf(" in %.1fs (%.1fs total), output rate %s/s",
     438            1 :                         redact.Safe(i.Duration.Seconds()),
     439            1 :                         redact.Safe(i.TotalDuration.Seconds()),
     440            1 :                         redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
     441              :         }
     442              : }
     443              : 
     444              : // DownloadInfo contains the info for a DB.Download() event.
     445              : type DownloadInfo struct {
     446              :         // JobID is the ID of the download job.
     447              :         JobID int
     448              : 
     449              :         Spans []DownloadSpan
     450              : 
     451              :         // Duration is the time since the operation was started.
     452              :         Duration                    time.Duration
     453              :         DownloadCompactionsLaunched int
     454              : 
     455              :         // RestartCount indicates that the download operation restarted because it
     456              :         // noticed that new external files were ingested. A DownloadBegin event with
     457              :         // RestartCount = 0 is the start of the operation; each time we restart it we
     458              :         // have another DownloadBegin event with RestartCount > 0.
     459              :         RestartCount int
     460              :         Done         bool
     461              :         Err          error
     462              : }
     463              : 
     464            1 : func (i DownloadInfo) String() string {
     465            1 :         return redact.StringWithoutMarkers(i)
     466            1 : }
     467              : 
     468              : // SafeFormat implements redact.SafeFormatter.
     469            1 : func (i DownloadInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     470            1 :         switch {
     471            0 :         case i.Err != nil:
     472            0 :                 w.Printf("[JOB %d] download error after %1.fs: %s", redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), i.Err)
     473              : 
     474            1 :         case i.Done:
     475            1 :                 w.Printf("[JOB %d] download finished in %.1fs (launched %d compactions)",
     476            1 :                         redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), redact.Safe(i.DownloadCompactionsLaunched))
     477              : 
     478            1 :         default:
     479            1 :                 if i.RestartCount == 0 {
     480            1 :                         w.Printf("[JOB %d] starting download for %d spans", redact.Safe(i.JobID), redact.Safe(len(i.Spans)))
     481            1 :                 } else {
     482            0 :                         w.Printf("[JOB %d] restarting download (restart #%d, time so far %.1fs, launched %d compactions)",
     483            0 :                                 redact.Safe(i.JobID), redact.Safe(i.RestartCount), redact.Safe(i.Duration.Seconds()),
     484            0 :                                 redact.Safe(i.DownloadCompactionsLaunched))
     485            0 :                 }
     486              :         }
     487              : }
     488              : 
     489              : // ManifestCreateInfo contains info about a manifest creation event.
     490              : type ManifestCreateInfo struct {
     491              :         // JobID is the ID of the job the caused the manifest to be created.
     492              :         JobID int
     493              :         Path  string
     494              :         // The file number of the new Manifest.
     495              :         FileNum base.DiskFileNum
     496              :         Err     error
     497              : }
     498              : 
     499            1 : func (i ManifestCreateInfo) String() string {
     500            1 :         return redact.StringWithoutMarkers(i)
     501            1 : }
     502              : 
     503              : // SafeFormat implements redact.SafeFormatter.
     504            1 : func (i ManifestCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     505            1 :         if i.Err != nil {
     506            0 :                 w.Printf("[JOB %d] MANIFEST create error: %s", redact.Safe(i.JobID), i.Err)
     507            0 :                 return
     508            0 :         }
     509            1 :         w.Printf("[JOB %d] MANIFEST created %s", redact.Safe(i.JobID), i.FileNum)
     510              : }
     511              : 
     512              : // ManifestDeleteInfo contains the info for a Manifest deletion event.
     513              : type ManifestDeleteInfo struct {
     514              :         // JobID is the ID of the job the caused the Manifest to be deleted.
     515              :         JobID   int
     516              :         Path    string
     517              :         FileNum base.DiskFileNum
     518              :         Err     error
     519              : }
     520              : 
     521            1 : func (i ManifestDeleteInfo) String() string {
     522            1 :         return redact.StringWithoutMarkers(i)
     523            1 : }
     524              : 
     525              : // SafeFormat implements redact.SafeFormatter.
     526            1 : func (i ManifestDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     527            1 :         if i.Err != nil {
     528            0 :                 w.Printf("[JOB %d] MANIFEST delete error: %s", redact.Safe(i.JobID), i.Err)
     529            0 :                 return
     530            0 :         }
     531            1 :         w.Printf("[JOB %d] MANIFEST deleted %s", redact.Safe(i.JobID), i.FileNum)
     532              : }
     533              : 
     534              : // TableCreateInfo contains the info for a table creation event.
     535              : type TableCreateInfo struct {
     536              :         JobID int
     537              :         // Reason is the reason for the table creation: "compacting", "flushing", or
     538              :         // "ingesting".
     539              :         Reason  string
     540              :         Path    string
     541              :         FileNum base.DiskFileNum
     542              : }
     543              : 
     544            1 : func (i TableCreateInfo) String() string {
     545            1 :         return redact.StringWithoutMarkers(i)
     546            1 : }
     547              : 
     548              : // SafeFormat implements redact.SafeFormatter.
     549            1 : func (i TableCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     550            1 :         w.Printf("[JOB %d] %s: sstable created %s",
     551            1 :                 redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
     552            1 : }
     553              : 
     554              : // TableDeleteInfo contains the info for a table deletion event.
     555              : type TableDeleteInfo struct {
     556              :         JobID   int
     557              :         Path    string
     558              :         FileNum base.DiskFileNum
     559              :         Err     error
     560              : }
     561              : 
     562            1 : func (i TableDeleteInfo) String() string {
     563            1 :         return redact.StringWithoutMarkers(i)
     564            1 : }
     565              : 
     566              : // SafeFormat implements redact.SafeFormatter.
     567            1 : func (i TableDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     568            1 :         if i.Err != nil {
     569            0 :                 w.Printf("[JOB %d] sstable delete error %s: %s",
     570            0 :                         redact.Safe(i.JobID), i.FileNum, i.Err)
     571            0 :                 return
     572            0 :         }
     573            1 :         w.Printf("[JOB %d] sstable deleted %s", redact.Safe(i.JobID), i.FileNum)
     574              : }
     575              : 
     576              : // TableIngestInfo contains the info for a table ingestion event.
     577              : type TableIngestInfo struct {
     578              :         // JobID is the ID of the job the caused the table to be ingested.
     579              :         JobID  int
     580              :         Tables []struct {
     581              :                 TableInfo
     582              :                 Level int
     583              :         }
     584              :         // GlobalSeqNum is the sequence number that was assigned to all entries in
     585              :         // the ingested table.
     586              :         GlobalSeqNum base.SeqNum
     587              :         // flushable indicates whether the ingested sstable was treated as a
     588              :         // flushable.
     589              :         flushable bool
     590              :         Err       error
     591              :         // WaitFlushDuration is the time spent waiting for memtable flushes to
     592              :         // complete, given that an overlap between ingesting sstables and memtables
     593              :         // exists.
     594              :         WaitFlushDuration time.Duration
     595              :         // ManifestUpdateDuration is the time spent updating the manifest.
     596              :         ManifestUpdateDuration time.Duration
     597              :         // BlockReadDuration is the total time spent reading blocks for the ingested
     598              :         // sstable.
     599              :         BlockReadDuration time.Duration
     600              :         // BlockReadBytes is the total number of bytes from blocks read for the
     601              :         // ingested sstable. This does not include bytes read from the block cache.
     602              :         BlockReadBytes uint64
     603              : }
     604              : 
     605            1 : func (i TableIngestInfo) String() string {
     606            1 :         return redact.StringWithoutMarkers(i)
     607            1 : }
     608              : 
     609              : // SafeFormat implements redact.SafeFormatter.
     610            1 : func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     611            1 :         if i.Err != nil {
     612            0 :                 w.Printf("[JOB %d] ingest error: %s", redact.Safe(i.JobID), i.Err)
     613            0 :                 return
     614            0 :         }
     615              : 
     616            1 :         if i.flushable {
     617            1 :                 w.Printf("[JOB %d] ingested as flushable, memtable flushes took %.1fs:", redact.Safe(i.JobID),
     618            1 :                         redact.Safe(i.WaitFlushDuration.Seconds()))
     619            1 :         } else {
     620            1 :                 w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
     621            1 :         }
     622              : 
     623            1 :         for j := range i.Tables {
     624            1 :                 t := &i.Tables[j]
     625            1 :                 if j > 0 {
     626            1 :                         w.Printf(",")
     627            1 :                 }
     628            1 :                 levelStr := ""
     629            1 :                 if !i.flushable {
     630            1 :                         levelStr = fmt.Sprintf("L%d:", t.Level)
     631            1 :                 }
     632            1 :                 w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
     633            1 :                         redact.Safe(humanize.Bytes.Uint64(t.Size)))
     634              :         }
     635            1 :         w.Printf("; manifest update took %.1fs; block reads took %.1fs with %s block bytes read",
     636            1 :                 redact.Safe(i.ManifestUpdateDuration.Seconds()), redact.Safe(i.BlockReadDuration.Seconds()),
     637            1 :                 redact.Safe(humanize.Bytes.Uint64(i.BlockReadBytes)))
     638              : }
     639              : 
     640              : // TableStatsInfo contains the info for a table stats loaded event.
     641              : type TableStatsInfo struct {
     642              :         // JobID is the ID of the job that finished loading the initial tables'
     643              :         // stats.
     644              :         JobID int
     645              : }
     646              : 
     647            1 : func (i TableStatsInfo) String() string {
     648            1 :         return redact.StringWithoutMarkers(i)
     649            1 : }
     650              : 
     651              : // SafeFormat implements redact.SafeFormatter.
     652            1 : func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     653            1 :         w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
     654            1 : }
     655              : 
     656              : // TableValidatedInfo contains information on the result of a validation run
     657              : // on an sstable.
     658              : type TableValidatedInfo struct {
     659              :         JobID int
     660              :         Meta  *manifest.TableMetadata
     661              : }
     662              : 
     663            1 : func (i TableValidatedInfo) String() string {
     664            1 :         return redact.StringWithoutMarkers(i)
     665            1 : }
     666              : 
     667              : // SafeFormat implements redact.SafeFormatter.
     668            1 : func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     669            1 :         w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
     670            1 : }
     671              : 
     672              : // WALCreateInfo contains info about a WAL creation event.
     673              : type WALCreateInfo struct {
     674              :         // JobID is the ID of the job the caused the WAL to be created.
     675              :         JobID int
     676              :         Path  string
     677              :         // The file number of the new WAL.
     678              :         FileNum base.DiskFileNum
     679              :         // The file number of a previous WAL which was recycled to create this
     680              :         // one. Zero if recycling did not take place.
     681              :         RecycledFileNum base.DiskFileNum
     682              :         Err             error
     683              : }
     684              : 
     685            1 : func (i WALCreateInfo) String() string {
     686            1 :         return redact.StringWithoutMarkers(i)
     687            1 : }
     688              : 
     689              : // SafeFormat implements redact.SafeFormatter.
     690            1 : func (i WALCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     691            1 :         if i.Err != nil {
     692            0 :                 w.Printf("[JOB %d] WAL create error: %s", redact.Safe(i.JobID), i.Err)
     693            0 :                 return
     694            0 :         }
     695              : 
     696            1 :         if i.RecycledFileNum == 0 {
     697            1 :                 w.Printf("[JOB %d] WAL created %s", redact.Safe(i.JobID), i.FileNum)
     698            1 :                 return
     699            1 :         }
     700              : 
     701            1 :         w.Printf("[JOB %d] WAL created %s (recycled %s)",
     702            1 :                 redact.Safe(i.JobID), i.FileNum, i.RecycledFileNum)
     703              : }
     704              : 
     705              : // WALDeleteInfo contains the info for a WAL deletion event.
     706              : //
     707              : // TODO(sumeer): extend WALDeleteInfo for the failover case in case the path
     708              : // is insufficient to infer whether primary or secondary.
     709              : type WALDeleteInfo struct {
     710              :         // JobID is the ID of the job the caused the WAL to be deleted.
     711              :         JobID   int
     712              :         Path    string
     713              :         FileNum base.DiskFileNum
     714              :         Err     error
     715              : }
     716              : 
     717            1 : func (i WALDeleteInfo) String() string {
     718            1 :         return redact.StringWithoutMarkers(i)
     719            1 : }
     720              : 
     721              : // SafeFormat implements redact.SafeFormatter.
     722            1 : func (i WALDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     723            1 :         if i.Err != nil {
     724            1 :                 w.Printf("[JOB %d] WAL delete error: %s", redact.Safe(i.JobID), i.Err)
     725            1 :                 return
     726            1 :         }
     727            1 :         w.Printf("[JOB %d] WAL deleted %s", redact.Safe(i.JobID), i.FileNum)
     728              : }
     729              : 
     730              : // WriteStallBeginInfo contains the info for a write stall begin event.
     731              : type WriteStallBeginInfo struct {
     732              :         Reason string
     733              : }
     734              : 
     735            1 : func (i WriteStallBeginInfo) String() string {
     736            1 :         return redact.StringWithoutMarkers(i)
     737            1 : }
     738              : 
     739              : // SafeFormat implements redact.SafeFormatter.
     740            1 : func (i WriteStallBeginInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     741            1 :         w.Printf("write stall beginning: %s", redact.Safe(i.Reason))
     742            1 : }
     743              : 
     744              : // LowDiskSpaceInfo contains the information for a LowDiskSpace
     745              : // event.
     746              : type LowDiskSpaceInfo struct {
     747              :         // AvailBytes is the disk space available to the current process in bytes.
     748              :         AvailBytes uint64
     749              :         // TotalBytes is the total disk space in bytes.
     750              :         TotalBytes uint64
     751              :         // PercentThreshold is one of a set of fixed percentages in the
     752              :         // lowDiskSpaceThresholds below. This event was issued because the disk
     753              :         // space went below this threshold.
     754              :         PercentThreshold int
     755              : }
     756              : 
     757            0 : func (i LowDiskSpaceInfo) String() string {
     758            0 :         return redact.StringWithoutMarkers(i)
     759            0 : }
     760              : 
     761              : // SafeFormat implements redact.SafeFormatter.
     762            0 : func (i LowDiskSpaceInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     763            0 :         w.Printf(
     764            0 :                 "available disk space under %d%% (%s of %s)",
     765            0 :                 redact.Safe(i.PercentThreshold),
     766            0 :                 redact.Safe(humanize.Bytes.Uint64(i.AvailBytes)),
     767            0 :                 redact.Safe(humanize.Bytes.Uint64(i.TotalBytes)),
     768            0 :         )
     769            0 : }
     770              : 
     771              : // PossibleAPIMisuseInfo contains the information for a PossibleAPIMisuse event.
     772              : type PossibleAPIMisuseInfo struct {
     773              :         Kind APIMisuseKind
     774              : 
     775              :         // UserKey is set for the following kinds:
     776              :         //  - IneffectualSingleDelete,
     777              :         //  - NondeterministicSingleDelete,
     778              :         //  - MissizedDelete,
     779              :         //  - InvalidValue.
     780              :         UserKey []byte
     781              : 
     782              :         // ExtraInfo is set for the following kinds:
     783              :         //  - MissizedDelete: contains "elidedSize=<size>,expectedSize=<size>"
     784              :         //  - InvalidValue: contains "callback=<callbackName>,value=<value>,err=<err>"
     785              :         ExtraInfo redact.RedactableString
     786              : }
     787              : 
     788            1 : func (i PossibleAPIMisuseInfo) String() string {
     789            1 :         return redact.StringWithoutMarkers(i)
     790            1 : }
     791              : 
     792              : // SafeFormat implements redact.SafeFormatter.
     793            1 : func (i PossibleAPIMisuseInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     794            1 :         switch i.Kind {
     795            1 :         case IneffectualSingleDelete, NondeterministicSingleDelete:
     796            1 :                 w.Printf("possible API misuse: %s (key=%q)", redact.Safe(i.Kind), i.UserKey)
     797            1 :         case MissizedDelete:
     798            1 :                 w.Printf("possible API misuse: %s (key=%q, %s)", redact.Safe(i.Kind), i.UserKey, i.ExtraInfo)
     799            0 :         case InvalidValue:
     800            0 :                 w.Printf("possible API misuse: %s (key=%q, %s)", redact.Safe(i.Kind), i.UserKey, i.ExtraInfo)
     801            0 :         default:
     802            0 :                 if invariants.Enabled {
     803            0 :                         panic("invalid API misuse event")
     804              :                 }
     805            0 :                 w.Printf("invalid API misuse event")
     806              :         }
     807              : }
     808              : 
     809              : // APIMisuseKind identifies the type of API misuse represented by a
     810              : // PossibleAPIMisuse event.
     811              : type APIMisuseKind int8
     812              : 
     813              : const (
     814              :         // IneffectualSingleDelete is emitted in compactions/flushes if any
     815              :         // single delete is being elided without deleting a point set/merge.
     816              :         //
     817              :         // This event can sometimes be a false positive because of delete-only
     818              :         // compactions which can cause a recent RANGEDEL to peek below an older
     819              :         // SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
     820              :         //
     821              :         // Example:
     822              :         //   RANGEDEL [a, c)#10 in L0
     823              :         //   SINGLEDEL b#5 in L1
     824              :         //   SET b#3 in L6
     825              :         //
     826              :         // If the L6 file containing the SET is narrow and the L1 file containing
     827              :         // the SINGLEDEL is wide, a delete-only compaction can remove the file in
     828              :         // L2 before the SINGLEDEL is compacted down. Then when the SINGLEDEL is
     829              :         // compacted down, it will not find any SET to delete, resulting in the
     830              :         // ineffectual callback.
     831              :         IneffectualSingleDelete APIMisuseKind = iota
     832              : 
     833              :         // NondeterministicSingleDelete is emitted in compactions/flushes if any
     834              :         // single delete has consumed a Set/Merge, and there is another immediately
     835              :         // older Set/SetWithDelete/Merge. The user of Pebble has violated the
     836              :         // invariant under which SingleDelete can be used correctly.
     837              :         //
     838              :         // Consider the sequence SingleDelete#3, Set#2, Set#1. There are three
     839              :         // ways some of these keys can first meet in a compaction.
     840              :         //
     841              :         // - All 3 keys in the same compaction: this callback will detect the
     842              :         //   violation.
     843              :         //
     844              :         // - SingleDelete#3, Set#2 meet in a compaction first: Both keys will
     845              :         //   disappear. The violation will not be detected, and the DB will have
     846              :         //   Set#1 which is likely incorrect (from the user's perspective).
     847              :         //
     848              :         // - Set#2, Set#1 meet in a compaction first: The output will be Set#2,
     849              :         //   which will later be consumed by SingleDelete#3. The violation will
     850              :         //   not be detected and the DB will be correct.
     851              :         //
     852              :         // This event can sometimes be a false positive because of delete-only
     853              :         // compactions which can cause a recent RANGEDEL to peek below an older
     854              :         // SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
     855              :         //
     856              :         // Example:
     857              :         //   RANGEDEL [a, z)#60 in L0
     858              :         //   SINGLEDEL g#50 in L1
     859              :         //   SET g#40 in L2
     860              :         //   RANGEDEL [g,h)#30 in L3
     861              :         //   SET g#20 in L6
     862              :         //
     863              :         // In this example, the two SETs represent the same user write, and the
     864              :         // RANGEDELs are caused by the CockroachDB range being dropped. That is,
     865              :         // the user wrote to g once, range was dropped, then added back, which
     866              :         // caused the SET again, then at some point g was validly deleted using a
     867              :         // SINGLEDEL, and then the range was dropped again. The older RANGEDEL can
     868              :         // get fragmented due to compactions it has been part of. Say this L3 file
     869              :         // containing the RANGEDEL is very narrow, while the L1, L2, L6 files are
     870              :         // wider than the RANGEDEL in L0. Then the RANGEDEL in L3 can be dropped
     871              :         // using a delete-only compaction, resulting in an LSM with state:
     872              :         //
     873              :         //   RANGEDEL [a, z)#60 in L0
     874              :         //   SINGLEDEL g#50 in L1
     875              :         //   SET g#40 in L2
     876              :         //   SET g#20 in L6
     877              :         //
     878              :         // A multi-level compaction involving L1, L2, L6 will cause the invariant
     879              :         // violation callback. This example doesn't need multi-level compactions:
     880              :         // say there was a Pebble snapshot at g#21 preventing g#20 from being
     881              :         // dropped when it meets g#40 in a compaction. That snapshot will not save
     882              :         // RANGEDEL [g,h)#30, so we can have:
     883              :         //
     884              :         //   SINGLEDEL g#50 in L1
     885              :         //   SET g#40, SET g#20 in L6
     886              :         //
     887              :         // And say the snapshot is removed and then the L1 and L6 compaction
     888              :         // happens, resulting in the invariant violation callback.
     889              :         NondeterministicSingleDelete
     890              : 
     891              :         // MissizedDelete is emitted when a DELSIZED tombstone is found that did
     892              :         // not accurately record the size of the value it deleted. This can lead to
     893              :         // incorrect behavior in compactions.
     894              :         MissizedDelete
     895              : 
     896              :         // InvalidValue is emitted when a user-implemented callback (such as
     897              :         // ShortAttributeExtractor) returns an error for a committed value. This
     898              :         // suggests that either the callback is not implemented for all possible
     899              :         // values or a malformed value was committed to the DB.
     900              :         InvalidValue
     901              : )
     902              : 
     903            1 : func (k APIMisuseKind) String() string {
     904            1 :         switch k {
     905            1 :         case IneffectualSingleDelete:
     906            1 :                 return "ineffectual SINGLEDEL"
     907            0 :         case NondeterministicSingleDelete:
     908            0 :                 return "nondeterministic SINGLEDEL"
     909            1 :         case MissizedDelete:
     910            1 :                 return "missized DELSIZED"
     911            0 :         case InvalidValue:
     912            0 :                 return "invalid value"
     913            0 :         default:
     914            0 :                 return "unknown"
     915              :         }
     916              : }
     917              : 
     918              : // EventListener contains a set of functions that will be invoked when various
     919              : // significant DB events occur. Note that the functions should not run for an
     920              : // excessive amount of time as they are invoked synchronously by the DB and may
     921              : // block continued DB work. For a similar reason it is advisable to not perform
     922              : // any synchronous calls back into the DB.
     923              : type EventListener struct {
     924              :         // BackgroundError is invoked whenever an error occurs during a background
     925              :         // operation such as flush or compaction.
     926              :         BackgroundError func(error)
     927              : 
     928              :         // BlobFileCreated is invoked after a blob file has been created.
     929              :         BlobFileCreated func(BlobFileCreateInfo)
     930              : 
     931              :         // BlobFileDeleted is invoked after a blob file has been deleted.
     932              :         BlobFileDeleted func(BlobFileDeleteInfo)
     933              : 
     934              :         // BlobFileRewriteBegin is invoked when a blob file rewrite compaction begins.
     935              :         BlobFileRewriteBegin func(BlobFileRewriteInfo)
     936              : 
     937              :         // BlobFileRewriteEnd is invoked when a blob file rewrite compaction ends.
     938              :         BlobFileRewriteEnd func(BlobFileRewriteInfo)
     939              : 
     940              :         // DataCorruption is invoked when an on-disk corruption is detected. It should
     941              :         // not block, as it is called synchronously in read paths.
     942              :         DataCorruption func(DataCorruptionInfo)
     943              : 
     944              :         // CompactionBegin is invoked after the inputs to a compaction have been
     945              :         // determined, but before the compaction has produced any output.
     946              :         CompactionBegin func(CompactionInfo)
     947              : 
     948              :         // CompactionEnd is invoked after a compaction has completed and the result
     949              :         // has been installed.
     950              :         CompactionEnd func(CompactionInfo)
     951              : 
     952              :         // DiskSlow is invoked after a disk write operation on a file created with a
     953              :         // disk health checking vfs.FS (see vfs.DefaultWithDiskHealthChecks) is
     954              :         // observed to exceed the specified disk slowness threshold duration. DiskSlow
     955              :         // is called on a goroutine that is monitoring slowness/stuckness. The callee
     956              :         // MUST return without doing any IO, or blocking on anything (like a mutex)
     957              :         // that is waiting on IO. This is imperative in order to reliably monitor for
     958              :         // slowness, since if this goroutine gets stuck, the monitoring will stop
     959              :         // working.
     960              :         DiskSlow func(DiskSlowInfo)
     961              : 
     962              :         // FlushBegin is invoked after the inputs to a flush have been determined,
     963              :         // but before the flush has produced any output.
     964              :         FlushBegin func(FlushInfo)
     965              : 
     966              :         // FlushEnd is invoked after a flush has complated and the result has been
     967              :         // installed.
     968              :         FlushEnd func(FlushInfo)
     969              : 
     970              :         // DownloadBegin is invoked when a db.Download operation starts or restarts
     971              :         // (restarts are caused by new external tables being ingested during the
     972              :         // operation).
     973              :         DownloadBegin func(DownloadInfo)
     974              : 
     975              :         // DownloadEnd is invoked when a db.Download operation completes.
     976              :         DownloadEnd func(DownloadInfo)
     977              : 
     978              :         // FormatUpgrade is invoked after the database's FormatMajorVersion
     979              :         // is upgraded.
     980              :         FormatUpgrade func(FormatMajorVersion)
     981              : 
     982              :         // ManifestCreated is invoked after a manifest has been created.
     983              :         ManifestCreated func(ManifestCreateInfo)
     984              : 
     985              :         // ManifestDeleted is invoked after a manifest has been deleted.
     986              :         ManifestDeleted func(ManifestDeleteInfo)
     987              : 
     988              :         // TableCreated is invoked when a table has been created.
     989              :         TableCreated func(TableCreateInfo)
     990              : 
     991              :         // TableDeleted is invoked after a table has been deleted.
     992              :         TableDeleted func(TableDeleteInfo)
     993              : 
     994              :         // TableIngested is invoked after an externally created table has been
     995              :         // ingested via a call to DB.Ingest().
     996              :         TableIngested func(TableIngestInfo)
     997              : 
     998              :         // TableStatsLoaded is invoked at most once, when the table stats
     999              :         // collector has loaded statistics for all tables that existed at Open.
    1000              :         TableStatsLoaded func(TableStatsInfo)
    1001              : 
    1002              :         // TableValidated is invoked after validation runs on an sstable.
    1003              :         TableValidated func(TableValidatedInfo)
    1004              : 
    1005              :         // WALCreated is invoked after a WAL has been created.
    1006              :         WALCreated func(WALCreateInfo)
    1007              : 
    1008              :         // WALDeleted is invoked after a WAL has been deleted.
    1009              :         WALDeleted func(WALDeleteInfo)
    1010              : 
    1011              :         // WriteStallBegin is invoked when writes are intentionally delayed.
    1012              :         WriteStallBegin func(WriteStallBeginInfo)
    1013              : 
    1014              :         // WriteStallEnd is invoked when delayed writes are released.
    1015              :         WriteStallEnd func()
    1016              : 
    1017              :         // LowDiskSpace is invoked periodically when the disk space is running
    1018              :         // low.
    1019              :         LowDiskSpace func(LowDiskSpaceInfo)
    1020              : 
    1021              :         // PossibleAPIMisuse is invoked when a possible API misuse is detected.
    1022              :         PossibleAPIMisuse func(PossibleAPIMisuseInfo)
    1023              : }
    1024              : 
    1025              : // EnsureDefaults ensures that background error events are logged to the
    1026              : // specified logger if a handler for those events hasn't been otherwise
    1027              : // specified. Ensure all handlers are non-nil so that we don't have to check
    1028              : // for nil-ness before invoking.
    1029            1 : func (l *EventListener) EnsureDefaults(logger Logger) {
    1030            1 :         if l.BackgroundError == nil {
    1031            1 :                 if logger != nil {
    1032            1 :                         l.BackgroundError = func(err error) {
    1033            1 :                                 logger.Errorf("background error: %s", err)
    1034            1 :                         }
    1035            1 :                 } else {
    1036            1 :                         l.BackgroundError = func(error) {}
    1037              :                 }
    1038              :         }
    1039            1 :         if l.BlobFileCreated == nil {
    1040            1 :                 l.BlobFileCreated = func(info BlobFileCreateInfo) {}
    1041              :         }
    1042            1 :         if l.BlobFileDeleted == nil {
    1043            1 :                 l.BlobFileDeleted = func(info BlobFileDeleteInfo) {}
    1044              :         }
    1045            1 :         if l.BlobFileRewriteBegin == nil {
    1046            1 :                 l.BlobFileRewriteBegin = func(info BlobFileRewriteInfo) {}
    1047              :         }
    1048            1 :         if l.BlobFileRewriteEnd == nil {
    1049            1 :                 l.BlobFileRewriteEnd = func(info BlobFileRewriteInfo) {}
    1050              :         }
    1051            1 :         if l.DataCorruption == nil {
    1052            1 :                 if logger != nil {
    1053            1 :                         l.DataCorruption = func(info DataCorruptionInfo) {
    1054            1 :                                 logger.Fatalf("%s", info)
    1055            1 :                         }
    1056            1 :                 } else {
    1057            1 :                         l.DataCorruption = func(info DataCorruptionInfo) {}
    1058              :                 }
    1059              :         }
    1060            1 :         if l.CompactionBegin == nil {
    1061            1 :                 l.CompactionBegin = func(info CompactionInfo) {}
    1062              :         }
    1063            1 :         if l.CompactionEnd == nil {
    1064            1 :                 l.CompactionEnd = func(info CompactionInfo) {}
    1065              :         }
    1066            1 :         if l.DiskSlow == nil {
    1067            1 :                 l.DiskSlow = func(info DiskSlowInfo) {}
    1068              :         }
    1069            1 :         if l.FlushBegin == nil {
    1070            1 :                 l.FlushBegin = func(info FlushInfo) {}
    1071              :         }
    1072            1 :         if l.FlushEnd == nil {
    1073            1 :                 l.FlushEnd = func(info FlushInfo) {}
    1074              :         }
    1075            1 :         if l.DownloadBegin == nil {
    1076            1 :                 l.DownloadBegin = func(info DownloadInfo) {}
    1077              :         }
    1078            1 :         if l.DownloadEnd == nil {
    1079            1 :                 l.DownloadEnd = func(info DownloadInfo) {}
    1080              :         }
    1081            1 :         if l.FormatUpgrade == nil {
    1082            1 :                 l.FormatUpgrade = func(v FormatMajorVersion) {}
    1083              :         }
    1084            1 :         if l.ManifestCreated == nil {
    1085            1 :                 l.ManifestCreated = func(info ManifestCreateInfo) {}
    1086              :         }
    1087            1 :         if l.ManifestDeleted == nil {
    1088            1 :                 l.ManifestDeleted = func(info ManifestDeleteInfo) {}
    1089              :         }
    1090            1 :         if l.TableCreated == nil {
    1091            1 :                 l.TableCreated = func(info TableCreateInfo) {}
    1092              :         }
    1093            1 :         if l.TableDeleted == nil {
    1094            1 :                 l.TableDeleted = func(info TableDeleteInfo) {}
    1095              :         }
    1096            1 :         if l.TableIngested == nil {
    1097            1 :                 l.TableIngested = func(info TableIngestInfo) {}
    1098              :         }
    1099            1 :         if l.TableStatsLoaded == nil {
    1100            1 :                 l.TableStatsLoaded = func(info TableStatsInfo) {}
    1101              :         }
    1102            1 :         if l.TableValidated == nil {
    1103            1 :                 l.TableValidated = func(validated TableValidatedInfo) {}
    1104              :         }
    1105            1 :         if l.WALCreated == nil {
    1106            1 :                 l.WALCreated = func(info WALCreateInfo) {}
    1107              :         }
    1108            1 :         if l.WALDeleted == nil {
    1109            1 :                 l.WALDeleted = func(info WALDeleteInfo) {}
    1110              :         }
    1111            1 :         if l.WriteStallBegin == nil {
    1112            1 :                 l.WriteStallBegin = func(info WriteStallBeginInfo) {}
    1113              :         }
    1114            1 :         if l.WriteStallEnd == nil {
    1115            1 :                 l.WriteStallEnd = func() {}
    1116              :         }
    1117            1 :         if l.LowDiskSpace == nil {
    1118            1 :                 l.LowDiskSpace = func(info LowDiskSpaceInfo) {}
    1119              :         }
    1120            1 :         if l.PossibleAPIMisuse == nil {
    1121            1 :                 l.PossibleAPIMisuse = func(info PossibleAPIMisuseInfo) {}
    1122              :         }
    1123              : }
    1124              : 
    1125              : // MakeLoggingEventListener creates an EventListener that logs all events to the
    1126              : // specified logger.
    1127            1 : func MakeLoggingEventListener(logger Logger) EventListener {
    1128            1 :         if logger == nil {
    1129            1 :                 logger = DefaultLogger
    1130            1 :         }
    1131              : 
    1132            1 :         return EventListener{
    1133            1 :                 BackgroundError: func(err error) {
    1134            1 :                         logger.Errorf("background error: %s", err)
    1135            1 :                 },
    1136            1 :                 BlobFileCreated: func(info BlobFileCreateInfo) {
    1137            1 :                         logger.Infof("%s", info)
    1138            1 :                 },
    1139            1 :                 BlobFileDeleted: func(info BlobFileDeleteInfo) {
    1140            1 :                         logger.Infof("%s", info)
    1141            1 :                 },
    1142            1 :                 BlobFileRewriteBegin: func(info BlobFileRewriteInfo) {
    1143            1 :                         logger.Infof("%s", info)
    1144            1 :                 },
    1145            1 :                 BlobFileRewriteEnd: func(info BlobFileRewriteInfo) {
    1146            1 :                         logger.Infof("%s", info)
    1147            1 :                 },
    1148            1 :                 DataCorruption: func(info DataCorruptionInfo) {
    1149            1 :                         logger.Errorf("%s", info)
    1150            1 :                 },
    1151            1 :                 CompactionBegin: func(info CompactionInfo) {
    1152            1 :                         logger.Infof("%s", info)
    1153            1 :                 },
    1154            1 :                 CompactionEnd: func(info CompactionInfo) {
    1155            1 :                         logger.Infof("%s", info)
    1156            1 :                 },
    1157            0 :                 DiskSlow: func(info DiskSlowInfo) {
    1158            0 :                         logger.Infof("%s", info)
    1159            0 :                 },
    1160            1 :                 FlushBegin: func(info FlushInfo) {
    1161            1 :                         logger.Infof("%s", info)
    1162            1 :                 },
    1163            1 :                 FlushEnd: func(info FlushInfo) {
    1164            1 :                         logger.Infof("%s", info)
    1165            1 :                 },
    1166            1 :                 DownloadBegin: func(info DownloadInfo) {
    1167            1 :                         logger.Infof("%s", info)
    1168            1 :                 },
    1169            1 :                 DownloadEnd: func(info DownloadInfo) {
    1170            1 :                         logger.Infof("%s", info)
    1171            1 :                 },
    1172            1 :                 FormatUpgrade: func(v FormatMajorVersion) {
    1173            1 :                         logger.Infof("upgraded to format version: %s", v)
    1174            1 :                 },
    1175            1 :                 ManifestCreated: func(info ManifestCreateInfo) {
    1176            1 :                         logger.Infof("%s", info)
    1177            1 :                 },
    1178            1 :                 ManifestDeleted: func(info ManifestDeleteInfo) {
    1179            1 :                         logger.Infof("%s", info)
    1180            1 :                 },
    1181            1 :                 TableCreated: func(info TableCreateInfo) {
    1182            1 :                         logger.Infof("%s", info)
    1183            1 :                 },
    1184            1 :                 TableDeleted: func(info TableDeleteInfo) {
    1185            1 :                         logger.Infof("%s", info)
    1186            1 :                 },
    1187            1 :                 TableIngested: func(info TableIngestInfo) {
    1188            1 :                         logger.Infof("%s", info)
    1189            1 :                 },
    1190            1 :                 TableStatsLoaded: func(info TableStatsInfo) {
    1191            1 :                         logger.Infof("%s", info)
    1192            1 :                 },
    1193            1 :                 TableValidated: func(info TableValidatedInfo) {
    1194            1 :                         logger.Infof("%s", info)
    1195            1 :                 },
    1196            1 :                 WALCreated: func(info WALCreateInfo) {
    1197            1 :                         logger.Infof("%s", info)
    1198            1 :                 },
    1199            1 :                 WALDeleted: func(info WALDeleteInfo) {
    1200            1 :                         logger.Infof("%s", info)
    1201            1 :                 },
    1202            1 :                 WriteStallBegin: func(info WriteStallBeginInfo) {
    1203            1 :                         logger.Infof("%s", info)
    1204            1 :                 },
    1205            1 :                 WriteStallEnd: func() {
    1206            1 :                         logger.Infof("write stall ending")
    1207            1 :                 },
    1208            0 :                 LowDiskSpace: func(info LowDiskSpaceInfo) {
    1209            0 :                         logger.Infof("%s", info)
    1210            0 :                 },
    1211            1 :                 PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
    1212            1 :                         logger.Infof("%s", info)
    1213            1 :                 },
    1214              :         }
    1215              : }
    1216              : 
    1217              : // TeeEventListener wraps two EventListeners, forwarding all events to both.
    1218            1 : func TeeEventListener(a, b EventListener) EventListener {
    1219            1 :         a.EnsureDefaults(nil)
    1220            1 :         b.EnsureDefaults(nil)
    1221            1 :         return EventListener{
    1222            1 :                 BackgroundError: func(err error) {
    1223            1 :                         a.BackgroundError(err)
    1224            1 :                         b.BackgroundError(err)
    1225            1 :                 },
    1226            0 :                 BlobFileCreated: func(info BlobFileCreateInfo) {
    1227            0 :                         a.BlobFileCreated(info)
    1228            0 :                         b.BlobFileCreated(info)
    1229            0 :                 },
    1230            0 :                 BlobFileDeleted: func(info BlobFileDeleteInfo) {
    1231            0 :                         a.BlobFileDeleted(info)
    1232            0 :                         b.BlobFileDeleted(info)
    1233            0 :                 },
    1234            0 :                 BlobFileRewriteBegin: func(info BlobFileRewriteInfo) {
    1235            0 :                         a.BlobFileRewriteBegin(info)
    1236            0 :                         b.BlobFileRewriteBegin(info)
    1237            0 :                 },
    1238            0 :                 BlobFileRewriteEnd: func(info BlobFileRewriteInfo) {
    1239            0 :                         a.BlobFileRewriteEnd(info)
    1240            0 :                         b.BlobFileRewriteEnd(info)
    1241            0 :                 },
    1242            1 :                 DataCorruption: func(info DataCorruptionInfo) {
    1243            1 :                         a.DataCorruption(info)
    1244            1 :                         b.DataCorruption(info)
    1245            1 :                 },
    1246            1 :                 CompactionBegin: func(info CompactionInfo) {
    1247            1 :                         a.CompactionBegin(info)
    1248            1 :                         b.CompactionBegin(info)
    1249            1 :                 },
    1250            1 :                 CompactionEnd: func(info CompactionInfo) {
    1251            1 :                         a.CompactionEnd(info)
    1252            1 :                         b.CompactionEnd(info)
    1253            1 :                 },
    1254            0 :                 DiskSlow: func(info DiskSlowInfo) {
    1255            0 :                         a.DiskSlow(info)
    1256            0 :                         b.DiskSlow(info)
    1257            0 :                 },
    1258            1 :                 FlushBegin: func(info FlushInfo) {
    1259            1 :                         a.FlushBegin(info)
    1260            1 :                         b.FlushBegin(info)
    1261            1 :                 },
    1262            1 :                 FlushEnd: func(info FlushInfo) {
    1263            1 :                         a.FlushEnd(info)
    1264            1 :                         b.FlushEnd(info)
    1265            1 :                 },
    1266            0 :                 DownloadBegin: func(info DownloadInfo) {
    1267            0 :                         a.DownloadBegin(info)
    1268            0 :                         b.DownloadBegin(info)
    1269            0 :                 },
    1270            0 :                 DownloadEnd: func(info DownloadInfo) {
    1271            0 :                         a.DownloadEnd(info)
    1272            0 :                         b.DownloadEnd(info)
    1273            0 :                 },
    1274            1 :                 FormatUpgrade: func(v FormatMajorVersion) {
    1275            1 :                         a.FormatUpgrade(v)
    1276            1 :                         b.FormatUpgrade(v)
    1277            1 :                 },
    1278            1 :                 ManifestCreated: func(info ManifestCreateInfo) {
    1279            1 :                         a.ManifestCreated(info)
    1280            1 :                         b.ManifestCreated(info)
    1281            1 :                 },
    1282            0 :                 ManifestDeleted: func(info ManifestDeleteInfo) {
    1283            0 :                         a.ManifestDeleted(info)
    1284            0 :                         b.ManifestDeleted(info)
    1285            0 :                 },
    1286            1 :                 TableCreated: func(info TableCreateInfo) {
    1287            1 :                         a.TableCreated(info)
    1288            1 :                         b.TableCreated(info)
    1289            1 :                 },
    1290            1 :                 TableDeleted: func(info TableDeleteInfo) {
    1291            1 :                         a.TableDeleted(info)
    1292            1 :                         b.TableDeleted(info)
    1293            1 :                 },
    1294            1 :                 TableIngested: func(info TableIngestInfo) {
    1295            1 :                         a.TableIngested(info)
    1296            1 :                         b.TableIngested(info)
    1297            1 :                 },
    1298            1 :                 TableStatsLoaded: func(info TableStatsInfo) {
    1299            1 :                         a.TableStatsLoaded(info)
    1300            1 :                         b.TableStatsLoaded(info)
    1301            1 :                 },
    1302            0 :                 TableValidated: func(info TableValidatedInfo) {
    1303            0 :                         a.TableValidated(info)
    1304            0 :                         b.TableValidated(info)
    1305            0 :                 },
    1306            1 :                 WALCreated: func(info WALCreateInfo) {
    1307            1 :                         a.WALCreated(info)
    1308            1 :                         b.WALCreated(info)
    1309            1 :                 },
    1310            1 :                 WALDeleted: func(info WALDeleteInfo) {
    1311            1 :                         a.WALDeleted(info)
    1312            1 :                         b.WALDeleted(info)
    1313            1 :                 },
    1314            0 :                 WriteStallBegin: func(info WriteStallBeginInfo) {
    1315            0 :                         a.WriteStallBegin(info)
    1316            0 :                         b.WriteStallBegin(info)
    1317            0 :                 },
    1318            0 :                 WriteStallEnd: func() {
    1319            0 :                         a.WriteStallEnd()
    1320            0 :                         b.WriteStallEnd()
    1321            0 :                 },
    1322            0 :                 LowDiskSpace: func(info LowDiskSpaceInfo) {
    1323            0 :                         a.LowDiskSpace(info)
    1324            0 :                         b.LowDiskSpace(info)
    1325            0 :                 },
    1326            0 :                 PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
    1327            0 :                         a.PossibleAPIMisuse(info)
    1328            0 :                         b.PossibleAPIMisuse(info)
    1329            0 :                 },
    1330              :         }
    1331              : }
    1332              : 
    1333              : // lowDiskSpaceReporter contains the logic to report low disk space events.
    1334              : // Report is called whenever we get the disk usage statistics.
    1335              : //
    1336              : // We define a few thresholds (10%, 5%, 3%, 2%, 1%) and we post an event
    1337              : // whenever we reach a new threshold. We periodically repost the event every 30
    1338              : // minutes until we are above all thresholds.
    1339              : type lowDiskSpaceReporter struct {
    1340              :         mu struct {
    1341              :                 sync.Mutex
    1342              :                 lastNoticeThreshold int
    1343              :                 lastNoticeTime      crtime.Mono
    1344              :         }
    1345              : }
    1346              : 
    1347              : var lowDiskSpaceThresholds = []int{10, 5, 3, 2, 1}
    1348              : 
    1349              : const lowDiskSpaceFrequency = 30 * time.Minute
    1350              : 
    1351            1 : func (r *lowDiskSpaceReporter) Report(availBytes, totalBytes uint64, el *EventListener) {
    1352            1 :         threshold, ok := r.findThreshold(availBytes, totalBytes)
    1353            1 :         if !ok {
    1354            1 :                 // Normal path.
    1355            1 :                 return
    1356            1 :         }
    1357            1 :         if r.shouldReport(threshold, crtime.NowMono()) {
    1358            1 :                 el.LowDiskSpace(LowDiskSpaceInfo{
    1359            1 :                         AvailBytes:       availBytes,
    1360            1 :                         TotalBytes:       totalBytes,
    1361            1 :                         PercentThreshold: threshold,
    1362            1 :                 })
    1363            1 :         }
    1364              : }
    1365              : 
    1366              : // shouldReport returns true if we should report an event. Updates
    1367              : // lastNoticeTime/lastNoticeThreshold appropriately.
    1368            1 : func (r *lowDiskSpaceReporter) shouldReport(threshold int, now crtime.Mono) bool {
    1369            1 :         r.mu.Lock()
    1370            1 :         defer r.mu.Unlock()
    1371            1 :         if threshold < r.mu.lastNoticeThreshold || r.mu.lastNoticeTime == 0 ||
    1372            1 :                 now.Sub(r.mu.lastNoticeTime) >= lowDiskSpaceFrequency {
    1373            1 :                 r.mu.lastNoticeThreshold = threshold
    1374            1 :                 r.mu.lastNoticeTime = now
    1375            1 :                 return true
    1376            1 :         }
    1377            1 :         return false
    1378              : }
    1379              : 
    1380              : // findThreshold returns the largest threshold in lowDiskSpaceThresholds which
    1381              : // is >= the percentage ratio between availBytes and totalBytes (or ok=false if
    1382              : // there is more free space than the highest threshold).
    1383              : func (r *lowDiskSpaceReporter) findThreshold(
    1384              :         availBytes, totalBytes uint64,
    1385            1 : ) (threshold int, ok bool) {
    1386            1 :         // Note: in the normal path, we exit the loop during the first iteration.
    1387            1 :         for i, t := range lowDiskSpaceThresholds {
    1388            1 :                 if availBytes*100 > totalBytes*uint64(lowDiskSpaceThresholds[i]) {
    1389            1 :                         break
    1390              :                 }
    1391            1 :                 threshold = t
    1392            1 :                 ok = true
    1393              :         }
    1394            1 :         return threshold, ok
    1395              : }
    1396              : 
    1397              : // reportCorruption reports a corruption of a TableMetadata or BlobFileMetadata
    1398              : // to the event listener and also adds a DataCorruptionInfo payload to the error.
    1399            1 : func (d *DB) reportCorruption(meta base.ObjectInfo, err error) error {
    1400            1 :         if invariants.Enabled && !IsCorruptionError(err) {
    1401            0 :                 panic("not a corruption error")
    1402              :         }
    1403            1 :         fileType, fileNum := meta.FileInfo()
    1404            1 : 
    1405            1 :         objMeta, lookupErr := d.objProvider.Lookup(fileType, fileNum)
    1406            1 :         if lookupErr != nil {
    1407            1 :                 // If the object is not known to the provider, it must be a local object
    1408            1 :                 // that was missing when we opened the store. Remote objects have their
    1409            1 :                 // metadata in a catalog, so even if the backing object is deleted, the
    1410            1 :                 // DiskFileNum would still be known.
    1411            1 :                 objMeta = objstorage.ObjectMetadata{DiskFileNum: fileNum, FileType: fileType}
    1412            1 :         }
    1413            1 :         path := d.objProvider.Path(objMeta)
    1414            1 :         if objMeta.IsRemote() {
    1415            1 :                 // Remote path (which include the locator and full path) might not always be
    1416            1 :                 // safe.
    1417            1 :                 err = errors.WithHintf(err, "path: %s", path)
    1418            1 :         } else {
    1419            1 :                 // Local paths are safe: they start with the store directory and the
    1420            1 :                 // filename is generated by Pebble.
    1421            1 :                 err = errors.WithHintf(err, "path: %s", redact.Safe(path))
    1422            1 :         }
    1423            1 :         info := DataCorruptionInfo{
    1424            1 :                 Path:     path,
    1425            1 :                 IsRemote: objMeta.IsRemote(),
    1426            1 :                 Locator:  objMeta.Remote.Locator,
    1427            1 :                 Bounds:   meta.UserKeyBounds(),
    1428            1 :                 Details:  err,
    1429            1 :         }
    1430            1 :         d.opts.EventListener.DataCorruption(info)
    1431            1 :         // We don't use errors.Join() because that also annotates with this stack
    1432            1 :         // trace which would not be useful.
    1433            1 :         return errorsjoin.Join(err, &corruptionDetailError{info: info})
    1434              : }
    1435              : 
    1436              : type corruptionDetailError struct {
    1437              :         info DataCorruptionInfo
    1438              : }
    1439              : 
    1440            1 : func (e *corruptionDetailError) Error() string {
    1441            1 :         return "<corruption detail carrier>"
    1442            1 : }
    1443              : 
    1444              : // ExtractDataCorruptionInfo extracts the DataCorruptionInfo details from a
    1445              : // corruption error. Returns nil if there is no such detail.
    1446            1 : func ExtractDataCorruptionInfo(err error) *DataCorruptionInfo {
    1447            1 :         var e *corruptionDetailError
    1448            1 :         if errors.As(err, &e) {
    1449            1 :                 return &e.info
    1450            1 :         }
    1451            0 :         return nil
    1452              : }
        

Generated by: LCOV version 2.0-1