LCOV - code coverage report
Current view: top level - pebble - event.go (source / functions) Coverage Total Hit
Test: 2025-08-09 08:18Z 71cce7bd - tests + meta.lcov Lines: 80.1 % 662 530
Test Date: 2025-08-09 08:20:40 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "fmt"
       9              :         "strings"
      10              :         "sync"
      11              :         "time"
      12              : 
      13              :         "github.com/cockroachdb/crlib/crtime"
      14              :         "github.com/cockroachdb/errors"
      15              :         errorsjoin "github.com/cockroachdb/errors/join"
      16              :         "github.com/cockroachdb/pebble/internal/base"
      17              :         "github.com/cockroachdb/pebble/internal/humanize"
      18              :         "github.com/cockroachdb/pebble/internal/invariants"
      19              :         "github.com/cockroachdb/pebble/internal/manifest"
      20              :         "github.com/cockroachdb/pebble/objstorage"
      21              :         "github.com/cockroachdb/pebble/objstorage/remote"
      22              :         "github.com/cockroachdb/pebble/vfs"
      23              :         "github.com/cockroachdb/redact"
      24              : )
      25              : 
      26              : // TableNum is an identifier for a table within a database.
      27              : type TableNum = base.TableNum
      28              : 
      29              : // TableInfo exports the manifest.TableInfo type.
      30              : type TableInfo = manifest.TableInfo
      31              : 
      32            2 : func tablesTotalSize(tables []TableInfo) uint64 {
      33            2 :         var size uint64
      34            2 :         for i := range tables {
      35            2 :                 size += tables[i].Size
      36            2 :         }
      37            2 :         return size
      38              : }
      39              : 
      40            2 : func formatFileNums(tables []TableInfo) string {
      41            2 :         var buf strings.Builder
      42            2 :         for i := range tables {
      43            2 :                 if i > 0 {
      44            2 :                         buf.WriteString(" ")
      45            2 :                 }
      46            2 :                 buf.WriteString(tables[i].FileNum.String())
      47              :         }
      48            2 :         return buf.String()
      49              : }
      50              : 
      51              : // DataCorruptionInfo contains the information for a DataCorruption event.
      52              : type DataCorruptionInfo struct {
      53              :         // Path of the file that is corrupted. For remote files the path starts with
      54              :         // "remote://".
      55              :         Path     string
      56              :         IsRemote bool
      57              :         // Locator is only set when IsRemote is true (note that an empty Locator is
      58              :         // valid even then).
      59              :         Locator remote.Locator
      60              :         // Bounds indicates the keyspace range that is affected.
      61              :         Bounds base.UserKeyBounds
      62              :         // Details of the error. See cockroachdb/errors for how to format with or
      63              :         // without redaction.
      64              :         Details error
      65              : }
      66              : 
      67            1 : func (i DataCorruptionInfo) String() string {
      68            1 :         return redact.StringWithoutMarkers(i)
      69            1 : }
      70              : 
      71              : // SafeFormat implements redact.SafeFormatter.
      72            1 : func (i DataCorruptionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
      73            1 :         w.Printf("on-disk corruption: %s", redact.Safe(i.Path))
      74            1 :         if i.IsRemote {
      75            0 :                 w.Printf(" (remote locator %q)", redact.Safe(i.Locator))
      76            0 :         }
      77            1 :         w.Printf("; bounds: %s; details: %+v", i.Bounds.String(), i.Details)
      78              : }
      79              : 
      80              : // LevelInfo contains info pertaining to a particular level.
      81              : type LevelInfo struct {
      82              :         Level  int
      83              :         Tables []TableInfo
      84              :         Score  float64
      85              : }
      86              : 
      87            0 : func (i LevelInfo) String() string {
      88            0 :         return redact.StringWithoutMarkers(i)
      89            0 : }
      90              : 
      91              : // SafeFormat implements redact.SafeFormatter.
      92            2 : func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
      93            2 :         w.Printf("L%d [%s] (%s) Score=%.2f",
      94            2 :                 redact.Safe(i.Level),
      95            2 :                 redact.Safe(formatFileNums(i.Tables)),
      96            2 :                 redact.Safe(humanize.Bytes.Uint64(tablesTotalSize(i.Tables))),
      97            2 :                 redact.Safe(i.Score))
      98            2 : }
      99              : 
     100              : // BlobFileCreateInfo contains the info for a blob file creation event.
     101              : type BlobFileCreateInfo struct {
     102              :         JobID int
     103              :         // Reason is the reason for the blob file creation: "compacting",
     104              :         // "flushing", or "ingesting".
     105              :         Reason  string
     106              :         Path    string
     107              :         FileNum base.DiskFileNum
     108              : }
     109              : 
     110            2 : func (i BlobFileCreateInfo) String() string {
     111            2 :         return redact.StringWithoutMarkers(i)
     112            2 : }
     113              : 
     114              : // SafeFormat implements redact.SafeFormatter.
     115            2 : func (i BlobFileCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     116            2 :         w.Printf("[JOB %d] %s: blob file created %s",
     117            2 :                 redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
     118            2 : }
     119              : 
     120              : // BlobFileDeleteInfo contains the info for a blob file deletion event.
     121              : type BlobFileDeleteInfo struct {
     122              :         JobID   int
     123              :         Path    string
     124              :         FileNum base.DiskFileNum
     125              :         Err     error
     126              : }
     127              : 
     128            2 : func (i BlobFileDeleteInfo) String() string {
     129            2 :         return redact.StringWithoutMarkers(i)
     130            2 : }
     131              : 
     132              : // SafeFormat implements redact.SafeFormatter.
     133            2 : func (i BlobFileDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     134            2 :         if i.Err != nil {
     135            0 :                 w.Printf("[JOB %d] blob file delete error %s: %s",
     136            0 :                         redact.Safe(i.JobID), i.FileNum, i.Err)
     137            0 :                 return
     138            0 :         }
     139            2 :         w.Printf("[JOB %d] blob file deleted %s", redact.Safe(i.JobID), i.FileNum)
     140              : }
     141              : 
     142              : // BlobFileRewriteInfo contains the info for a blob file rewrite event.
     143              : type BlobFileRewriteInfo struct {
     144              :         // JobID is the ID of the job.
     145              :         JobID int
     146              :         // Input describes the blob file being rewritten.
     147              :         Input BlobFileInfo
     148              :         // Output describes the blob file generated by the rewrite. The output
     149              :         // info is empty for the rewrite begin event.
     150              :         Output BlobFileInfo
     151              :         // Duration is the time spent compacting, including reading and writing
     152              :         // files.
     153              :         Duration time.Duration
     154              :         // TotalDuration is the total wall-time duration of the compaction,
     155              :         // including applying the compaction to the database. TotalDuration is
     156              :         // always ≥ Duration.
     157              :         TotalDuration time.Duration
     158              :         Done          bool
     159              :         // Err is set only if Done is true. If non-nil, indicates that the compaction
     160              :         // failed. Note that err can be ErrCancelledCompaction, which can happen
     161              :         // during normal operation.
     162              :         Err error
     163              : }
     164              : 
     165            2 : func (i BlobFileRewriteInfo) String() string {
     166            2 :         return redact.StringWithoutMarkers(i)
     167            2 : }
     168              : 
     169              : // SafeFormat implements redact.SafeFormatter.
     170            2 : func (i BlobFileRewriteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     171            2 :         if i.Err != nil {
     172            1 :                 w.Printf("[JOB %d] blob file (%s, %s) rewrite error: %s",
     173            1 :                         redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum, i.Err)
     174            1 :                 return
     175            1 :         }
     176              : 
     177            2 :         if !i.Done {
     178            2 :                 w.Printf("[JOB %d] rewriting blob file %s (physical file %s)",
     179            2 :                         redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum)
     180            2 :                 return
     181            2 :         }
     182            2 :         w.Printf("[JOB %d] rewrote blob file (%s, %s) -> (%s, %s), in %.1fs (%.1fs total)",
     183            2 :                 redact.Safe(i.JobID), i.Input.BlobFileID, i.Input.DiskFileNum,
     184            2 :                 i.Output.BlobFileID, i.Output.DiskFileNum,
     185            2 :                 redact.Safe(i.Duration.Seconds()),
     186            2 :                 redact.Safe(i.TotalDuration.Seconds()))
     187              : }
     188              : 
     189              : // BlobFileInfo describes a blob file.
     190              : type BlobFileInfo struct {
     191              :         // BlobFileID is the logical ID of the blob file.
     192              :         BlobFileID base.BlobFileID
     193              :         // DiskFileNum is the file number of the blob file on disk.
     194              :         DiskFileNum base.DiskFileNum
     195              :         // Size is the physical size of the file in bytes.
     196              :         Size uint64
     197              :         // ValueSize is the pre-compressed size of the values in the blob file in
     198              :         // bytes.
     199              :         ValueSize uint64
     200              : }
     201              : 
     202              : // CompactionInfo contains the info for a compaction event.
     203              : type CompactionInfo struct {
     204              :         // JobID is the ID of the compaction job.
     205              :         JobID int
     206              :         // Reason is the reason for the compaction.
     207              :         Reason string
     208              :         // Input contains the input tables for the compaction organized by level.
     209              :         Input []LevelInfo
     210              :         // Output contains the output tables generated by the compaction. The output
     211              :         // tables are empty for the compaction begin event.
     212              :         Output LevelInfo
     213              :         // Duration is the time spent compacting, including reading and writing
     214              :         // sstables.
     215              :         Duration time.Duration
     216              :         // TotalDuration is the total wall-time duration of the compaction,
     217              :         // including applying the compaction to the database. TotalDuration is
     218              :         // always ≥ Duration.
     219              :         TotalDuration time.Duration
     220              :         Done          bool
     221              :         // Err is set only if Done is true. If non-nil, indicates that the compaction
     222              :         // failed. Note that err can be ErrCancelledCompaction, which can happen
     223              :         // during normal operation.
     224              :         Err error
     225              : 
     226              :         SingleLevelOverlappingRatio float64
     227              :         MultiLevelOverlappingRatio  float64
     228              : 
     229              :         // Annotations specifies additional info to appear in a compaction's event log line
     230              :         Annotations compactionAnnotations
     231              : }
     232              : 
     233              : type compactionAnnotations []string
     234              : 
     235              : // SafeFormat implements redact.SafeFormatter.
     236            2 : func (ca compactionAnnotations) SafeFormat(w redact.SafePrinter, _ rune) {
     237            2 :         if len(ca) == 0 {
     238            0 :                 return
     239            0 :         }
     240            2 :         for i := range ca {
     241            2 :                 if i != 0 {
     242            2 :                         w.Print(" ")
     243            2 :                 }
     244            2 :                 w.Printf("%s", redact.SafeString(ca[i]))
     245              :         }
     246              : }
     247              : 
     248            2 : func (i CompactionInfo) String() string {
     249            2 :         return redact.StringWithoutMarkers(i)
     250            2 : }
     251              : 
     252              : // SafeFormat implements redact.SafeFormatter.
     253            2 : func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     254            2 :         if i.Err != nil {
     255            2 :                 w.Printf("[JOB %d] compaction(%s) to L%d error: %s",
     256            2 :                         redact.Safe(i.JobID), redact.SafeString(i.Reason), redact.Safe(i.Output.Level), i.Err)
     257            2 :                 return
     258            2 :         }
     259              : 
     260            2 :         if !i.Done {
     261            2 :                 w.Printf("[JOB %d] compacting(%s) ",
     262            2 :                         redact.Safe(i.JobID),
     263            2 :                         redact.SafeString(i.Reason))
     264            2 :                 if len(i.Annotations) > 0 {
     265            2 :                         w.Printf("%s ", i.Annotations)
     266            2 :                 }
     267            2 :                 w.Printf("%s; ", levelInfos(i.Input))
     268            2 :                 w.Printf("OverlappingRatio: Single %.2f, Multi %.2f", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
     269            2 :                 return
     270              :         }
     271            2 :         outputSize := tablesTotalSize(i.Output.Tables)
     272            2 :         w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
     273            2 :         if len(i.Annotations) > 0 {
     274            2 :                 w.Printf("%s ", i.Annotations)
     275            2 :         }
     276            2 :         w.Print(levelInfos(i.Input))
     277            2 :         w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
     278            2 :                 redact.Safe(i.Output.Level),
     279            2 :                 redact.Safe(formatFileNums(i.Output.Tables)),
     280            2 :                 redact.Safe(humanize.Bytes.Uint64(outputSize)),
     281            2 :                 redact.Safe(i.Duration.Seconds()),
     282            2 :                 redact.Safe(i.TotalDuration.Seconds()),
     283            2 :                 redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
     284              : }
     285              : 
     286              : type levelInfos []LevelInfo
     287              : 
     288            2 : func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
     289            2 :         for j, levelInfo := range i {
     290            2 :                 if j > 0 {
     291            2 :                         w.Printf(" + ")
     292            2 :                 }
     293            2 :                 w.Print(levelInfo)
     294              :         }
     295              : }
     296              : 
     297              : // DiskSlowInfo contains the info for a disk slowness event when writing to a
     298              : // file.
     299              : type DiskSlowInfo = vfs.DiskSlowInfo
     300              : 
     301              : // FlushInfo contains the info for a flush event.
     302              : type FlushInfo struct {
     303              :         // JobID is the ID of the flush job.
     304              :         JobID int
     305              :         // Reason is the reason for the flush.
     306              :         Reason string
     307              :         // Input contains the count of input memtables that were flushed.
     308              :         Input int
     309              :         // InputBytes contains the total in-memory size of the memtable(s) that were
     310              :         // flushed. This size includes skiplist indexing data structures.
     311              :         InputBytes uint64
     312              :         // Output contains the output tables generated by the flush. The output info
     313              :         // is empty for the flush begin event.
     314              :         Output []TableInfo
     315              :         // Duration is the time spent flushing. This duration includes writing and
     316              :         // syncing all of the flushed keys to sstables.
     317              :         Duration time.Duration
     318              :         // TotalDuration is the total wall-time duration of the flush, including
     319              :         // applying the flush to the database. TotalDuration is always ≥ Duration.
     320              :         TotalDuration time.Duration
     321              :         // Ingest is set to true if the flush is handling tables that were added to
     322              :         // the flushable queue via an ingestion operation.
     323              :         Ingest bool
     324              :         // IngestLevels are the output levels for each ingested table in the flush.
     325              :         // This field is only populated when Ingest is true.
     326              :         IngestLevels []int
     327              :         Done         bool
     328              :         Err          error
     329              : }
     330              : 
     331            2 : func (i FlushInfo) String() string {
     332            2 :         return redact.StringWithoutMarkers(i)
     333            2 : }
     334              : 
     335              : // SafeFormat implements redact.SafeFormatter.
     336            2 : func (i FlushInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     337            2 :         if i.Err != nil {
     338            2 :                 w.Printf("[JOB %d] flush error: %s", redact.Safe(i.JobID), i.Err)
     339            2 :                 return
     340            2 :         }
     341              : 
     342            2 :         plural := redact.SafeString("s")
     343            2 :         if i.Input == 1 {
     344            2 :                 plural = ""
     345            2 :         }
     346            2 :         if !i.Done {
     347            2 :                 w.Printf("[JOB %d] ", redact.Safe(i.JobID))
     348            2 :                 if !i.Ingest {
     349            2 :                         w.Printf("flushing %d memtable", redact.Safe(i.Input))
     350            2 :                         w.SafeString(plural)
     351            2 :                         w.Printf(" (%s) to L0", redact.Safe(humanize.Bytes.Uint64(i.InputBytes)))
     352            2 :                 } else {
     353            2 :                         w.Printf("flushing %d ingested table%s", redact.Safe(i.Input), plural)
     354            2 :                 }
     355            2 :                 return
     356              :         }
     357              : 
     358            2 :         outputSize := tablesTotalSize(i.Output)
     359            2 :         if !i.Ingest {
     360            2 :                 if invariants.Enabled && len(i.IngestLevels) > 0 {
     361            0 :                         panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) == 0"))
     362              :                 }
     363            2 :                 w.Printf("[JOB %d] flushed %d memtable%s (%s) to L0 [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
     364            2 :                         redact.Safe(i.JobID), redact.Safe(i.Input), plural,
     365            2 :                         redact.Safe(humanize.Bytes.Uint64(i.InputBytes)),
     366            2 :                         redact.Safe(formatFileNums(i.Output)),
     367            2 :                         redact.Safe(humanize.Bytes.Uint64(outputSize)),
     368            2 :                         redact.Safe(i.Duration.Seconds()),
     369            2 :                         redact.Safe(i.TotalDuration.Seconds()),
     370            2 :                         redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
     371            2 :         } else {
     372            2 :                 if invariants.Enabled && len(i.IngestLevels) == 0 {
     373            0 :                         panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) > 0"))
     374              :                 }
     375            2 :                 w.Printf("[JOB %d] flushed %d ingested flushable%s",
     376            2 :                         redact.Safe(i.JobID), redact.Safe(len(i.Output)), plural)
     377            2 :                 for j, level := range i.IngestLevels {
     378            2 :                         file := i.Output[j]
     379            2 :                         if j > 0 {
     380            2 :                                 w.Printf(" +")
     381            2 :                         }
     382            2 :                         w.Printf(" L%d:%s (%s)", level, file.FileNum, humanize.Bytes.Uint64(file.Size))
     383              :                 }
     384            2 :                 w.Printf(" in %.1fs (%.1fs total), output rate %s/s",
     385            2 :                         redact.Safe(i.Duration.Seconds()),
     386            2 :                         redact.Safe(i.TotalDuration.Seconds()),
     387            2 :                         redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
     388              :         }
     389              : }
     390              : 
     391              : // DownloadInfo contains the info for a DB.Download() event.
     392              : type DownloadInfo struct {
     393              :         // JobID is the ID of the download job.
     394              :         JobID int
     395              : 
     396              :         Spans []DownloadSpan
     397              : 
     398              :         // Duration is the time since the operation was started.
     399              :         Duration                    time.Duration
     400              :         DownloadCompactionsLaunched int
     401              : 
     402              :         // RestartCount indicates that the download operation restarted because it
     403              :         // noticed that new external files were ingested. A DownloadBegin event with
     404              :         // RestartCount = 0 is the start of the operation; each time we restart it we
     405              :         // have another DownloadBegin event with RestartCount > 0.
     406              :         RestartCount int
     407              :         Done         bool
     408              :         Err          error
     409              : }
     410              : 
     411            2 : func (i DownloadInfo) String() string {
     412            2 :         return redact.StringWithoutMarkers(i)
     413            2 : }
     414              : 
     415              : // SafeFormat implements redact.SafeFormatter; it prints the error, finished, start, or restart form of the event.
     416            2 : func (i DownloadInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     417            2 :         switch {
     418            0 :         case i.Err != nil:
     419            0 :                 w.Printf("[JOB %d] download error after %.1fs: %s", redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), i.Err)
     420              : 
     421            2 :         case i.Done:
     422            2 :                 w.Printf("[JOB %d] download finished in %.1fs (launched %d compactions)",
     423            2 :                         redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), redact.Safe(i.DownloadCompactionsLaunched))
     424              : 
     425            2 :         default:
     426            2 :                 if i.RestartCount == 0 {
     427            2 :                         w.Printf("[JOB %d] starting download for %d spans", redact.Safe(i.JobID), redact.Safe(len(i.Spans)))
     428            2 :                 } else {
     429            0 :                         w.Printf("[JOB %d] restarting download (restart #%d, time so far %.1fs, launched %d compactions)",
     430            0 :                                 redact.Safe(i.JobID), redact.Safe(i.RestartCount), redact.Safe(i.Duration.Seconds()),
     431            0 :                                 redact.Safe(i.DownloadCompactionsLaunched))
     432            0 :                 }
     433              :         }
     434              : }
     435              : 
     436              : // ManifestCreateInfo contains info about a manifest creation event.
     437              : type ManifestCreateInfo struct {
     438              :         // JobID is the ID of the job that caused the manifest to be created.
     439              :         JobID int
     440              :         Path  string
     441              :         // The file number of the new Manifest.
     442              :         FileNum base.DiskFileNum
     443              :         Err     error
     444              : }
     445              : 
     446            2 : func (i ManifestCreateInfo) String() string {
     447            2 :         return redact.StringWithoutMarkers(i)
     448            2 : }
     449              : 
     450              : // SafeFormat implements redact.SafeFormatter.
     451            2 : func (i ManifestCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     452            2 :         if i.Err != nil {
     453            0 :                 w.Printf("[JOB %d] MANIFEST create error: %s", redact.Safe(i.JobID), i.Err)
     454            0 :                 return
     455            0 :         }
     456            2 :         w.Printf("[JOB %d] MANIFEST created %s", redact.Safe(i.JobID), i.FileNum)
     457              : }
     458              : 
     459              : // ManifestDeleteInfo contains the info for a Manifest deletion event.
     460              : type ManifestDeleteInfo struct {
     461              :         // JobID is the ID of the job that caused the Manifest to be deleted.
     462              :         JobID   int
     463              :         Path    string
     464              :         FileNum base.DiskFileNum
     465              :         Err     error
     466              : }
     467              : 
     468            2 : func (i ManifestDeleteInfo) String() string {
     469            2 :         return redact.StringWithoutMarkers(i)
     470            2 : }
     471              : 
     472              : // SafeFormat implements redact.SafeFormatter.
     473            2 : func (i ManifestDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     474            2 :         if i.Err != nil {
     475            0 :                 w.Printf("[JOB %d] MANIFEST delete error: %s", redact.Safe(i.JobID), i.Err)
     476            0 :                 return
     477            0 :         }
     478            2 :         w.Printf("[JOB %d] MANIFEST deleted %s", redact.Safe(i.JobID), i.FileNum)
     479              : }
     480              : 
     481              : // TableCreateInfo contains the info for a table creation event.
     482              : type TableCreateInfo struct {
     483              :         JobID int
     484              :         // Reason is the reason for the table creation: "compacting", "flushing", or
     485              :         // "ingesting".
     486              :         Reason  string
     487              :         Path    string
     488              :         FileNum base.DiskFileNum
     489              : }
     490              : 
     491            2 : func (i TableCreateInfo) String() string {
     492            2 :         return redact.StringWithoutMarkers(i)
     493            2 : }
     494              : 
     495              : // SafeFormat implements redact.SafeFormatter.
     496            2 : func (i TableCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     497            2 :         w.Printf("[JOB %d] %s: sstable created %s",
     498            2 :                 redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
     499            2 : }
     500              : 
     501              : // TableDeleteInfo contains the info for a table deletion event.
     502              : type TableDeleteInfo struct {
     503              :         JobID   int
     504              :         Path    string
     505              :         FileNum base.DiskFileNum
     506              :         Err     error
     507              : }
     508              : 
     509            2 : func (i TableDeleteInfo) String() string {
     510            2 :         return redact.StringWithoutMarkers(i)
     511            2 : }
     512              : 
     513              : // SafeFormat implements redact.SafeFormatter.
     514            2 : func (i TableDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     515            2 :         if i.Err != nil {
     516            0 :                 w.Printf("[JOB %d] sstable delete error %s: %s",
     517            0 :                         redact.Safe(i.JobID), i.FileNum, i.Err)
     518            0 :                 return
     519            0 :         }
     520            2 :         w.Printf("[JOB %d] sstable deleted %s", redact.Safe(i.JobID), i.FileNum)
     521              : }
     522              : 
     523              : // TableIngestInfo contains the info for a table ingestion event.
     524              : type TableIngestInfo struct {
     525              :         // JobID is the ID of the job that caused the table to be ingested.
     526              :         JobID  int
     527              :         Tables []struct {
     528              :                 TableInfo
     529              :                 Level int
     530              :         }
     531              :         // GlobalSeqNum is the sequence number that was assigned to all entries in
     532              :         // the ingested table.
     533              :         GlobalSeqNum base.SeqNum
     534              :         // flushable indicates whether the ingested sstable was treated as a
     535              :         // flushable.
     536              :         flushable bool
     537              :         Err       error
     538              :         // WaitFlushDuration is the time spent waiting for memtable flushes to
     539              :         // complete, given that an overlap between ingesting sstables and memtables
     540              :         // exists.
     541              :         WaitFlushDuration time.Duration
     542              :         // ManifestUpdateDuration is the time spent updating the manifest.
     543              :         ManifestUpdateDuration time.Duration
     544              :         // BlockReadDuration is the total time spent reading blocks for the ingested
     545              :         // sstable.
     546              :         BlockReadDuration time.Duration
     547              :         // BlockReadBytes is the total number of bytes from blocks read for the
     548              :         // ingested sstable. This does not include bytes read from the block cache.
     549              :         BlockReadBytes uint64
     550              : }
     551              : 
     552            2 : func (i TableIngestInfo) String() string {
     553            2 :         return redact.StringWithoutMarkers(i)
     554            2 : }
     555              : 
     556              : // SafeFormat implements redact.SafeFormatter.
     557            2 : func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     558            2 :         if i.Err != nil {
     559            0 :                 w.Printf("[JOB %d] ingest error: %s", redact.Safe(i.JobID), i.Err)
     560            0 :                 return
     561            0 :         }
     562              : 
     563            2 :         if i.flushable {
     564            2 :                 w.Printf("[JOB %d] ingested as flushable, memtable flushes took %.1fs:", redact.Safe(i.JobID),
     565            2 :                         redact.Safe(i.WaitFlushDuration.Seconds()))
     566            2 :         } else {
     567            2 :                 w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
     568            2 :         }
     569              : 
     570            2 :         for j := range i.Tables {
     571            2 :                 t := &i.Tables[j]
     572            2 :                 if j > 0 {
     573            2 :                         w.Printf(",")
     574            2 :                 }
     575            2 :                 levelStr := ""
     576            2 :                 if !i.flushable {
     577            2 :                         levelStr = fmt.Sprintf("L%d:", t.Level)
     578            2 :                 }
     579            2 :                 w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
     580            2 :                         redact.Safe(humanize.Bytes.Uint64(t.Size)))
     581              :         }
     582            2 :         w.Printf("; manifest update took %.1fs; block reads took %.1fs with %s block bytes read",
     583            2 :                 redact.Safe(i.ManifestUpdateDuration.Seconds()), redact.Safe(i.BlockReadDuration.Seconds()),
     584            2 :                 redact.Safe(humanize.Bytes.Uint64(i.BlockReadBytes)))
     585              : }
     586              : 
     587              : // TableStatsInfo contains the info for a table stats loaded event.
     588              : type TableStatsInfo struct {
     589              :         // JobID is the ID of the job that finished loading the initial tables'
     590              :         // stats.
     591              :         JobID int
     592              : }
     593              : 
     594            2 : func (i TableStatsInfo) String() string {
     595            2 :         return redact.StringWithoutMarkers(i)
     596            2 : }
     597              : 
     598              : // SafeFormat implements redact.SafeFormatter.
     599            2 : func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     600            2 :         w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
     601            2 : }
     602              : 
     603              : // TableValidatedInfo contains information on the result of a validation run
     604              : // on an sstable.
     605              : type TableValidatedInfo struct {
     606              :         JobID int
     607              :         Meta  *manifest.TableMetadata
     608              : }
     609              : 
     610            2 : func (i TableValidatedInfo) String() string {
     611            2 :         return redact.StringWithoutMarkers(i)
     612            2 : }
     613              : 
     614              : // SafeFormat implements redact.SafeFormatter.
     615            2 : func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     616            2 :         w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
     617            2 : }
     618              : 
     619              : // WALCreateInfo contains info about a WAL creation event.
     620              : type WALCreateInfo struct {
     621              :         // JobID is the ID of the job that caused the WAL to be created.
     622              :         JobID int
     623              :         Path  string
     624              :         // The file number of the new WAL.
     625              :         FileNum base.DiskFileNum
     626              :         // The file number of a previous WAL which was recycled to create this
     627              :         // one. Zero if recycling did not take place.
     628              :         RecycledFileNum base.DiskFileNum
     629              :         Err             error
     630              : }
     631              : 
     632            2 : func (i WALCreateInfo) String() string {
     633            2 :         return redact.StringWithoutMarkers(i)
     634            2 : }
     635              : 
     636              : // SafeFormat implements redact.SafeFormatter.
     637            2 : func (i WALCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     638            2 :         if i.Err != nil {
     639            0 :                 w.Printf("[JOB %d] WAL create error: %s", redact.Safe(i.JobID), i.Err)
     640            0 :                 return
     641            0 :         }
     642              : 
     643            2 :         if i.RecycledFileNum == 0 {
     644            2 :                 w.Printf("[JOB %d] WAL created %s", redact.Safe(i.JobID), i.FileNum)
     645            2 :                 return
     646            2 :         }
     647              : 
     648            1 :         w.Printf("[JOB %d] WAL created %s (recycled %s)",
     649            1 :                 redact.Safe(i.JobID), i.FileNum, i.RecycledFileNum)
     650              : }
     651              : 
     652              : // WALDeleteInfo contains the info for a WAL deletion event.
     653              : //
     654              : // TODO(sumeer): extend WALDeleteInfo for the failover case in case the path
     655              : // is insufficient to infer whether primary or secondary.
     656              : type WALDeleteInfo struct {
     657              :         // JobID is the ID of the job that caused the WAL to be deleted.
     658              :         JobID   int
     659              :         Path    string
     660              :         FileNum base.DiskFileNum
     661              :         Err     error
     662              : }
     663              : 
     664            2 : func (i WALDeleteInfo) String() string {
     665            2 :         return redact.StringWithoutMarkers(i)
     666            2 : }
     667              : 
     668              : // SafeFormat implements redact.SafeFormatter.
     669            2 : func (i WALDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     670            2 :         if i.Err != nil {
     671            1 :                 w.Printf("[JOB %d] WAL delete error: %s", redact.Safe(i.JobID), i.Err)
     672            1 :                 return
     673            1 :         }
     674            2 :         w.Printf("[JOB %d] WAL deleted %s", redact.Safe(i.JobID), i.FileNum)
     675              : }
     676              : 
     677              : // WriteStallBeginInfo contains the info for a write stall begin event.
     678              : type WriteStallBeginInfo struct {
     679              :         Reason string
     680              : }
     681              : 
     682            2 : func (i WriteStallBeginInfo) String() string {
     683            2 :         return redact.StringWithoutMarkers(i)
     684            2 : }
     685              : 
     686              : // SafeFormat implements redact.SafeFormatter.
     687            2 : func (i WriteStallBeginInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     688            2 :         w.Printf("write stall beginning: %s", redact.Safe(i.Reason))
     689            2 : }
     690              : 
     691              : // LowDiskSpaceInfo contains the information for a LowDiskSpace
     692              : // event.
     693              : type LowDiskSpaceInfo struct {
     694              :         // AvailBytes is the disk space available to the current process in bytes.
     695              :         AvailBytes uint64
     696              :         // TotalBytes is the total disk space in bytes.
     697              :         TotalBytes uint64
     698              :         // PercentThreshold is one of a set of fixed percentages in the
     699              :         // lowDiskSpaceThresholds below. This event was issued because the disk
     700              :         // space went below this threshold.
     701              :         PercentThreshold int
     702              : }
     703              : 
     704            0 : func (i LowDiskSpaceInfo) String() string {
     705            0 :         return redact.StringWithoutMarkers(i)
     706            0 : }
     707              : 
     708              : // SafeFormat implements redact.SafeFormatter.
     709            0 : func (i LowDiskSpaceInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     710            0 :         w.Printf(
     711            0 :                 "available disk space under %d%% (%s of %s)",
     712            0 :                 redact.Safe(i.PercentThreshold),
     713            0 :                 redact.Safe(humanize.Bytes.Uint64(i.AvailBytes)),
     714            0 :                 redact.Safe(humanize.Bytes.Uint64(i.TotalBytes)),
     715            0 :         )
     716            0 : }
     717              : 
     718              : // PossibleAPIMisuseInfo contains the information for a PossibleAPIMisuse event.
     719              : type PossibleAPIMisuseInfo struct {
     720              :         Kind APIMisuseKind
     721              : 
     722              :         // UserKey is set for the following kinds:
     723              :         //  - IneffectualSingleDelete,
     724              :         //  - NondeterministicSingleDelete,
     725              :         //  - MissizedDelete,
     726              :         //  - InvalidValue.
     727              :         UserKey []byte
     728              : 
     729              :         // ExtraInfo is set for the following kinds:
     730              :         //  - MissizedDelete: contains "elidedSize=<size>,expectedSize=<size>"
     731              :         //  - InvalidValue: contains "callback=<callbackName>,value=<value>,err=<err>"
     732              :         ExtraInfo redact.RedactableString
     733              : }
     734              : 
     735            2 : func (i PossibleAPIMisuseInfo) String() string {
     736            2 :         return redact.StringWithoutMarkers(i)
     737            2 : }
     738              : 
     739              : // SafeFormat implements redact.SafeFormatter.
     740            2 : func (i PossibleAPIMisuseInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     741            2 :         switch i.Kind {
     742            2 :         case IneffectualSingleDelete, NondeterministicSingleDelete:
     743            2 :                 w.Printf("possible API misuse: %s (key=%q)", redact.Safe(i.Kind), i.UserKey)
     744            2 :         case MissizedDelete:
     745            2 :                 w.Printf("possible API misuse: %s (key=%q, %s)", redact.Safe(i.Kind), i.UserKey, i.ExtraInfo)
     746            0 :         case InvalidValue:
     747            0 :                 w.Printf("possible API misuse: %s (key=%q, %s)", redact.Safe(i.Kind), i.UserKey, i.ExtraInfo)
     748            0 :         default:
     749            0 :                 if invariants.Enabled {
     750            0 :                         panic("invalid API misuse event")
     751              :                 }
     752            0 :                 w.Printf("invalid API misuse event")
     753              :         }
     754              : }
     755              : 
     756              : // APIMisuseKind identifies the type of API misuse represented by a
     757              : // PossibleAPIMisuse event.
     758              : type APIMisuseKind int8
     759              : 
     760              : const (
     761              :         // IneffectualSingleDelete is emitted in compactions/flushes if any
     762              :         // single delete is being elided without deleting a point set/merge.
     763              :         //
     764              :         // This event can sometimes be a false positive because of delete-only
     765              :         // compactions which can cause a recent RANGEDEL to peek below an older
     766              :         // SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
     767              :         //
     768              :         // Example:
     769              :         //   RANGEDEL [a, c)#10 in L0
     770              :         //   SINGLEDEL b#5 in L1
     771              :         //   SET b#3 in L6
     772              :         //
     773              :         // If the L6 file containing the SET is narrow and the L1 file containing
     774              :         // the SINGLEDEL is wide, a delete-only compaction can remove the file in
     775              :         // L6 before the SINGLEDEL is compacted down. Then when the SINGLEDEL is
     776              :         // compacted down, it will not find any SET to delete, resulting in the
     777              :         // ineffectual callback.
     778              :         IneffectualSingleDelete APIMisuseKind = iota
     779              : 
     780              :         // NondeterministicSingleDelete is emitted in compactions/flushes if any
     781              :         // single delete has consumed a Set/Merge, and there is another immediately
     782              :         // older Set/SetWithDelete/Merge. The user of Pebble has violated the
     783              :         // invariant under which SingleDelete can be used correctly.
     784              :         //
     785              :         // Consider the sequence SingleDelete#3, Set#2, Set#1. There are three
     786              :         // ways some of these keys can first meet in a compaction.
     787              :         //
     788              :         // - All 3 keys in the same compaction: this callback will detect the
     789              :         //   violation.
     790              :         //
     791              :         // - SingleDelete#3, Set#2 meet in a compaction first: Both keys will
     792              :         //   disappear. The violation will not be detected, and the DB will have
     793              :         //   Set#1 which is likely incorrect (from the user's perspective).
     794              :         //
     795              :         // - Set#2, Set#1 meet in a compaction first: The output will be Set#2,
     796              :         //   which will later be consumed by SingleDelete#3. The violation will
     797              :         //   not be detected and the DB will be correct.
     798              :         //
     799              :         // This event can sometimes be a false positive because of delete-only
     800              :         // compactions which can cause a recent RANGEDEL to peek below an older
     801              :         // SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
     802              :         //
     803              :         // Example:
     804              :         //   RANGEDEL [a, z)#60 in L0
     805              :         //   SINGLEDEL g#50 in L1
     806              :         //   SET g#40 in L2
     807              :         //   RANGEDEL [g,h)#30 in L3
     808              :         //   SET g#20 in L6
     809              :         //
     810              :         // In this example, the two SETs represent the same user write, and the
     811              :         // RANGEDELs are caused by the CockroachDB range being dropped. That is,
     812              :         // the user wrote to g once, range was dropped, then added back, which
     813              :         // caused the SET again, then at some point g was validly deleted using a
     814              :         // SINGLEDEL, and then the range was dropped again. The older RANGEDEL can
     815              :         // get fragmented due to compactions it has been part of. Say this L3 file
     816              :         // containing the RANGEDEL is very narrow, while the L1, L2, L6 files are
     817              :         // wider than the RANGEDEL in L0. Then the RANGEDEL in L3 can be dropped
     818              :         // using a delete-only compaction, resulting in an LSM with state:
     819              :         //
     820              :         //   RANGEDEL [a, z)#60 in L0
     821              :         //   SINGLEDEL g#50 in L1
     822              :         //   SET g#40 in L2
     823              :         //   SET g#20 in L6
     824              :         //
     825              :         // A multi-level compaction involving L1, L2, L6 will cause the invariant
     826              :         // violation callback. This example doesn't need multi-level compactions:
     827              :         // say there was a Pebble snapshot at g#21 preventing g#20 from being
     828              :         // dropped when it meets g#40 in a compaction. That snapshot will not save
     829              :         // RANGEDEL [g,h)#30, so we can have:
     830              :         //
     831              :         //   SINGLEDEL g#50 in L1
     832              :         //   SET g#40, SET g#20 in L6
     833              :         //
     834              :         // And say the snapshot is removed and then the L1 and L6 compaction
     835              :         // happens, resulting in the invariant violation callback.
     836              :         NondeterministicSingleDelete
     837              : 
     838              :         // MissizedDelete is emitted when a DELSIZED tombstone is found that did
     839              :         // not accurately record the size of the value it deleted. This can lead to
     840              :         // incorrect behavior in compactions.
     841              :         MissizedDelete
     842              : 
     843              :         // InvalidValue is emitted when a user-implemented callback (such as
     844              :         // ShortAttributeExtractor) returns an error for a committed value. This
     845              :         // suggests that either the callback is not implemented for all possible
     846              :         // values or a malformed value was committed to the DB.
     847              :         InvalidValue
     848              : )
     849              : 
     850            2 : func (k APIMisuseKind) String() string {
     851            2 :         switch k {
     852            2 :         case IneffectualSingleDelete:
     853            2 :                 return "ineffectual SINGLEDEL"
     854            0 :         case NondeterministicSingleDelete:
     855            0 :                 return "nondeterministic SINGLEDEL"
     856            2 :         case MissizedDelete:
     857            2 :                 return "missized DELSIZED"
     858            0 :         case InvalidValue:
     859            0 :                 return "invalid value"
     860            0 :         default:
     861            0 :                 return "unknown"
     862              :         }
     863              : }
     864              : 
     865              : // EventListener contains a set of functions that will be invoked when various
     866              : // significant DB events occur. Note that the functions should not run for an
     867              : // excessive amount of time as they are invoked synchronously by the DB and may
     868              : // block continued DB work. For a similar reason it is advisable to not perform
     869              : // any synchronous calls back into the DB.
     870              : type EventListener struct {
     871              :         // BackgroundError is invoked whenever an error occurs during a background
     872              :         // operation such as flush or compaction.
     873              :         BackgroundError func(error)
     874              : 
     875              :         // BlobFileCreated is invoked after a blob file has been created.
     876              :         BlobFileCreated func(BlobFileCreateInfo)
     877              : 
     878              :         // BlobFileDeleted is invoked after a blob file has been deleted.
     879              :         BlobFileDeleted func(BlobFileDeleteInfo)
     880              : 
     881              :         // BlobFileRewriteBegin is invoked when a blob file rewrite compaction begins.
     882              :         BlobFileRewriteBegin func(BlobFileRewriteInfo)
     883              : 
     884              :         // BlobFileRewriteEnd is invoked when a blob file rewrite compaction ends.
     885              :         BlobFileRewriteEnd func(BlobFileRewriteInfo)
     886              : 
     887              :         // DataCorruption is invoked when an on-disk corruption is detected. It should
     888              :         // not block, as it is called synchronously in read paths.
     889              :         DataCorruption func(DataCorruptionInfo)
     890              : 
     891              :         // CompactionBegin is invoked after the inputs to a compaction have been
     892              :         // determined, but before the compaction has produced any output.
     893              :         CompactionBegin func(CompactionInfo)
     894              : 
     895              :         // CompactionEnd is invoked after a compaction has completed and the result
     896              :         // has been installed.
     897              :         CompactionEnd func(CompactionInfo)
     898              : 
     899              :         // DiskSlow is invoked after a disk write operation on a file created with a
     900              :         // disk health checking vfs.FS (see vfs.DefaultWithDiskHealthChecks) is
     901              :         // observed to exceed the specified disk slowness threshold duration. DiskSlow
     902              :         // is called on a goroutine that is monitoring slowness/stuckness. The callee
     903              :         // MUST return without doing any IO, or blocking on anything (like a mutex)
     904              :         // that is waiting on IO. This is imperative in order to reliably monitor for
     905              :         // slowness, since if this goroutine gets stuck, the monitoring will stop
     906              :         // working.
     907              :         DiskSlow func(DiskSlowInfo)
     908              : 
     909              :         // FlushBegin is invoked after the inputs to a flush have been determined,
     910              :         // but before the flush has produced any output.
     911              :         FlushBegin func(FlushInfo)
     912              : 
     913              :         // FlushEnd is invoked after a flush has completed and the result has been
     914              :         // installed.
     915              :         FlushEnd func(FlushInfo)
     916              : 
     917              :         // DownloadBegin is invoked when a db.Download operation starts or restarts
     918              :         // (restarts are caused by new external tables being ingested during the
     919              :         // operation).
     920              :         DownloadBegin func(DownloadInfo)
     921              : 
     922              :         // DownloadEnd is invoked when a db.Download operation completes.
     923              :         DownloadEnd func(DownloadInfo)
     924              : 
     925              :         // FormatUpgrade is invoked after the database's FormatMajorVersion
     926              :         // is upgraded.
     927              :         FormatUpgrade func(FormatMajorVersion)
     928              : 
     929              :         // ManifestCreated is invoked after a manifest has been created.
     930              :         ManifestCreated func(ManifestCreateInfo)
     931              : 
     932              :         // ManifestDeleted is invoked after a manifest has been deleted.
     933              :         ManifestDeleted func(ManifestDeleteInfo)
     934              : 
     935              :         // TableCreated is invoked when a table has been created.
     936              :         TableCreated func(TableCreateInfo)
     937              : 
     938              :         // TableDeleted is invoked after a table has been deleted.
     939              :         TableDeleted func(TableDeleteInfo)
     940              : 
     941              :         // TableIngested is invoked after an externally created table has been
     942              :         // ingested via a call to DB.Ingest().
     943              :         TableIngested func(TableIngestInfo)
     944              : 
     945              :         // TableStatsLoaded is invoked at most once, when the table stats
     946              :         // collector has loaded statistics for all tables that existed at Open.
     947              :         TableStatsLoaded func(TableStatsInfo)
     948              : 
     949              :         // TableValidated is invoked after validation runs on an sstable.
     950              :         TableValidated func(TableValidatedInfo)
     951              : 
     952              :         // WALCreated is invoked after a WAL has been created.
     953              :         WALCreated func(WALCreateInfo)
     954              : 
     955              :         // WALDeleted is invoked after a WAL has been deleted.
     956              :         WALDeleted func(WALDeleteInfo)
     957              : 
     958              :         // WriteStallBegin is invoked when writes are intentionally delayed.
     959              :         WriteStallBegin func(WriteStallBeginInfo)
     960              : 
     961              :         // WriteStallEnd is invoked when delayed writes are released.
     962              :         WriteStallEnd func()
     963              : 
     964              :         // LowDiskSpace is invoked periodically when the disk space is running
     965              :         // low.
     966              :         LowDiskSpace func(LowDiskSpaceInfo)
     967              : 
     968              :         // PossibleAPIMisuse is invoked when a possible API misuse is detected.
     969              :         PossibleAPIMisuse func(PossibleAPIMisuseInfo)
     970              : }
     971              : 
     972              : // EnsureDefaults ensures that background error events are logged to the
     973              : // specified logger if a handler for those events hasn't been otherwise
     974              : // specified. Ensure all handlers are non-nil so that we don't have to check
     975              : // for nil-ness before invoking.
     976            2 : func (l *EventListener) EnsureDefaults(logger Logger) {
     977            2 :         if l.BackgroundError == nil {
     978            2 :                 if logger != nil {
     979            2 :                         l.BackgroundError = func(err error) {
     980            1 :                                 logger.Errorf("background error: %s", err)
     981            1 :                         }
     982            1 :                 } else {
     983            1 :                         l.BackgroundError = func(error) {}
     984              :                 }
     985              :         }
     986            2 :         if l.BlobFileCreated == nil {
     987            2 :                 l.BlobFileCreated = func(info BlobFileCreateInfo) {}
     988              :         }
     989            2 :         if l.BlobFileDeleted == nil {
     990            2 :                 l.BlobFileDeleted = func(info BlobFileDeleteInfo) {}
     991              :         }
     992            2 :         if l.BlobFileRewriteBegin == nil {
     993            2 :                 l.BlobFileRewriteBegin = func(info BlobFileRewriteInfo) {}
     994              :         }
     995            2 :         if l.BlobFileRewriteEnd == nil {
     996            2 :                 l.BlobFileRewriteEnd = func(info BlobFileRewriteInfo) {}
     997              :         }
     998            2 :         if l.DataCorruption == nil {
     999            2 :                 if logger != nil {
    1000            2 :                         l.DataCorruption = func(info DataCorruptionInfo) {
    1001            1 :                                 logger.Fatalf("%s", info)
    1002            1 :                         }
    1003            1 :                 } else {
    1004            1 :                         l.DataCorruption = func(info DataCorruptionInfo) {}
    1005              :                 }
    1006              :         }
    1007            2 :         if l.CompactionBegin == nil {
    1008            2 :                 l.CompactionBegin = func(info CompactionInfo) {}
    1009              :         }
    1010            2 :         if l.CompactionEnd == nil {
    1011            2 :                 l.CompactionEnd = func(info CompactionInfo) {}
    1012              :         }
    1013            2 :         if l.DiskSlow == nil {
    1014            2 :                 l.DiskSlow = func(info DiskSlowInfo) {}
    1015              :         }
    1016            2 :         if l.FlushBegin == nil {
    1017            2 :                 l.FlushBegin = func(info FlushInfo) {}
    1018              :         }
    1019            2 :         if l.FlushEnd == nil {
    1020            2 :                 l.FlushEnd = func(info FlushInfo) {}
    1021              :         }
    1022            2 :         if l.DownloadBegin == nil {
    1023            2 :                 l.DownloadBegin = func(info DownloadInfo) {}
    1024              :         }
    1025            2 :         if l.DownloadEnd == nil {
    1026            2 :                 l.DownloadEnd = func(info DownloadInfo) {}
    1027              :         }
    1028            2 :         if l.FormatUpgrade == nil {
    1029            2 :                 l.FormatUpgrade = func(v FormatMajorVersion) {}
    1030              :         }
    1031            2 :         if l.ManifestCreated == nil {
    1032            2 :                 l.ManifestCreated = func(info ManifestCreateInfo) {}
    1033              :         }
    1034            2 :         if l.ManifestDeleted == nil {
    1035            2 :                 l.ManifestDeleted = func(info ManifestDeleteInfo) {}
    1036              :         }
    1037            2 :         if l.TableCreated == nil {
    1038            2 :                 l.TableCreated = func(info TableCreateInfo) {}
    1039              :         }
    1040            2 :         if l.TableDeleted == nil {
    1041            2 :                 l.TableDeleted = func(info TableDeleteInfo) {}
    1042              :         }
    1043            2 :         if l.TableIngested == nil {
    1044            2 :                 l.TableIngested = func(info TableIngestInfo) {}
    1045              :         }
    1046            2 :         if l.TableStatsLoaded == nil {
    1047            2 :                 l.TableStatsLoaded = func(info TableStatsInfo) {}
    1048              :         }
    1049            2 :         if l.TableValidated == nil {
    1050            2 :                 l.TableValidated = func(validated TableValidatedInfo) {}
    1051              :         }
    1052            2 :         if l.WALCreated == nil {
    1053            2 :                 l.WALCreated = func(info WALCreateInfo) {}
    1054              :         }
    1055            2 :         if l.WALDeleted == nil {
    1056            2 :                 l.WALDeleted = func(info WALDeleteInfo) {}
    1057              :         }
    1058            2 :         if l.WriteStallBegin == nil {
    1059            2 :                 l.WriteStallBegin = func(info WriteStallBeginInfo) {}
    1060              :         }
    1061            2 :         if l.WriteStallEnd == nil {
    1062            2 :                 l.WriteStallEnd = func() {}
    1063              :         }
    1064            2 :         if l.LowDiskSpace == nil {
    1065            2 :                 l.LowDiskSpace = func(info LowDiskSpaceInfo) {}
    1066              :         }
    1067            2 :         if l.PossibleAPIMisuse == nil {
    1068            2 :                 l.PossibleAPIMisuse = func(info PossibleAPIMisuseInfo) {}
    1069              :         }
    1070              : }
    1071              : 
    1072              : // MakeLoggingEventListener creates an EventListener that logs all events to the
    1073              : // specified logger.
    1074            2 : func MakeLoggingEventListener(logger Logger) EventListener {
    1075            2 :         if logger == nil {
    1076            1 :                 logger = DefaultLogger
    1077            1 :         }
    1078              : 
    1079            2 :         return EventListener{
    1080            2 :                 BackgroundError: func(err error) {
    1081            1 :                         logger.Errorf("background error: %s", err)
    1082            1 :                 },
    1083            2 :                 BlobFileCreated: func(info BlobFileCreateInfo) {
    1084            2 :                         logger.Infof("%s", info)
    1085            2 :                 },
    1086            2 :                 BlobFileDeleted: func(info BlobFileDeleteInfo) {
    1087            2 :                         logger.Infof("%s", info)
    1088            2 :                 },
    1089            2 :                 BlobFileRewriteBegin: func(info BlobFileRewriteInfo) {
    1090            2 :                         logger.Infof("%s", info)
    1091            2 :                 },
    1092            2 :                 BlobFileRewriteEnd: func(info BlobFileRewriteInfo) {
    1093            2 :                         logger.Infof("%s", info)
    1094            2 :                 },
    1095            0 :                 DataCorruption: func(info DataCorruptionInfo) {
    1096            0 :                         logger.Errorf("%s", info)
    1097            0 :                 },
    1098            2 :                 CompactionBegin: func(info CompactionInfo) {
    1099            2 :                         logger.Infof("%s", info)
    1100            2 :                 },
    1101            1 :                 CompactionEnd: func(info CompactionInfo) {
    1102            1 :                         logger.Infof("%s", info)
    1103            1 :                 },
    1104            0 :                 DiskSlow: func(info DiskSlowInfo) {
    1105            0 :                         logger.Infof("%s", info)
    1106            0 :                 },
    1107            2 :                 FlushBegin: func(info FlushInfo) {
    1108            2 :                         logger.Infof("%s", info)
    1109            2 :                 },
    1110            1 :                 FlushEnd: func(info FlushInfo) {
    1111            1 :                         logger.Infof("%s", info)
    1112            1 :                 },
    1113            2 :                 DownloadBegin: func(info DownloadInfo) {
    1114            2 :                         logger.Infof("%s", info)
    1115            2 :                 },
    1116            1 :                 DownloadEnd: func(info DownloadInfo) {
    1117            1 :                         logger.Infof("%s", info)
    1118            1 :                 },
    1119            2 :                 FormatUpgrade: func(v FormatMajorVersion) {
    1120            2 :                         logger.Infof("upgraded to format version: %s", v)
    1121            2 :                 },
    1122            1 :                 ManifestCreated: func(info ManifestCreateInfo) {
    1123            1 :                         logger.Infof("%s", info)
    1124            1 :                 },
    1125            1 :                 ManifestDeleted: func(info ManifestDeleteInfo) {
    1126            1 :                         logger.Infof("%s", info)
    1127            1 :                 },
    1128            2 :                 TableCreated: func(info TableCreateInfo) {
    1129            2 :                         logger.Infof("%s", info)
    1130            2 :                 },
    1131            1 :                 TableDeleted: func(info TableDeleteInfo) {
    1132            1 :                         logger.Infof("%s", info)
    1133            1 :                 },
    1134            1 :                 TableIngested: func(info TableIngestInfo) {
    1135            1 :                         logger.Infof("%s", info)
    1136            1 :                 },
    1137            2 :                 TableStatsLoaded: func(info TableStatsInfo) {
    1138            2 :                         logger.Infof("%s", info)
    1139            2 :                 },
    1140            2 :                 TableValidated: func(info TableValidatedInfo) {
    1141            2 :                         logger.Infof("%s", info)
    1142            2 :                 },
    1143            1 :                 WALCreated: func(info WALCreateInfo) {
    1144            1 :                         logger.Infof("%s", info)
    1145            1 :                 },
    1146            1 :                 WALDeleted: func(info WALDeleteInfo) {
    1147            1 :                         logger.Infof("%s", info)
    1148            1 :                 },
    1149            2 :                 WriteStallBegin: func(info WriteStallBeginInfo) {
    1150            2 :                         logger.Infof("%s", info)
    1151            2 :                 },
    1152            2 :                 WriteStallEnd: func() {
    1153            2 :                         logger.Infof("write stall ending")
    1154            2 :                 },
    1155            0 :                 LowDiskSpace: func(info LowDiskSpaceInfo) {
    1156            0 :                         logger.Infof("%s", info)
    1157            0 :                 },
    1158            2 :                 PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
    1159            2 :                         logger.Infof("%s", info)
    1160            2 :                 },
    1161              :         }
    1162              : }
    1163              : 
    1164              : // TeeEventListener wraps two EventListeners, forwarding all events to both.
    1165            1 : func TeeEventListener(a, b EventListener) EventListener {
    1166            1 :         a.EnsureDefaults(nil)
    1167            1 :         b.EnsureDefaults(nil)
    1168            1 :         return EventListener{
    1169            1 :                 BackgroundError: func(err error) {
    1170            0 :                         a.BackgroundError(err)
    1171            0 :                         b.BackgroundError(err)
    1172            0 :                 },
    1173            0 :                 BlobFileCreated: func(info BlobFileCreateInfo) {
    1174            0 :                         a.BlobFileCreated(info)
    1175            0 :                         b.BlobFileCreated(info)
    1176            0 :                 },
    1177            0 :                 BlobFileDeleted: func(info BlobFileDeleteInfo) {
    1178            0 :                         a.BlobFileDeleted(info)
    1179            0 :                         b.BlobFileDeleted(info)
    1180            0 :                 },
    1181            0 :                 BlobFileRewriteBegin: func(info BlobFileRewriteInfo) {
    1182            0 :                         a.BlobFileRewriteBegin(info)
    1183            0 :                         b.BlobFileRewriteBegin(info)
    1184            0 :                 },
    1185            0 :                 BlobFileRewriteEnd: func(info BlobFileRewriteInfo) {
    1186            0 :                         a.BlobFileRewriteEnd(info)
    1187            0 :                         b.BlobFileRewriteEnd(info)
    1188            0 :                 },
    1189            0 :                 DataCorruption: func(info DataCorruptionInfo) {
    1190            0 :                         a.DataCorruption(info)
    1191            0 :                         b.DataCorruption(info)
    1192            0 :                 },
    1193            1 :                 CompactionBegin: func(info CompactionInfo) {
    1194            1 :                         a.CompactionBegin(info)
    1195            1 :                         b.CompactionBegin(info)
    1196            1 :                 },
    1197            1 :                 CompactionEnd: func(info CompactionInfo) {
    1198            1 :                         a.CompactionEnd(info)
    1199            1 :                         b.CompactionEnd(info)
    1200            1 :                 },
    1201            0 :                 DiskSlow: func(info DiskSlowInfo) {
    1202            0 :                         a.DiskSlow(info)
    1203            0 :                         b.DiskSlow(info)
    1204            0 :                 },
    1205            1 :                 FlushBegin: func(info FlushInfo) {
    1206            1 :                         a.FlushBegin(info)
    1207            1 :                         b.FlushBegin(info)
    1208            1 :                 },
    1209            1 :                 FlushEnd: func(info FlushInfo) {
    1210            1 :                         a.FlushEnd(info)
    1211            1 :                         b.FlushEnd(info)
    1212            1 :                 },
    1213            0 :                 DownloadBegin: func(info DownloadInfo) {
    1214            0 :                         a.DownloadBegin(info)
    1215            0 :                         b.DownloadBegin(info)
    1216            0 :                 },
    1217            0 :                 DownloadEnd: func(info DownloadInfo) {
    1218            0 :                         a.DownloadEnd(info)
    1219            0 :                         b.DownloadEnd(info)
    1220            0 :                 },
    1221            0 :                 FormatUpgrade: func(v FormatMajorVersion) {
    1222            0 :                         a.FormatUpgrade(v)
    1223            0 :                         b.FormatUpgrade(v)
    1224            0 :                 },
    1225            1 :                 ManifestCreated: func(info ManifestCreateInfo) {
    1226            1 :                         a.ManifestCreated(info)
    1227            1 :                         b.ManifestCreated(info)
    1228            1 :                 },
    1229            0 :                 ManifestDeleted: func(info ManifestDeleteInfo) {
    1230            0 :                         a.ManifestDeleted(info)
    1231            0 :                         b.ManifestDeleted(info)
    1232            0 :                 },
    1233            1 :                 TableCreated: func(info TableCreateInfo) {
    1234            1 :                         a.TableCreated(info)
    1235            1 :                         b.TableCreated(info)
    1236            1 :                 },
    1237            1 :                 TableDeleted: func(info TableDeleteInfo) {
    1238            1 :                         a.TableDeleted(info)
    1239            1 :                         b.TableDeleted(info)
    1240            1 :                 },
    1241            1 :                 TableIngested: func(info TableIngestInfo) {
    1242            1 :                         a.TableIngested(info)
    1243            1 :                         b.TableIngested(info)
    1244            1 :                 },
    1245            1 :                 TableStatsLoaded: func(info TableStatsInfo) {
    1246            1 :                         a.TableStatsLoaded(info)
    1247            1 :                         b.TableStatsLoaded(info)
    1248            1 :                 },
    1249            0 :                 TableValidated: func(info TableValidatedInfo) {
    1250            0 :                         a.TableValidated(info)
    1251            0 :                         b.TableValidated(info)
    1252            0 :                 },
    1253            1 :                 WALCreated: func(info WALCreateInfo) {
    1254            1 :                         a.WALCreated(info)
    1255            1 :                         b.WALCreated(info)
    1256            1 :                 },
    1257            1 :                 WALDeleted: func(info WALDeleteInfo) {
    1258            1 :                         a.WALDeleted(info)
    1259            1 :                         b.WALDeleted(info)
    1260            1 :                 },
    1261            0 :                 WriteStallBegin: func(info WriteStallBeginInfo) {
    1262            0 :                         a.WriteStallBegin(info)
    1263            0 :                         b.WriteStallBegin(info)
    1264            0 :                 },
    1265            0 :                 WriteStallEnd: func() {
    1266            0 :                         a.WriteStallEnd()
    1267            0 :                         b.WriteStallEnd()
    1268            0 :                 },
    1269            0 :                 LowDiskSpace: func(info LowDiskSpaceInfo) {
    1270            0 :                         a.LowDiskSpace(info)
    1271            0 :                         b.LowDiskSpace(info)
    1272            0 :                 },
    1273            0 :                 PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
    1274            0 :                         a.PossibleAPIMisuse(info)
    1275            0 :                         b.PossibleAPIMisuse(info)
    1276            0 :                 },
    1277              :         }
    1278              : }
    1279              : 
    1280              : // lowDiskSpaceReporter contains the logic to report low disk space events.
    1281              : // Report is called whenever we get the disk usage statistics.
    1282              : //
    1283              : // We define a few thresholds (10%, 5%, 3%, 2%, 1%) and we post an event
    1284              : // whenever we reach a new threshold. We periodically repost the event every 30
    1285              : // minutes until we are above all thresholds.
    1286              : type lowDiskSpaceReporter struct {
    1287              :         mu struct {
    1288              :                 sync.Mutex
    1289              :                 lastNoticeThreshold int
    1290              :                 lastNoticeTime      crtime.Mono
    1291              :         }
    1292              : }
    1293              : 
    1294              : var lowDiskSpaceThresholds = []int{10, 5, 3, 2, 1}
    1295              : 
    1296              : const lowDiskSpaceFrequency = 30 * time.Minute
    1297              : 
    1298            2 : func (r *lowDiskSpaceReporter) Report(availBytes, totalBytes uint64, el *EventListener) {
    1299            2 :         threshold, ok := r.findThreshold(availBytes, totalBytes)
    1300            2 :         if !ok {
    1301            2 :                 // Normal path.
    1302            2 :                 return
    1303            2 :         }
    1304            1 :         if r.shouldReport(threshold, crtime.NowMono()) {
    1305            1 :                 el.LowDiskSpace(LowDiskSpaceInfo{
    1306            1 :                         AvailBytes:       availBytes,
    1307            1 :                         TotalBytes:       totalBytes,
    1308            1 :                         PercentThreshold: threshold,
    1309            1 :                 })
    1310            1 :         }
    1311              : }
    1312              : 
    1313              : // shouldReport returns true if we should report an event. Updates
    1314              : // lastNoticeTime/lastNoticeThreshold appropriately.
    1315            1 : func (r *lowDiskSpaceReporter) shouldReport(threshold int, now crtime.Mono) bool {
    1316            1 :         r.mu.Lock()
    1317            1 :         defer r.mu.Unlock()
    1318            1 :         if threshold < r.mu.lastNoticeThreshold || r.mu.lastNoticeTime == 0 ||
    1319            1 :                 now.Sub(r.mu.lastNoticeTime) >= lowDiskSpaceFrequency {
    1320            1 :                 r.mu.lastNoticeThreshold = threshold
    1321            1 :                 r.mu.lastNoticeTime = now
    1322            1 :                 return true
    1323            1 :         }
    1324            1 :         return false
    1325              : }
    1326              : 
    1327              : // findThreshold returns the largest threshold in lowDiskSpaceThresholds which
    1328              : // is >= the percentage ratio between availBytes and totalBytes (or ok=false if
    1329              : // there is more free space than the highest threshold).
    1330              : func (r *lowDiskSpaceReporter) findThreshold(
    1331              :         availBytes, totalBytes uint64,
    1332            2 : ) (threshold int, ok bool) {
    1333            2 :         // Note: in the normal path, we exit the loop during the first iteration.
    1334            2 :         for i, t := range lowDiskSpaceThresholds {
    1335            2 :                 if availBytes*100 > totalBytes*uint64(lowDiskSpaceThresholds[i]) {
    1336            2 :                         break
    1337              :                 }
    1338            1 :                 threshold = t
    1339            1 :                 ok = true
    1340              :         }
    1341            2 :         return threshold, ok
    1342              : }
    1343              : 
    1344              : // reportCorruption reports a corruption of a TableMetadata or BlobFileMetadata
    1345              : // to the event listener and also adds a DataCorruptionInfo payload to the error.
    1346            1 : func (d *DB) reportCorruption(meta base.ObjectInfo, err error) error {
    1347            1 :         if invariants.Enabled && !IsCorruptionError(err) {
    1348            0 :                 panic("not a corruption error")
    1349              :         }
    1350            1 :         fileType, fileNum := meta.FileInfo()
    1351            1 : 
    1352            1 :         objMeta, lookupErr := d.objProvider.Lookup(fileType, fileNum)
    1353            1 :         if lookupErr != nil {
    1354            1 :                 // If the object is not known to the provider, it must be a local object
    1355            1 :                 // that was missing when we opened the store. Remote objects have their
    1356            1 :                 // metadata in a catalog, so even if the backing object is deleted, the
    1357            1 :                 // DiskFileNum would still be known.
    1358            1 :                 objMeta = objstorage.ObjectMetadata{DiskFileNum: fileNum, FileType: fileType}
    1359            1 :         }
    1360            1 :         path := d.objProvider.Path(objMeta)
    1361            1 :         if objMeta.IsRemote() {
    1362            1 :                 // Remote path (which include the locator and full path) might not always be
    1363            1 :                 // safe.
    1364            1 :                 err = errors.WithHintf(err, "path: %s", path)
    1365            1 :         } else {
    1366            1 :                 // Local paths are safe: they start with the store directory and the
    1367            1 :                 // filename is generated by Pebble.
    1368            1 :                 err = errors.WithHintf(err, "path: %s", redact.Safe(path))
    1369            1 :         }
    1370            1 :         info := DataCorruptionInfo{
    1371            1 :                 Path:     path,
    1372            1 :                 IsRemote: objMeta.IsRemote(),
    1373            1 :                 Locator:  objMeta.Remote.Locator,
    1374            1 :                 Bounds:   meta.UserKeyBounds(),
    1375            1 :                 Details:  err,
    1376            1 :         }
    1377            1 :         d.opts.EventListener.DataCorruption(info)
    1378            1 :         // We don't use errors.Join() because that also annotates with this stack
    1379            1 :         // trace which would not be useful.
    1380            1 :         return errorsjoin.Join(err, &corruptionDetailError{info: info})
    1381              : }
    1382              : 
    1383              : type corruptionDetailError struct {
    1384              :         info DataCorruptionInfo
    1385              : }
    1386              : 
    1387            1 : func (e *corruptionDetailError) Error() string {
    1388            1 :         return "<corruption detail carrier>"
    1389            1 : }
    1390              : 
    1391              : // ExtractDataCorruptionInfo extracts the DataCorruptionInfo details from a
    1392              : // corruption error. Returns nil if there is no such detail.
    1393            1 : func ExtractDataCorruptionInfo(err error) *DataCorruptionInfo {
    1394            1 :         var e *corruptionDetailError
    1395            1 :         if errors.As(err, &e) {
    1396            1 :                 return &e.info
    1397            1 :         }
    1398            0 :         return nil
    1399              : }
        

Generated by: LCOV version 2.0-1