LCOV - code coverage report
Current view: top level - pebble - event.go (source / functions) Coverage Total Hit
Test: 2025-03-10 08:17Z de0c5a11 - meta test only.lcov Lines: 53.9 % 571 308
Test Date: 2025-03-10 08:18:31 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "fmt"
       9              :         "strings"
      10              :         "sync"
      11              :         "time"
      12              : 
      13              :         "github.com/cockroachdb/crlib/crtime"
      14              :         "github.com/cockroachdb/errors"
      15              :         "github.com/cockroachdb/pebble/internal/base"
      16              :         "github.com/cockroachdb/pebble/internal/humanize"
      17              :         "github.com/cockroachdb/pebble/internal/invariants"
      18              :         "github.com/cockroachdb/pebble/internal/manifest"
      19              :         "github.com/cockroachdb/pebble/objstorage"
      20              :         "github.com/cockroachdb/pebble/objstorage/remote"
      21              :         "github.com/cockroachdb/pebble/vfs"
      22              :         "github.com/cockroachdb/redact"
      23              : )
      24              : 
      25              : // FileNum is an identifier for a file within a database.
      26              : type FileNum = base.FileNum
      27              : 
      28              : // TableInfo exports the manifest.TableInfo type.
      29              : type TableInfo = manifest.TableInfo
      30              : 
      31            1 : func tablesTotalSize(tables []TableInfo) uint64 {
      32            1 :         var size uint64
      33            1 :         for i := range tables {
      34            1 :                 size += tables[i].Size
      35            1 :         }
      36            1 :         return size
      37              : }
      38              : 
      39            1 : func formatFileNums(tables []TableInfo) string {
      40            1 :         var buf strings.Builder
      41            1 :         for i := range tables {
      42            1 :                 if i > 0 {
      43            1 :                         buf.WriteString(" ")
      44            1 :                 }
      45            1 :                 buf.WriteString(tables[i].FileNum.String())
      46              :         }
      47            1 :         return buf.String()
      48              : }
      49              : 
      50              : // DataCorruptionInfo contains the information for a DataCorruption event.
      51              : type DataCorruptionInfo struct {
      52              :         // Path of the file that is corrupted. For remote files the path starts with
      53              :         // "remote://".
      54              :         Path     string
      55              :         IsRemote bool
      56              :         // Locator is only set when IsRemote is true (note that an empty Locator is
      57              :         // valid even then).
      58              :         Locator remote.Locator
      59              :         // Bounds indicates the keyspace range that is affected.
      60              :         Bounds base.UserKeyBounds
      61              :         // Details of the error. See cockroachdb/error for how to format with or
      62              :         // without redaction.
      63              :         Details error
      64              : }
      65              : 
      66            0 : func (i DataCorruptionInfo) String() string {
      67            0 :         return redact.StringWithoutMarkers(i)
      68            0 : }
      69              : 
      70              : // SafeFormat implements redact.SafeFormatter.
      71            0 : func (i DataCorruptionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
      72            0 :         w.Printf("on-disk corruption: %s", redact.Safe(i.Path))
      73            0 :         if i.IsRemote {
      74            0 :                 w.Printf(" (remote locator %q)", redact.Safe(i.Locator))
      75            0 :         }
      76            0 :         w.Printf("; bounds: %s; details: %+v", i.Bounds.String(), i.Details)
      77              : }
      78              : 
      79              : // LevelInfo contains info pertaining to a particular level.
      80              : type LevelInfo struct {
      81              :         Level  int
      82              :         Tables []TableInfo
      83              :         Score  float64
      84              : }
      85              : 
      86            0 : func (i LevelInfo) String() string {
      87            0 :         return redact.StringWithoutMarkers(i)
      88            0 : }
      89              : 
      90              : // SafeFormat implements redact.SafeFormatter.
      91            1 : func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
      92            1 :         w.Printf("L%d [%s] (%s) Score=%.2f",
      93            1 :                 redact.Safe(i.Level),
      94            1 :                 redact.Safe(formatFileNums(i.Tables)),
      95            1 :                 redact.Safe(humanize.Bytes.Uint64(tablesTotalSize(i.Tables))),
      96            1 :                 redact.Safe(i.Score))
      97            1 : }
      98              : 
      99              : // CompactionInfo contains the info for a compaction event.
     100              : type CompactionInfo struct {
     101              :         // JobID is the ID of the compaction job.
     102              :         JobID int
     103              :         // Reason is the reason for the compaction.
     104              :         Reason string
     105              :         // Input contains the input tables for the compaction organized by level.
     106              :         Input []LevelInfo
     107              :         // Output contains the output tables generated by the compaction. The output
     108              :         // tables are empty for the compaction begin event.
     109              :         Output LevelInfo
     110              :         // Duration is the time spent compacting, including reading and writing
     111              :         // sstables.
     112              :         Duration time.Duration
     113              :         // TotalDuration is the total wall-time duration of the compaction,
     114              :         // including applying the compaction to the database. TotalDuration is
     115              :         // always ≥ Duration.
     116              :         TotalDuration time.Duration
     117              :         Done          bool
     118              :         Err           error
     119              : 
     120              :         SingleLevelOverlappingRatio float64
     121              :         MultiLevelOverlappingRatio  float64
     122              : 
     123              :         // Annotations specifies additional info to appear in a compaction's event log line
     124              :         Annotations compactionAnnotations
     125              : }
     126              : 
     127              : type compactionAnnotations []string
     128              : 
     129              : // SafeFormat implements redact.SafeFormatter.
     130            1 : func (ca compactionAnnotations) SafeFormat(w redact.SafePrinter, _ rune) {
     131            1 :         if len(ca) == 0 {
     132            0 :                 return
     133            0 :         }
     134            1 :         for i := range ca {
     135            1 :                 if i != 0 {
     136            0 :                         w.Print(" ")
     137            0 :                 }
     138            1 :                 w.Printf("%s", redact.SafeString(ca[i]))
     139              :         }
     140              : }
     141              : 
     142            1 : func (i CompactionInfo) String() string {
     143            1 :         return redact.StringWithoutMarkers(i)
     144            1 : }
     145              : 
     146              : // SafeFormat implements redact.SafeFormatter.
     147            1 : func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     148            1 :         if i.Err != nil {
     149            1 :                 w.Printf("[JOB %d] compaction(%s) to L%d error: %s",
     150            1 :                         redact.Safe(i.JobID), redact.SafeString(i.Reason), redact.Safe(i.Output.Level), i.Err)
     151            1 :                 return
     152            1 :         }
     153              : 
     154            1 :         if !i.Done {
     155            1 :                 w.Printf("[JOB %d] compacting(%s) ",
     156            1 :                         redact.Safe(i.JobID),
     157            1 :                         redact.SafeString(i.Reason))
     158            1 :                 if len(i.Annotations) > 0 {
     159            1 :                         w.Printf("%s ", i.Annotations)
     160            1 :                 }
     161            1 :                 w.Printf("%s; ", levelInfos(i.Input))
     162            1 :                 w.Printf("OverlappingRatio: Single %.2f, Multi %.2f", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
     163            1 :                 return
     164              :         }
     165            1 :         outputSize := tablesTotalSize(i.Output.Tables)
     166            1 :         w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
     167            1 :         if len(i.Annotations) > 0 {
     168            1 :                 w.Printf("%s ", i.Annotations)
     169            1 :         }
     170            1 :         w.Print(levelInfos(i.Input))
     171            1 :         w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
     172            1 :                 redact.Safe(i.Output.Level),
     173            1 :                 redact.Safe(formatFileNums(i.Output.Tables)),
     174            1 :                 redact.Safe(humanize.Bytes.Uint64(outputSize)),
     175            1 :                 redact.Safe(i.Duration.Seconds()),
     176            1 :                 redact.Safe(i.TotalDuration.Seconds()),
     177            1 :                 redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
     178              : }
     179              : 
     180              : type levelInfos []LevelInfo
     181              : 
     182            1 : func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
     183            1 :         for j, levelInfo := range i {
     184            1 :                 if j > 0 {
     185            1 :                         w.Printf(" + ")
     186            1 :                 }
     187            1 :                 w.Print(levelInfo)
     188              :         }
     189              : }
     190              : 
     191              : // DiskSlowInfo contains the info for a disk slowness event when writing to a
     192              : // file.
     193              : type DiskSlowInfo = vfs.DiskSlowInfo
     194              : 
     195              : // FlushInfo contains the info for a flush event.
     196              : type FlushInfo struct {
     197              :         // JobID is the ID of the flush job.
     198              :         JobID int
     199              :         // Reason is the reason for the flush.
     200              :         Reason string
     201              :         // Input contains the count of input memtables that were flushed.
     202              :         Input int
     203              :         // InputBytes contains the total in-memory size of the memtable(s) that were
     204              :         // flushed. This size includes skiplist indexing data structures.
     205              :         InputBytes uint64
     206              :         // Output contains the ouptut table generated by the flush. The output info
     207              :         // is empty for the flush begin event.
     208              :         Output []TableInfo
     209              :         // Duration is the time spent flushing. This duration includes writing and
     210              :         // syncing all of the flushed keys to sstables.
     211              :         Duration time.Duration
     212              :         // TotalDuration is the total wall-time duration of the flush, including
     213              :         // applying the flush to the database. TotalDuration is always ≥ Duration.
     214              :         TotalDuration time.Duration
     215              :         // Ingest is set to true if the flush is handling tables that were added to
     216              :         // the flushable queue via an ingestion operation.
     217              :         Ingest bool
     218              :         // IngestLevels are the output levels for each ingested table in the flush.
     219              :         // This field is only populated when Ingest is true.
     220              :         IngestLevels []int
     221              :         Done         bool
     222              :         Err          error
     223              : }
     224              : 
     225            1 : func (i FlushInfo) String() string {
     226            1 :         return redact.StringWithoutMarkers(i)
     227            1 : }
     228              : 
     229              : // SafeFormat implements redact.SafeFormatter.
     230            1 : func (i FlushInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     231            1 :         if i.Err != nil {
     232            1 :                 w.Printf("[JOB %d] flush error: %s", redact.Safe(i.JobID), i.Err)
     233            1 :                 return
     234            1 :         }
     235              : 
     236            1 :         plural := redact.SafeString("s")
     237            1 :         if i.Input == 1 {
     238            1 :                 plural = ""
     239            1 :         }
     240            1 :         if !i.Done {
     241            1 :                 w.Printf("[JOB %d] ", redact.Safe(i.JobID))
     242            1 :                 if !i.Ingest {
     243            1 :                         w.Printf("flushing %d memtable", redact.Safe(i.Input))
     244            1 :                         w.SafeString(plural)
     245            1 :                         w.Printf(" (%s) to L0", redact.Safe(humanize.Bytes.Uint64(i.InputBytes)))
     246            1 :                 } else {
     247            1 :                         w.Printf("flushing %d ingested table%s", redact.Safe(i.Input), plural)
     248            1 :                 }
     249            1 :                 return
     250              :         }
     251              : 
     252            1 :         outputSize := tablesTotalSize(i.Output)
     253            1 :         if !i.Ingest {
     254            1 :                 if invariants.Enabled && len(i.IngestLevels) > 0 {
     255            0 :                         panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) == 0"))
     256              :                 }
     257            1 :                 w.Printf("[JOB %d] flushed %d memtable%s (%s) to L0 [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
     258            1 :                         redact.Safe(i.JobID), redact.Safe(i.Input), plural,
     259            1 :                         redact.Safe(humanize.Bytes.Uint64(i.InputBytes)),
     260            1 :                         redact.Safe(formatFileNums(i.Output)),
     261            1 :                         redact.Safe(humanize.Bytes.Uint64(outputSize)),
     262            1 :                         redact.Safe(i.Duration.Seconds()),
     263            1 :                         redact.Safe(i.TotalDuration.Seconds()),
     264            1 :                         redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
     265            1 :         } else {
     266            1 :                 if invariants.Enabled && len(i.IngestLevels) == 0 {
     267            0 :                         panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) > 0"))
     268              :                 }
     269            1 :                 w.Printf("[JOB %d] flushed %d ingested flushable%s",
     270            1 :                         redact.Safe(i.JobID), redact.Safe(len(i.Output)), plural)
     271            1 :                 for j, level := range i.IngestLevels {
     272            1 :                         file := i.Output[j]
     273            1 :                         if j > 0 {
     274            1 :                                 w.Printf(" +")
     275            1 :                         }
     276            1 :                         w.Printf(" L%d:%s (%s)", level, file.FileNum, humanize.Bytes.Uint64(file.Size))
     277              :                 }
     278            1 :                 w.Printf(" in %.1fs (%.1fs total), output rate %s/s",
     279            1 :                         redact.Safe(i.Duration.Seconds()),
     280            1 :                         redact.Safe(i.TotalDuration.Seconds()),
     281            1 :                         redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
     282              :         }
     283              : }
     284              : 
     285              : // DownloadInfo contains the info for a DB.Download() event.
     286              : type DownloadInfo struct {
     287              :         // JobID is the ID of the download job.
     288              :         JobID int
     289              : 
     290              :         Spans []DownloadSpan
     291              : 
     292              :         // Duration is the time since the operation was started.
     293              :         Duration                    time.Duration
     294              :         DownloadCompactionsLaunched int
     295              : 
     296              :         // RestartCount indicates that the download operation restarted because it
     297              :         // noticed that new external files were ingested. A DownloadBegin event with
     298              :         // RestartCount = 0 is the start of the operation; each time we restart it we
     299              :         // have another DownloadBegin event with RestartCount > 0.
     300              :         RestartCount int
     301              :         Done         bool
     302              :         Err          error
     303              : }
     304              : 
     305            1 : func (i DownloadInfo) String() string {
     306            1 :         return redact.StringWithoutMarkers(i)
     307            1 : }
     308              : 
     309              : // SafeFormat implements redact.SafeFormatter.
     310            1 : func (i DownloadInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     311            1 :         switch {
     312            0 :         case i.Err != nil:
     313            0 :                 w.Printf("[JOB %d] download error after %1.fs: %s", redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), i.Err)
     314              : 
     315            1 :         case i.Done:
     316            1 :                 w.Printf("[JOB %d] download finished in %.1fs (launched %d compactions)",
     317            1 :                         redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), redact.Safe(i.DownloadCompactionsLaunched))
     318              : 
     319            1 :         default:
     320            1 :                 if i.RestartCount == 0 {
     321            1 :                         w.Printf("[JOB %d] starting download for %d spans", redact.Safe(i.JobID), redact.Safe(len(i.Spans)))
     322            1 :                 } else {
     323            0 :                         w.Printf("[JOB %d] restarting download (restart #%d, time so far %.1fs, launched %d compactions)",
     324            0 :                                 redact.Safe(i.JobID), redact.Safe(i.RestartCount), redact.Safe(i.Duration.Seconds()),
     325            0 :                                 redact.Safe(i.DownloadCompactionsLaunched))
     326            0 :                 }
     327              :         }
     328              : }
     329              : 
     330              : // ManifestCreateInfo contains info about a manifest creation event.
     331              : type ManifestCreateInfo struct {
     332              :         // JobID is the ID of the job the caused the manifest to be created.
     333              :         JobID int
     334              :         Path  string
     335              :         // The file number of the new Manifest.
     336              :         FileNum base.DiskFileNum
     337              :         Err     error
     338              : }
     339              : 
     340            1 : func (i ManifestCreateInfo) String() string {
     341            1 :         return redact.StringWithoutMarkers(i)
     342            1 : }
     343              : 
     344              : // SafeFormat implements redact.SafeFormatter.
     345            1 : func (i ManifestCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     346            1 :         if i.Err != nil {
     347            0 :                 w.Printf("[JOB %d] MANIFEST create error: %s", redact.Safe(i.JobID), i.Err)
     348            0 :                 return
     349            0 :         }
     350            1 :         w.Printf("[JOB %d] MANIFEST created %s", redact.Safe(i.JobID), i.FileNum)
     351              : }
     352              : 
     353              : // ManifestDeleteInfo contains the info for a Manifest deletion event.
     354              : type ManifestDeleteInfo struct {
     355              :         // JobID is the ID of the job the caused the Manifest to be deleted.
     356              :         JobID   int
     357              :         Path    string
     358              :         FileNum base.DiskFileNum
     359              :         Err     error
     360              : }
     361              : 
     362            1 : func (i ManifestDeleteInfo) String() string {
     363            1 :         return redact.StringWithoutMarkers(i)
     364            1 : }
     365              : 
     366              : // SafeFormat implements redact.SafeFormatter.
     367            1 : func (i ManifestDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     368            1 :         if i.Err != nil {
     369            0 :                 w.Printf("[JOB %d] MANIFEST delete error: %s", redact.Safe(i.JobID), i.Err)
     370            0 :                 return
     371            0 :         }
     372            1 :         w.Printf("[JOB %d] MANIFEST deleted %s", redact.Safe(i.JobID), i.FileNum)
     373              : }
     374              : 
     375              : // TableCreateInfo contains the info for a table creation event.
     376              : type TableCreateInfo struct {
     377              :         JobID int
     378              :         // Reason is the reason for the table creation: "compacting", "flushing", or
     379              :         // "ingesting".
     380              :         Reason  string
     381              :         Path    string
     382              :         FileNum base.DiskFileNum
     383              : }
     384              : 
     385            1 : func (i TableCreateInfo) String() string {
     386            1 :         return redact.StringWithoutMarkers(i)
     387            1 : }
     388              : 
     389              : // SafeFormat implements redact.SafeFormatter.
     390            1 : func (i TableCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     391            1 :         w.Printf("[JOB %d] %s: sstable created %s",
     392            1 :                 redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
     393            1 : }
     394              : 
     395              : // TableDeleteInfo contains the info for a table deletion event.
     396              : type TableDeleteInfo struct {
     397              :         JobID   int
     398              :         Path    string
     399              :         FileNum base.DiskFileNum
     400              :         Err     error
     401              : }
     402              : 
     403            1 : func (i TableDeleteInfo) String() string {
     404            1 :         return redact.StringWithoutMarkers(i)
     405            1 : }
     406              : 
     407              : // SafeFormat implements redact.SafeFormatter.
     408            1 : func (i TableDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     409            1 :         if i.Err != nil {
     410            0 :                 w.Printf("[JOB %d] sstable delete error %s: %s",
     411            0 :                         redact.Safe(i.JobID), i.FileNum, i.Err)
     412            0 :                 return
     413            0 :         }
     414            1 :         w.Printf("[JOB %d] sstable deleted %s", redact.Safe(i.JobID), i.FileNum)
     415              : }
     416              : 
     417              : // TableIngestInfo contains the info for a table ingestion event.
     418              : type TableIngestInfo struct {
     419              :         // JobID is the ID of the job the caused the table to be ingested.
     420              :         JobID  int
     421              :         Tables []struct {
     422              :                 TableInfo
     423              :                 Level int
     424              :         }
     425              :         // GlobalSeqNum is the sequence number that was assigned to all entries in
     426              :         // the ingested table.
     427              :         GlobalSeqNum base.SeqNum
     428              :         // flushable indicates whether the ingested sstable was treated as a
     429              :         // flushable.
     430              :         flushable bool
     431              :         Err       error
     432              : }
     433              : 
     434            1 : func (i TableIngestInfo) String() string {
     435            1 :         return redact.StringWithoutMarkers(i)
     436            1 : }
     437              : 
     438              : // SafeFormat implements redact.SafeFormatter.
     439            1 : func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     440            1 :         if i.Err != nil {
     441            0 :                 w.Printf("[JOB %d] ingest error: %s", redact.Safe(i.JobID), i.Err)
     442            0 :                 return
     443            0 :         }
     444              : 
     445            1 :         if i.flushable {
     446            1 :                 w.Printf("[JOB %d] ingested as flushable", redact.Safe(i.JobID))
     447            1 :         } else {
     448            1 :                 w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
     449            1 :         }
     450              : 
     451            1 :         for j := range i.Tables {
     452            1 :                 t := &i.Tables[j]
     453            1 :                 if j > 0 {
     454            1 :                         w.Printf(",")
     455            1 :                 }
     456            1 :                 levelStr := ""
     457            1 :                 if !i.flushable {
     458            1 :                         levelStr = fmt.Sprintf("L%d:", t.Level)
     459            1 :                 }
     460            1 :                 w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
     461            1 :                         redact.Safe(humanize.Bytes.Uint64(t.Size)))
     462              :         }
     463              : }
     464              : 
     465              : // TableStatsInfo contains the info for a table stats loaded event.
     466              : type TableStatsInfo struct {
     467              :         // JobID is the ID of the job that finished loading the initial tables'
     468              :         // stats.
     469              :         JobID int
     470              : }
     471              : 
     472            1 : func (i TableStatsInfo) String() string {
     473            1 :         return redact.StringWithoutMarkers(i)
     474            1 : }
     475              : 
     476              : // SafeFormat implements redact.SafeFormatter.
     477            1 : func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     478            1 :         w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
     479            1 : }
     480              : 
     481              : // TableValidatedInfo contains information on the result of a validation run
     482              : // on an sstable.
     483              : type TableValidatedInfo struct {
     484              :         JobID int
     485              :         Meta  *tableMetadata
     486              : }
     487              : 
     488            1 : func (i TableValidatedInfo) String() string {
     489            1 :         return redact.StringWithoutMarkers(i)
     490            1 : }
     491              : 
     492              : // SafeFormat implements redact.SafeFormatter.
     493            1 : func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     494            1 :         w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
     495            1 : }
     496              : 
     497              : // WALCreateInfo contains info about a WAL creation event.
     498              : type WALCreateInfo struct {
     499              :         // JobID is the ID of the job the caused the WAL to be created.
     500              :         JobID int
     501              :         Path  string
     502              :         // The file number of the new WAL.
     503              :         FileNum base.DiskFileNum
     504              :         // The file number of a previous WAL which was recycled to create this
     505              :         // one. Zero if recycling did not take place.
     506              :         RecycledFileNum base.DiskFileNum
     507              :         Err             error
     508              : }
     509              : 
     510            1 : func (i WALCreateInfo) String() string {
     511            1 :         return redact.StringWithoutMarkers(i)
     512            1 : }
     513              : 
     514              : // SafeFormat implements redact.SafeFormatter.
     515            1 : func (i WALCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     516            1 :         if i.Err != nil {
     517            0 :                 w.Printf("[JOB %d] WAL create error: %s", redact.Safe(i.JobID), i.Err)
     518            0 :                 return
     519            0 :         }
     520              : 
     521            1 :         if i.RecycledFileNum == 0 {
     522            1 :                 w.Printf("[JOB %d] WAL created %s", redact.Safe(i.JobID), i.FileNum)
     523            1 :                 return
     524            1 :         }
     525              : 
     526            0 :         w.Printf("[JOB %d] WAL created %s (recycled %s)",
     527            0 :                 redact.Safe(i.JobID), i.FileNum, i.RecycledFileNum)
     528              : }
     529              : 
     530              : // WALDeleteInfo contains the info for a WAL deletion event.
     531              : //
     532              : // TODO(sumeer): extend WALDeleteInfo for the failover case in case the path
     533              : // is insufficient to infer whether primary or secondary.
     534              : type WALDeleteInfo struct {
     535              :         // JobID is the ID of the job the caused the WAL to be deleted.
     536              :         JobID   int
     537              :         Path    string
     538              :         FileNum base.DiskFileNum
     539              :         Err     error
     540              : }
     541              : 
     542            1 : func (i WALDeleteInfo) String() string {
     543            1 :         return redact.StringWithoutMarkers(i)
     544            1 : }
     545              : 
     546              : // SafeFormat implements redact.SafeFormatter.
     547            1 : func (i WALDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     548            1 :         if i.Err != nil {
     549            0 :                 w.Printf("[JOB %d] WAL delete error: %s", redact.Safe(i.JobID), i.Err)
     550            0 :                 return
     551            0 :         }
     552            1 :         w.Printf("[JOB %d] WAL deleted %s", redact.Safe(i.JobID), i.FileNum)
     553              : }
     554              : 
     555              : // WriteStallBeginInfo contains the info for a write stall begin event.
     556              : type WriteStallBeginInfo struct {
     557              :         Reason string
     558              : }
     559              : 
     560            1 : func (i WriteStallBeginInfo) String() string {
     561            1 :         return redact.StringWithoutMarkers(i)
     562            1 : }
     563              : 
     564              : // SafeFormat implements redact.SafeFormatter.
     565            1 : func (i WriteStallBeginInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     566            1 :         w.Printf("write stall beginning: %s", redact.Safe(i.Reason))
     567            1 : }
     568              : 
     569              : // LowDiskSpaceInfo contains the information for a LowDiskSpace
     570              : // event.
     571              : type LowDiskSpaceInfo struct {
     572              :         // AvailBytes is the disk space available to the current process in bytes.
     573              :         AvailBytes uint64
     574              :         // TotalBytes is the total disk space in bytes.
     575              :         TotalBytes uint64
     576              :         // PercentThreshold is one of a set of fixed percentages in the
     577              :         // lowDiskSpaceThresholds below. This event was issued because the disk
     578              :         // space went below this threshold.
     579              :         PercentThreshold int
     580              : }
     581              : 
     582            0 : func (i LowDiskSpaceInfo) String() string {
     583            0 :         return redact.StringWithoutMarkers(i)
     584            0 : }
     585              : 
     586              : // SafeFormat implements redact.SafeFormatter.
     587            0 : func (i LowDiskSpaceInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     588            0 :         w.Printf(
     589            0 :                 "available disk space under %d%% (%s of %s)",
     590            0 :                 redact.Safe(i.PercentThreshold),
     591            0 :                 redact.Safe(humanize.Bytes.Uint64(i.AvailBytes)),
     592            0 :                 redact.Safe(humanize.Bytes.Uint64(i.TotalBytes)),
     593            0 :         )
     594            0 : }
     595              : 
     596              : // PossibleAPIMisuseInfo contains the information for a PossibleAPIMisuse event.
     597              : type PossibleAPIMisuseInfo struct {
     598              :         Kind APIMisuseKind
     599              : 
     600              :         // UserKey is set for the following kinds:
     601              :         //  - IneffectualSingleDelete,
     602              :         //  - NondeterministicSingleDelete.
     603              :         UserKey []byte
     604              : }
     605              : 
     606            1 : func (i PossibleAPIMisuseInfo) String() string {
     607            1 :         return redact.StringWithoutMarkers(i)
     608            1 : }
     609              : 
     610              : // SafeFormat implements redact.SafeFormatter.
     611            1 : func (i PossibleAPIMisuseInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     612            1 :         switch i.Kind {
     613            1 :         case IneffectualSingleDelete, NondeterministicSingleDelete:
     614            1 :                 w.Printf("possible API misuse: %s (key=%q)", redact.Safe(i.Kind), i.UserKey)
     615            0 :         default:
     616            0 :                 if invariants.Enabled {
     617            0 :                         panic("invalid API misuse event")
     618              :                 }
     619            0 :                 w.Printf("invalid API misuse event")
     620              :         }
     621              : }
     622              : 
     623              : // APIMisuseKind identifies the type of API misuse represented by a
     624              : // PossibleAPIMisuse event.
     625              : type APIMisuseKind int8
     626              : 
     627              : const (
     628              :         // IneffectualSingleDelete is emitted in compactions/flushes if any
     629              :         // single delete is being elided without deleting a point set/merge.
     630              :         //
     631              :         // This event can sometimes be a false positive because of delete-only
     632              :         // compactions which can cause a recent RANGEDEL to peek below an older
     633              :         // SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
     634              :         //
     635              :         // Example:
     636              :         //   RANGEDEL [a, c)#10 in L0
     637              :         //   SINGLEDEL b#5 in L1
     638              :         //   SET b#3 in L6
     639              :         //
     640              :         // If the L6 file containing the SET is narrow and the L1 file containing
     641              :         // the SINGLEDEL is wide, a delete-only compaction can remove the file in
     642              :         // L2 before the SINGLEDEL is compacted down. Then when the SINGLEDEL is
     643              :         // compacted down, it will not find any SET to delete, resulting in the
     644              :         // ineffectual callback.
     645              :         IneffectualSingleDelete APIMisuseKind = iota
     646              : 
     647              :         // NondeterministicSingleDelete is emitted in compactions/flushes if any
     648              :         // single delete has consumed a Set/Merge, and there is another immediately
     649              :         // older Set/SetWithDelete/Merge. The user of Pebble has violated the
     650              :         // invariant under which SingleDelete can be used correctly.
     651              :         //
     652              :         // Consider the sequence SingleDelete#3, Set#2, Set#1. There are three
     653              :         // ways some of these keys can first meet in a compaction.
     654              :         //
     655              :         // - All 3 keys in the same compaction: this callback will detect the
     656              :         //   violation.
     657              :         //
     658              :         // - SingleDelete#3, Set#2 meet in a compaction first: Both keys will
     659              :         //   disappear. The violation will not be detected, and the DB will have
     660              :         //   Set#1 which is likely incorrect (from the user's perspective).
     661              :         //
     662              :         // - Set#2, Set#1 meet in a compaction first: The output will be Set#2,
     663              :         //   which will later be consumed by SingleDelete#3. The violation will
     664              :         //   not be detected and the DB will be correct.
     665              :         //
     666              :         // This event can sometimes be a false positive because of delete-only
     667              :         // compactions which can cause a recent RANGEDEL to peek below an older
     668              :         // SINGLEDEL and delete an arbitrary subset of data below that SINGLEDEL.
     669              :         //
     670              :         // Example:
     671              :         //   RANGEDEL [a, z)#60 in L0
     672              :         //   SINGLEDEL g#50 in L1
     673              :         //   SET g#40 in L2
     674              :         //   RANGEDEL [g,h)#30 in L3
     675              :         //   SET g#20 in L6
     676              :         //
     677              :         // In this example, the two SETs represent the same user write, and the
     678              :         // RANGEDELs are caused by the CockroachDB range being dropped. That is,
     679              :         // the user wrote to g once, range was dropped, then added back, which
     680              :         // caused the SET again, then at some point g was validly deleted using a
     681              :         // SINGLEDEL, and then the range was dropped again. The older RANGEDEL can
     682              :         // get fragmented due to compactions it has been part of. Say this L3 file
     683              :         // containing the RANGEDEL is very narrow, while the L1, L2, L6 files are
     684              :         // wider than the RANGEDEL in L0. Then the RANGEDEL in L3 can be dropped
     685              :         // using a delete-only compaction, resulting in an LSM with state:
     686              :         //
     687              :         //   RANGEDEL [a, z)#60 in L0
     688              :         //   SINGLEDEL g#50 in L1
     689              :         //   SET g#40 in L2
     690              :         //   SET g#20 in L6
     691              :         //
     692              :         // A multi-level compaction involving L1, L2, L6 will cause the invariant
     693              :         // violation callback. This example doesn't need multi-level compactions:
     694              :         // say there was a Pebble snapshot at g#21 preventing g#20 from being
     695              :         // dropped when it meets g#40 in a compaction. That snapshot will not save
     696              :         // RANGEDEL [g,h)#30, so we can have:
     697              :         //
     698              :         //   SINGLEDEL g#50 in L1
     699              :         //   SET g#40, SET g#20 in L6
     700              :         //
     701              :         // And say the snapshot is removed and then the L1 and L6 compaction
     702              :         // happens, resulting in the invariant violation callback.
     703              :         NondeterministicSingleDelete
     704              : )
     705              : 
     706            1 : func (k APIMisuseKind) String() string {
     707            1 :         switch k {
     708            1 :         case IneffectualSingleDelete:
     709            1 :                 return "ineffectual SINGLEDEL"
     710            0 :         case NondeterministicSingleDelete:
     711            0 :                 return "nondeterministic SINGLEDEL"
     712            0 :         default:
     713            0 :                 return "unknown"
     714              :         }
     715              : }
     716              : 
     717              : // EventListener contains a set of functions that will be invoked when various
     718              : // significant DB events occur. Note that the functions should not run for an
     719              : // excessive amount of time as they are invoked synchronously by the DB and may
     720              : // block continued DB work. For a similar reason it is advisable to not perform
     721              : // any synchronous calls back into the DB.
     722              : type EventListener struct {
     723              :         // BackgroundError is invoked whenever an error occurs during a background
     724              :         // operation such as flush or compaction.
     725              :         BackgroundError func(error)
     726              : 
     727              :         // DataCorruption is invoked when an on-disk corruption is detected. It should
     728              :         // not block, as it is called synchronously in read paths.
     729              :         DataCorruption func(DataCorruptionInfo)
     730              : 
     731              :         // CompactionBegin is invoked after the inputs to a compaction have been
     732              :         // determined, but before the compaction has produced any output.
     733              :         CompactionBegin func(CompactionInfo)
     734              : 
     735              :         // CompactionEnd is invoked after a compaction has completed and the result
     736              :         // has been installed.
     737              :         CompactionEnd func(CompactionInfo)
     738              : 
     739              :         // DiskSlow is invoked after a disk write operation on a file created with a
     740              :         // disk health checking vfs.FS (see vfs.DefaultWithDiskHealthChecks) is
     741              :         // observed to exceed the specified disk slowness threshold duration. DiskSlow
     742              :         // is called on a goroutine that is monitoring slowness/stuckness. The callee
     743              :         // MUST return without doing any IO, or blocking on anything (like a mutex)
     744              :         // that is waiting on IO. This is imperative in order to reliably monitor for
     745              :         // slowness, since if this goroutine gets stuck, the monitoring will stop
     746              :         // working.
     747              :         DiskSlow func(DiskSlowInfo)
     748              : 
     749              :         // FlushBegin is invoked after the inputs to a flush have been determined,
     750              :         // but before the flush has produced any output.
     751              :         FlushBegin func(FlushInfo)
     752              : 
     753              :         // FlushEnd is invoked after a flush has complated and the result has been
     754              :         // installed.
     755              :         FlushEnd func(FlushInfo)
     756              : 
     757              :         // DownloadBegin is invoked when a db.Download operation starts or restarts
     758              :         // (restarts are caused by new external tables being ingested during the
     759              :         // operation).
     760              :         DownloadBegin func(DownloadInfo)
     761              : 
     762              :         // DownloadEnd is invoked when a db.Download operation completes.
     763              :         DownloadEnd func(DownloadInfo)
     764              : 
     765              :         // FormatUpgrade is invoked after the database's FormatMajorVersion
     766              :         // is upgraded.
     767              :         FormatUpgrade func(FormatMajorVersion)
     768              : 
     769              :         // ManifestCreated is invoked after a manifest has been created.
     770              :         ManifestCreated func(ManifestCreateInfo)
     771              : 
     772              :         // ManifestDeleted is invoked after a manifest has been deleted.
     773              :         ManifestDeleted func(ManifestDeleteInfo)
     774              : 
     775              :         // TableCreated is invoked when a table has been created.
     776              :         TableCreated func(TableCreateInfo)
     777              : 
     778              :         // TableDeleted is invoked after a table has been deleted.
     779              :         TableDeleted func(TableDeleteInfo)
     780              : 
     781              :         // TableIngested is invoked after an externally created table has been
     782              :         // ingested via a call to DB.Ingest().
     783              :         TableIngested func(TableIngestInfo)
     784              : 
     785              :         // TableStatsLoaded is invoked at most once, when the table stats
     786              :         // collector has loaded statistics for all tables that existed at Open.
     787              :         TableStatsLoaded func(TableStatsInfo)
     788              : 
     789              :         // TableValidated is invoked after validation runs on an sstable.
     790              :         TableValidated func(TableValidatedInfo)
     791              : 
     792              :         // WALCreated is invoked after a WAL has been created.
     793              :         WALCreated func(WALCreateInfo)
     794              : 
     795              :         // WALDeleted is invoked after a WAL has been deleted.
     796              :         WALDeleted func(WALDeleteInfo)
     797              : 
     798              :         // WriteStallBegin is invoked when writes are intentionally delayed.
     799              :         WriteStallBegin func(WriteStallBeginInfo)
     800              : 
     801              :         // WriteStallEnd is invoked when delayed writes are released.
     802              :         WriteStallEnd func()
     803              : 
     804              :         // LowDiskSpace is invoked periodically when the disk space is running
     805              :         // low.
     806              :         LowDiskSpace func(LowDiskSpaceInfo)
     807              : 
     808              :         // PossibleAPIMisuse is invoked when a possible API misuse is detected.
     809              :         PossibleAPIMisuse func(PossibleAPIMisuseInfo)
     810              : }
     811              : 
     812            0 : func backgroundError(logger Logger, err error) {
     813            0 :         if errors.Is(err, ErrCancelledCompaction) {
     814            0 :                 // ErrCancelledCompaction is not an unexpected error, hence severity INFO.
     815            0 :                 logger.Infof("background error: %s", err)
     816            0 :         } else {
     817            0 :                 logger.Errorf("background error: %s", err)
     818            0 :         }
     819              : }
     820              : 
     821              : // EnsureDefaults ensures that background error events are logged to the
     822              : // specified logger if a handler for those events hasn't been otherwise
     823              : // specified. Ensure all handlers are non-nil so that we don't have to check
     824              : // for nil-ness before invoking.
     825            1 : func (l *EventListener) EnsureDefaults(logger Logger) {
     826            1 :         if l.BackgroundError == nil {
     827            1 :                 if logger != nil {
     828            1 :                         l.BackgroundError = func(err error) {
     829            0 :                                 backgroundError(logger, err)
     830            0 :                         }
     831            0 :                 } else {
     832            0 :                         l.BackgroundError = func(error) {}
     833              :                 }
     834              :         }
     835            1 :         if l.DataCorruption == nil {
     836            1 :                 if logger != nil {
     837            1 :                         l.DataCorruption = func(info DataCorruptionInfo) {
     838            0 :                                 logger.Fatalf("%s", info)
     839            0 :                         }
     840            0 :                 } else {
     841            0 :                         l.DataCorruption = func(info DataCorruptionInfo) {}
     842              :                 }
     843              :         }
     844            1 :         if l.CompactionBegin == nil {
     845            1 :                 l.CompactionBegin = func(info CompactionInfo) {}
     846              :         }
     847            1 :         if l.CompactionEnd == nil {
     848            1 :                 l.CompactionEnd = func(info CompactionInfo) {}
     849              :         }
     850            1 :         if l.DiskSlow == nil {
     851            1 :                 l.DiskSlow = func(info DiskSlowInfo) {}
     852              :         }
     853            1 :         if l.FlushBegin == nil {
     854            1 :                 l.FlushBegin = func(info FlushInfo) {}
     855              :         }
     856            1 :         if l.FlushEnd == nil {
     857            1 :                 l.FlushEnd = func(info FlushInfo) {}
     858              :         }
     859            1 :         if l.DownloadBegin == nil {
     860            1 :                 l.DownloadBegin = func(info DownloadInfo) {}
     861              :         }
     862            1 :         if l.DownloadEnd == nil {
     863            1 :                 l.DownloadEnd = func(info DownloadInfo) {}
     864              :         }
     865            1 :         if l.FormatUpgrade == nil {
     866            1 :                 l.FormatUpgrade = func(v FormatMajorVersion) {}
     867              :         }
     868            1 :         if l.ManifestCreated == nil {
     869            1 :                 l.ManifestCreated = func(info ManifestCreateInfo) {}
     870              :         }
     871            1 :         if l.ManifestDeleted == nil {
     872            1 :                 l.ManifestDeleted = func(info ManifestDeleteInfo) {}
     873              :         }
     874            1 :         if l.TableCreated == nil {
     875            1 :                 l.TableCreated = func(info TableCreateInfo) {}
     876              :         }
     877            1 :         if l.TableDeleted == nil {
     878            1 :                 l.TableDeleted = func(info TableDeleteInfo) {}
     879              :         }
     880            1 :         if l.TableIngested == nil {
     881            1 :                 l.TableIngested = func(info TableIngestInfo) {}
     882              :         }
     883            1 :         if l.TableStatsLoaded == nil {
     884            1 :                 l.TableStatsLoaded = func(info TableStatsInfo) {}
     885              :         }
     886            1 :         if l.TableValidated == nil {
     887            1 :                 l.TableValidated = func(validated TableValidatedInfo) {}
     888              :         }
     889            1 :         if l.WALCreated == nil {
     890            1 :                 l.WALCreated = func(info WALCreateInfo) {}
     891              :         }
     892            1 :         if l.WALDeleted == nil {
     893            1 :                 l.WALDeleted = func(info WALDeleteInfo) {}
     894              :         }
     895            1 :         if l.WriteStallBegin == nil {
     896            1 :                 l.WriteStallBegin = func(info WriteStallBeginInfo) {}
     897              :         }
     898            1 :         if l.WriteStallEnd == nil {
     899            1 :                 l.WriteStallEnd = func() {}
     900              :         }
     901            1 :         if l.LowDiskSpace == nil {
     902            1 :                 l.LowDiskSpace = func(info LowDiskSpaceInfo) {}
     903              :         }
     904            1 :         if l.PossibleAPIMisuse == nil {
     905            1 :                 l.PossibleAPIMisuse = func(info PossibleAPIMisuseInfo) {}
     906              :         }
     907              : }
     908              : 
     909              : // MakeLoggingEventListener creates an EventListener that logs all events to the
     910              : // specified logger.
     911            1 : func MakeLoggingEventListener(logger Logger) EventListener {
     912            1 :         if logger == nil {
     913            0 :                 logger = DefaultLogger
     914            0 :         }
     915              : 
     916            1 :         return EventListener{
     917            1 :                 BackgroundError: func(err error) {
     918            0 :                         backgroundError(logger, err)
     919            0 :                 },
     920            0 :                 DataCorruption: func(info DataCorruptionInfo) {
     921            0 :                         logger.Errorf("%s", info)
     922            0 :                 },
     923            1 :                 CompactionBegin: func(info CompactionInfo) {
     924            1 :                         logger.Infof("%s", info)
     925            1 :                 },
     926            0 :                 CompactionEnd: func(info CompactionInfo) {
     927            0 :                         logger.Infof("%s", info)
     928            0 :                 },
     929            0 :                 DiskSlow: func(info DiskSlowInfo) {
     930            0 :                         logger.Infof("%s", info)
     931            0 :                 },
     932            1 :                 FlushBegin: func(info FlushInfo) {
     933            1 :                         logger.Infof("%s", info)
     934            1 :                 },
     935            0 :                 FlushEnd: func(info FlushInfo) {
     936            0 :                         logger.Infof("%s", info)
     937            0 :                 },
     938            1 :                 DownloadBegin: func(info DownloadInfo) {
     939            1 :                         logger.Infof("%s", info)
     940            1 :                 },
     941            0 :                 DownloadEnd: func(info DownloadInfo) {
     942            0 :                         logger.Infof("%s", info)
     943            0 :                 },
     944            1 :                 FormatUpgrade: func(v FormatMajorVersion) {
     945            1 :                         logger.Infof("upgraded to format version: %s", v)
     946            1 :                 },
     947            0 :                 ManifestCreated: func(info ManifestCreateInfo) {
     948            0 :                         logger.Infof("%s", info)
     949            0 :                 },
     950            0 :                 ManifestDeleted: func(info ManifestDeleteInfo) {
     951            0 :                         logger.Infof("%s", info)
     952            0 :                 },
     953            1 :                 TableCreated: func(info TableCreateInfo) {
     954            1 :                         logger.Infof("%s", info)
     955            1 :                 },
     956            0 :                 TableDeleted: func(info TableDeleteInfo) {
     957            0 :                         logger.Infof("%s", info)
     958            0 :                 },
     959            0 :                 TableIngested: func(info TableIngestInfo) {
     960            0 :                         logger.Infof("%s", info)
     961            0 :                 },
     962            1 :                 TableStatsLoaded: func(info TableStatsInfo) {
     963            1 :                         logger.Infof("%s", info)
     964            1 :                 },
     965            1 :                 TableValidated: func(info TableValidatedInfo) {
     966            1 :                         logger.Infof("%s", info)
     967            1 :                 },
     968            0 :                 WALCreated: func(info WALCreateInfo) {
     969            0 :                         logger.Infof("%s", info)
     970            0 :                 },
     971            0 :                 WALDeleted: func(info WALDeleteInfo) {
     972            0 :                         logger.Infof("%s", info)
     973            0 :                 },
     974            1 :                 WriteStallBegin: func(info WriteStallBeginInfo) {
     975            1 :                         logger.Infof("%s", info)
     976            1 :                 },
     977            1 :                 WriteStallEnd: func() {
     978            1 :                         logger.Infof("write stall ending")
     979            1 :                 },
     980            0 :                 LowDiskSpace: func(info LowDiskSpaceInfo) {
     981            0 :                         logger.Infof("%s", info)
     982            0 :                 },
     983            1 :                 PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
     984            1 :                         logger.Infof("%s", info)
     985            1 :                 },
     986              :         }
     987              : }
     988              : 
     989              : // TeeEventListener wraps two EventListeners, forwarding all events to both.
     990            0 : func TeeEventListener(a, b EventListener) EventListener {
     991            0 :         a.EnsureDefaults(nil)
     992            0 :         b.EnsureDefaults(nil)
     993            0 :         return EventListener{
     994            0 :                 BackgroundError: func(err error) {
     995            0 :                         a.BackgroundError(err)
     996            0 :                         b.BackgroundError(err)
     997            0 :                 },
     998            0 :                 DataCorruption: func(info DataCorruptionInfo) {
     999            0 :                         a.DataCorruption(info)
    1000            0 :                         b.DataCorruption(info)
    1001            0 :                 },
    1002            0 :                 CompactionBegin: func(info CompactionInfo) {
    1003            0 :                         a.CompactionBegin(info)
    1004            0 :                         b.CompactionBegin(info)
    1005            0 :                 },
    1006            0 :                 CompactionEnd: func(info CompactionInfo) {
    1007            0 :                         a.CompactionEnd(info)
    1008            0 :                         b.CompactionEnd(info)
    1009            0 :                 },
    1010            0 :                 DiskSlow: func(info DiskSlowInfo) {
    1011            0 :                         a.DiskSlow(info)
    1012            0 :                         b.DiskSlow(info)
    1013            0 :                 },
    1014            0 :                 FlushBegin: func(info FlushInfo) {
    1015            0 :                         a.FlushBegin(info)
    1016            0 :                         b.FlushBegin(info)
    1017            0 :                 },
    1018            0 :                 FlushEnd: func(info FlushInfo) {
    1019            0 :                         a.FlushEnd(info)
    1020            0 :                         b.FlushEnd(info)
    1021            0 :                 },
    1022            0 :                 DownloadBegin: func(info DownloadInfo) {
    1023            0 :                         a.DownloadBegin(info)
    1024            0 :                         b.DownloadBegin(info)
    1025            0 :                 },
    1026            0 :                 DownloadEnd: func(info DownloadInfo) {
    1027            0 :                         a.DownloadEnd(info)
    1028            0 :                         b.DownloadEnd(info)
    1029            0 :                 },
    1030            0 :                 FormatUpgrade: func(v FormatMajorVersion) {
    1031            0 :                         a.FormatUpgrade(v)
    1032            0 :                         b.FormatUpgrade(v)
    1033            0 :                 },
    1034            0 :                 ManifestCreated: func(info ManifestCreateInfo) {
    1035            0 :                         a.ManifestCreated(info)
    1036            0 :                         b.ManifestCreated(info)
    1037            0 :                 },
    1038            0 :                 ManifestDeleted: func(info ManifestDeleteInfo) {
    1039            0 :                         a.ManifestDeleted(info)
    1040            0 :                         b.ManifestDeleted(info)
    1041            0 :                 },
    1042            0 :                 TableCreated: func(info TableCreateInfo) {
    1043            0 :                         a.TableCreated(info)
    1044            0 :                         b.TableCreated(info)
    1045            0 :                 },
    1046            0 :                 TableDeleted: func(info TableDeleteInfo) {
    1047            0 :                         a.TableDeleted(info)
    1048            0 :                         b.TableDeleted(info)
    1049            0 :                 },
    1050            0 :                 TableIngested: func(info TableIngestInfo) {
    1051            0 :                         a.TableIngested(info)
    1052            0 :                         b.TableIngested(info)
    1053            0 :                 },
    1054            0 :                 TableStatsLoaded: func(info TableStatsInfo) {
    1055            0 :                         a.TableStatsLoaded(info)
    1056            0 :                         b.TableStatsLoaded(info)
    1057            0 :                 },
    1058            0 :                 TableValidated: func(info TableValidatedInfo) {
    1059            0 :                         a.TableValidated(info)
    1060            0 :                         b.TableValidated(info)
    1061            0 :                 },
    1062            0 :                 WALCreated: func(info WALCreateInfo) {
    1063            0 :                         a.WALCreated(info)
    1064            0 :                         b.WALCreated(info)
    1065            0 :                 },
    1066            0 :                 WALDeleted: func(info WALDeleteInfo) {
    1067            0 :                         a.WALDeleted(info)
    1068            0 :                         b.WALDeleted(info)
    1069            0 :                 },
    1070            0 :                 WriteStallBegin: func(info WriteStallBeginInfo) {
    1071            0 :                         a.WriteStallBegin(info)
    1072            0 :                         b.WriteStallBegin(info)
    1073            0 :                 },
    1074            0 :                 WriteStallEnd: func() {
    1075            0 :                         a.WriteStallEnd()
    1076            0 :                         b.WriteStallEnd()
    1077            0 :                 },
    1078            0 :                 LowDiskSpace: func(info LowDiskSpaceInfo) {
    1079            0 :                         a.LowDiskSpace(info)
    1080            0 :                         b.LowDiskSpace(info)
    1081            0 :                 },
    1082            0 :                 PossibleAPIMisuse: func(info PossibleAPIMisuseInfo) {
    1083            0 :                         a.PossibleAPIMisuse(info)
    1084            0 :                         b.PossibleAPIMisuse(info)
    1085            0 :                 },
    1086              :         }
    1087              : }
    1088              : 
    1089              : // lowDiskSpaceReporter contains the logic to report low disk space events.
    1090              : // Report is called whenever we get the disk usage statistics.
    1091              : //
    1092              : // We define a few thresholds (10%, 5%, 3%, 2%, 1%) and we post an event
    1093              : // whenever we reach a new threshold. We periodically repost the event every 30
    1094              : // minutes until we are above all thresholds.
    1095              : type lowDiskSpaceReporter struct {
    1096              :         mu struct {
    1097              :                 sync.Mutex
    1098              :                 lastNoticeThreshold int
    1099              :                 lastNoticeTime      crtime.Mono
    1100              :         }
    1101              : }
    1102              : 
    1103              : var lowDiskSpaceThresholds = []int{10, 5, 3, 2, 1}
    1104              : 
    1105              : const lowDiskSpaceFrequency = 30 * time.Minute
    1106              : 
    1107            1 : func (r *lowDiskSpaceReporter) Report(availBytes, totalBytes uint64, el *EventListener) {
    1108            1 :         threshold, ok := r.findThreshold(availBytes, totalBytes)
    1109            1 :         if !ok {
    1110            1 :                 // Normal path.
    1111            1 :                 return
    1112            1 :         }
    1113            0 :         if r.shouldReport(threshold, crtime.NowMono()) {
    1114            0 :                 el.LowDiskSpace(LowDiskSpaceInfo{
    1115            0 :                         AvailBytes:       availBytes,
    1116            0 :                         TotalBytes:       totalBytes,
    1117            0 :                         PercentThreshold: threshold,
    1118            0 :                 })
    1119            0 :         }
    1120              : }
    1121              : 
    1122              : // shouldReport returns true if we should report an event. Updates
    1123              : // lastNoticeTime/lastNoticeThreshold appropriately.
    1124            0 : func (r *lowDiskSpaceReporter) shouldReport(threshold int, now crtime.Mono) bool {
    1125            0 :         r.mu.Lock()
    1126            0 :         defer r.mu.Unlock()
    1127            0 :         if threshold < r.mu.lastNoticeThreshold || r.mu.lastNoticeTime == 0 ||
    1128            0 :                 now.Sub(r.mu.lastNoticeTime) >= lowDiskSpaceFrequency {
    1129            0 :                 r.mu.lastNoticeThreshold = threshold
    1130            0 :                 r.mu.lastNoticeTime = now
    1131            0 :                 return true
    1132            0 :         }
    1133            0 :         return false
    1134              : }
    1135              : 
    1136              : // findThreshold returns the largest threshold in lowDiskSpaceThresholds which
    1137              : // is >= the percentage ratio between availBytes and totalBytes (or ok=false if
    1138              : // there is more free space than the highest threshold).
    1139              : func (r *lowDiskSpaceReporter) findThreshold(
    1140              :         availBytes, totalBytes uint64,
    1141            1 : ) (threshold int, ok bool) {
    1142            1 :         // Note: in the normal path, we exit the loop during the first iteration.
    1143            1 :         for i, t := range lowDiskSpaceThresholds {
    1144            1 :                 if availBytes*100 > totalBytes*uint64(lowDiskSpaceThresholds[i]) {
    1145            1 :                         break
    1146              :                 }
    1147            0 :                 threshold = t
    1148            0 :                 ok = true
    1149              :         }
    1150            1 :         return threshold, ok
    1151              : }
    1152              : 
    1153            0 : func (d *DB) reportSSTableCorruption(meta *manifest.TableMetadata, err error) {
    1154            0 :         if invariants.Enabled && err == nil {
    1155            0 :                 panic("nil error")
    1156              :         }
    1157            0 :         objMeta, lookupErr := d.objProvider.Lookup(base.FileTypeTable, meta.FileBacking.DiskFileNum)
    1158            0 :         if lookupErr != nil {
    1159            0 :                 // If the object is not known to the provider, it must be a local object
    1160            0 :                 // that was missing when we opened the store. Remote objects have their
    1161            0 :                 // metadata in a catalog, so even if the backing object is deleted, the
    1162            0 :                 // DiskFileNum would still be known.
    1163            0 :                 objMeta = objstorage.ObjectMetadata{DiskFileNum: meta.FileBacking.DiskFileNum, FileType: base.FileTypeTable}
    1164            0 :         }
    1165            0 :         path := d.objProvider.Path(objMeta)
    1166            0 :         if objMeta.IsRemote() {
    1167            0 :                 // Remote path (which include the locator and full path) might not always be
    1168            0 :                 // safe.
    1169            0 :                 err = errors.WithHintf(err, "path: %s", path)
    1170            0 :         } else {
    1171            0 :                 // Local paths are safe: they start with the store directory and the
    1172            0 :                 // filename is generated by Pebble.
    1173            0 :                 err = errors.WithHintf(err, "path: %s", redact.Safe(path))
    1174            0 :         }
    1175            0 :         info := DataCorruptionInfo{
    1176            0 :                 Path:     path,
    1177            0 :                 IsRemote: objMeta.IsRemote(),
    1178            0 :                 Locator:  objMeta.Remote.Locator,
    1179            0 :                 Bounds:   meta.UserKeyBounds(),
    1180            0 :                 Details:  err,
    1181            0 :         }
    1182            0 :         d.opts.EventListener.DataCorruption(info)
    1183              : }
        

Generated by: LCOV version 2.0-1