LCOV - code coverage report
Current view: top level - pebble - event.go (source / functions) Hit Total Coverage
Test: 2024-10-18 08:16Z ff784a89 - meta test only.lcov Lines: 275 433 63.5 %
Date: 2024-10-18 08:18:35 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "strings"
      10             :         "time"
      11             : 
      12             :         "github.com/cockroachdb/errors"
      13             :         "github.com/cockroachdb/pebble/internal/base"
      14             :         "github.com/cockroachdb/pebble/internal/humanize"
      15             :         "github.com/cockroachdb/pebble/internal/invariants"
      16             :         "github.com/cockroachdb/pebble/internal/manifest"
      17             :         "github.com/cockroachdb/pebble/vfs"
      18             :         "github.com/cockroachdb/redact"
      19             : )
      20             : 
      21             : // TableInfo exports the manifest.TableInfo type.
      22             : type TableInfo = manifest.TableInfo
      23             : 
      24           1 : func tablesTotalSize(tables []TableInfo) uint64 {
      25           1 :         var size uint64
      26           1 :         for i := range tables {
      27           1 :                 size += tables[i].Size
      28           1 :         }
      29           1 :         return size
      30             : }
      31             : 
      32           1 : func formatFileNums(tables []TableInfo) string {
      33           1 :         var buf strings.Builder
      34           1 :         for i := range tables {
      35           1 :                 if i > 0 {
      36           1 :                         buf.WriteString(" ")
      37           1 :                 }
      38           1 :                 buf.WriteString(tables[i].FileNum.String())
      39             :         }
      40           1 :         return buf.String()
      41             : }
      42             : 
      43             : // LevelInfo contains info pertaining to a particular level.
      44             : type LevelInfo struct {
      45             :         Level  int
      46             :         Tables []TableInfo
      47             :         Score  float64
      48             : }
      49             : 
      50           0 : func (i LevelInfo) String() string {
      51           0 :         return redact.StringWithoutMarkers(i)
      52           0 : }
      53             : 
      54             : // SafeFormat implements redact.SafeFormatter.
      55           1 : func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
      56           1 :         w.Printf("L%d [%s] (%s) Score=%.2f",
      57           1 :                 redact.Safe(i.Level),
      58           1 :                 redact.Safe(formatFileNums(i.Tables)),
      59           1 :                 redact.Safe(humanize.Bytes.Uint64(tablesTotalSize(i.Tables))),
      60           1 :                 redact.Safe(i.Score))
      61           1 : }
      62             : 
      63             : // CompactionInfo contains the info for a compaction event.
      64             : type CompactionInfo struct {
      65             :         // JobID is the ID of the compaction job.
      66             :         JobID int
      67             :         // Reason is the reason for the compaction.
      68             :         Reason string
      69             :         // Input contains the input tables for the compaction organized by level.
      70             :         Input []LevelInfo
      71             :         // Output contains the output tables generated by the compaction. The output
      72             :         // tables are empty for the compaction begin event.
      73             :         Output LevelInfo
      74             :         // Duration is the time spent compacting, including reading and writing
      75             :         // sstables.
      76             :         Duration time.Duration
      77             :         // TotalDuration is the total wall-time duration of the compaction,
      78             :         // including applying the compaction to the database. TotalDuration is
      79             :         // always ≥ Duration.
      80             :         TotalDuration time.Duration
      81             :         Done          bool
      82             :         Err           error
      83             : 
      84             :         SingleLevelOverlappingRatio float64
      85             :         MultiLevelOverlappingRatio  float64
      86             : 
      87             :         // Annotations specifies additional info to appear in a compaction's event log line.
      88             :         Annotations compactionAnnotations
      89             : }
      90             : 
      91             : type compactionAnnotations []string
      92             : 
      93             : // SafeFormat implements redact.SafeFormatter.
      94           1 : func (ca compactionAnnotations) SafeFormat(w redact.SafePrinter, _ rune) {
      95           1 :         if len(ca) == 0 {
      96           0 :                 return
      97           0 :         }
      98           1 :         for i := range ca {
      99           1 :                 if i != 0 {
     100           0 :                         w.Print(" ")
     101           0 :                 }
     102           1 :                 w.Printf("%s", redact.SafeString(ca[i]))
     103             :         }
     104             : }
     105             : 
     106           1 : func (i CompactionInfo) String() string {
     107           1 :         return redact.StringWithoutMarkers(i)
     108           1 : }
     109             : 
     110             : // SafeFormat implements redact.SafeFormatter.
     111           1 : func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     112           1 :         if i.Err != nil {
     113           1 :                 w.Printf("[JOB %d] compaction(%s) to L%d error: %s",
     114           1 :                         redact.Safe(i.JobID), redact.SafeString(i.Reason), redact.Safe(i.Output.Level), i.Err)
     115           1 :                 return
     116           1 :         }
     117             : 
     118           1 :         if !i.Done {
     119           1 :                 w.Printf("[JOB %d] compacting(%s) ",
     120           1 :                         redact.Safe(i.JobID),
     121           1 :                         redact.SafeString(i.Reason))
     122           1 :                 if len(i.Annotations) > 0 {
     123           1 :                         w.Printf("%s ", i.Annotations)
     124           1 :                 }
     125           1 :                 w.Printf("%s; ", levelInfos(i.Input))
     126           1 :                 w.Printf("OverlappingRatio: Single %.2f, Multi %.2f", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
     127           1 :                 return
     128             :         }
     129           1 :         outputSize := tablesTotalSize(i.Output.Tables)
     130           1 :         w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
     131           1 :         if len(i.Annotations) > 0 {
     132           1 :                 w.Printf("%s ", i.Annotations)
     133           1 :         }
     134           1 :         w.Print(levelInfos(i.Input))
     135           1 :         w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
     136           1 :                 redact.Safe(i.Output.Level),
     137           1 :                 redact.Safe(formatFileNums(i.Output.Tables)),
     138           1 :                 redact.Safe(humanize.Bytes.Uint64(outputSize)),
     139           1 :                 redact.Safe(i.Duration.Seconds()),
     140           1 :                 redact.Safe(i.TotalDuration.Seconds()),
     141           1 :                 redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
     142             : }
     143             : 
     144             : type levelInfos []LevelInfo
     145             : 
     146           1 : func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
     147           1 :         for j, levelInfo := range i {
     148           1 :                 if j > 0 {
     149           1 :                         w.Printf(" + ")
     150           1 :                 }
     151           1 :                 w.Print(levelInfo)
     152             :         }
     153             : }
     154             : 
     155             : // DiskSlowInfo contains the info for a disk slowness event when writing to a
     156             : // file.
     157             : type DiskSlowInfo = vfs.DiskSlowInfo
     158             : 
     159             : // FlushInfo contains the info for a flush event.
     160             : type FlushInfo struct {
     161             :         // JobID is the ID of the flush job.
     162             :         JobID int
     163             :         // Reason is the reason for the flush.
     164             :         Reason string
     165             :         // Input contains the count of input memtables that were flushed.
     166             :         Input int
     167             :         // InputBytes contains the total in-memory size of the memtable(s) that were
     168             :         // flushed. This size includes skiplist indexing data structures.
     169             :         InputBytes uint64
     170             :         // Output contains the output table generated by the flush. The output info
     171             :         // is empty for the flush begin event.
     172             :         Output []TableInfo
     173             :         // Duration is the time spent flushing. This duration includes writing and
     174             :         // syncing all of the flushed keys to sstables.
     175             :         Duration time.Duration
     176             :         // TotalDuration is the total wall-time duration of the flush, including
     177             :         // applying the flush to the database. TotalDuration is always ≥ Duration.
     178             :         TotalDuration time.Duration
     179             :         // Ingest is set to true if the flush is handling tables that were added to
     180             :         // the flushable queue via an ingestion operation.
     181             :         Ingest bool
     182             :         // IngestLevels are the output levels for each ingested table in the flush.
     183             :         // This field is only populated when Ingest is true.
     184             :         IngestLevels []int
     185             :         Done         bool
     186             :         Err          error
     187             : }
     188             : 
     189           1 : func (i FlushInfo) String() string {
     190           1 :         return redact.StringWithoutMarkers(i)
     191           1 : }
     192             : 
     193             : // SafeFormat implements redact.SafeFormatter.
     194           1 : func (i FlushInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     195           1 :         if i.Err != nil {
     196           1 :                 w.Printf("[JOB %d] flush error: %s", redact.Safe(i.JobID), i.Err)
     197           1 :                 return
     198           1 :         }
     199             : 
     200           1 :         plural := redact.SafeString("s")
     201           1 :         if i.Input == 1 {
     202           1 :                 plural = ""
     203           1 :         }
     204           1 :         if !i.Done {
     205           1 :                 w.Printf("[JOB %d] ", redact.Safe(i.JobID))
     206           1 :                 if !i.Ingest {
     207           1 :                         w.Printf("flushing %d memtable", redact.Safe(i.Input))
     208           1 :                         w.SafeString(plural)
     209           1 :                         w.Printf(" (%s) to L0", redact.Safe(humanize.Bytes.Uint64(i.InputBytes)))
     210           1 :                 } else {
     211           1 :                         w.Printf("flushing %d ingested table%s", redact.Safe(i.Input), plural)
     212           1 :                 }
     213           1 :                 return
     214             :         }
     215             : 
     216           1 :         outputSize := tablesTotalSize(i.Output)
     217           1 :         if !i.Ingest {
     218           1 :                 if invariants.Enabled && len(i.IngestLevels) > 0 {
     219           0 :                         panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) == 0"))
     220             :                 }
     221           1 :                 w.Printf("[JOB %d] flushed %d memtable%s (%s) to L0 [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
     222           1 :                         redact.Safe(i.JobID), redact.Safe(i.Input), plural,
     223           1 :                         redact.Safe(humanize.Bytes.Uint64(i.InputBytes)),
     224           1 :                         redact.Safe(formatFileNums(i.Output)),
     225           1 :                         redact.Safe(humanize.Bytes.Uint64(outputSize)),
     226           1 :                         redact.Safe(i.Duration.Seconds()),
     227           1 :                         redact.Safe(i.TotalDuration.Seconds()),
     228           1 :                         redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
     229           1 :         } else {
     230           1 :                 if invariants.Enabled && len(i.IngestLevels) == 0 {
     231           0 :                         panic(errors.AssertionFailedf("pebble: expected len(IngestedLevels) > 0"))
     232             :                 }
     233           1 :                 w.Printf("[JOB %d] flushed %d ingested flushable%s",
     234           1 :                         redact.Safe(i.JobID), redact.Safe(len(i.Output)), plural)
     235           1 :                 for j, level := range i.IngestLevels {
     236           1 :                         file := i.Output[j]
     237           1 :                         if j > 0 {
     238           1 :                                 w.Printf(" +")
     239           1 :                         }
     240           1 :                         w.Printf(" L%d:%s (%s)", level, file.FileNum, humanize.Bytes.Uint64(file.Size))
     241             :                 }
     242           1 :                 w.Printf(" in %.1fs (%.1fs total), output rate %s/s",
     243           1 :                         redact.Safe(i.Duration.Seconds()),
     244           1 :                         redact.Safe(i.TotalDuration.Seconds()),
     245           1 :                         redact.Safe(humanize.Bytes.Uint64(uint64(float64(outputSize)/i.Duration.Seconds()))))
     246             :         }
     247             : }
     248             : 
     249             : // DownloadInfo contains the info for a DB.Download() event.
     250             : type DownloadInfo struct {
     251             :         // JobID is the ID of the download job.
     252             :         JobID int
     253             : 
     254             :         Spans []DownloadSpan
     255             : 
     256             :         // Duration is the time since the operation was started.
     257             :         Duration                    time.Duration
     258             :         DownloadCompactionsLaunched int
     259             : 
     260             :         // RestartCount indicates that the download operation restarted because it
     261             :         // noticed that new external files were ingested. A DownloadBegin event with
     262             :         // RestartCount = 0 is the start of the operation; each time we restart it we
     263             :         // have another DownloadBegin event with RestartCount > 0.
     264             :         RestartCount int
     265             :         Done         bool
     266             :         Err          error
     267             : }
     268             : 
     269           1 : func (i DownloadInfo) String() string {
     270           1 :         return redact.StringWithoutMarkers(i)
     271           1 : }
     272             : 
     273             : // SafeFormat implements redact.SafeFormatter.
     274           1 : func (i DownloadInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     275           1 :         switch {
     276           0 :         case i.Err != nil: // Failure: report the elapsed time and the error.
     277           0 :                 w.Printf("[JOB %d] download error after %.1fs: %s", redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), i.Err)
     278             : 
     279           1 :         case i.Done: // Completion: report elapsed time and compactions launched.
     280           1 :                 w.Printf("[JOB %d] download finished in %.1fs (launched %d compactions)",
     281           1 :                         redact.Safe(i.JobID), redact.Safe(i.Duration.Seconds()), redact.Safe(i.DownloadCompactionsLaunched))
     282             : 
     283           1 :         default: // Begin event: RestartCount == 0 is the initial start, > 0 a restart.
     284           1 :                 if i.RestartCount == 0 {
     285           1 :                         w.Printf("[JOB %d] starting download for %d spans", redact.Safe(i.JobID), redact.Safe(len(i.Spans)))
     286           1 :                 } else {
     287           0 :                         w.Printf("[JOB %d] restarting download (restart #%d, time so far %.1fs, launched %d compactions)",
     288           0 :                                 redact.Safe(i.JobID), redact.Safe(i.RestartCount), redact.Safe(i.Duration.Seconds()),
     289           0 :                                 redact.Safe(i.DownloadCompactionsLaunched))
     290           0 :                 }
     291             :         }
     292             : }
     293             : 
     294             : // ManifestCreateInfo contains info about a manifest creation event.
     295             : type ManifestCreateInfo struct {
     296             :         // JobID is the ID of the job that caused the manifest to be created.
     297             :         JobID int
     298             :         Path  string
     299             :         // The file number of the new Manifest.
     300             :         FileNum base.DiskFileNum
     301             :         Err     error
     302             : }
     303             : 
     304           1 : func (i ManifestCreateInfo) String() string {
     305           1 :         return redact.StringWithoutMarkers(i)
     306           1 : }
     307             : 
     308             : // SafeFormat implements redact.SafeFormatter.
     309           1 : func (i ManifestCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     310           1 :         if i.Err != nil {
     311           0 :                 w.Printf("[JOB %d] MANIFEST create error: %s", redact.Safe(i.JobID), i.Err)
     312           0 :                 return
     313           0 :         }
     314           1 :         w.Printf("[JOB %d] MANIFEST created %s", redact.Safe(i.JobID), i.FileNum)
     315             : }
     316             : 
     317             : // ManifestDeleteInfo contains the info for a Manifest deletion event.
     318             : type ManifestDeleteInfo struct {
     319             :         // JobID is the ID of the job that caused the Manifest to be deleted.
     320             :         JobID   int
     321             :         Path    string
     322             :         FileNum base.DiskFileNum
     323             :         Err     error
     324             : }
     325             : 
     326           1 : func (i ManifestDeleteInfo) String() string {
     327           1 :         return redact.StringWithoutMarkers(i)
     328           1 : }
     329             : 
     330             : // SafeFormat implements redact.SafeFormatter.
     331           1 : func (i ManifestDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     332           1 :         if i.Err != nil {
     333           0 :                 w.Printf("[JOB %d] MANIFEST delete error: %s", redact.Safe(i.JobID), i.Err)
     334           0 :                 return
     335           0 :         }
     336           1 :         w.Printf("[JOB %d] MANIFEST deleted %s", redact.Safe(i.JobID), i.FileNum)
     337             : }
     338             : 
     339             : // TableCreateInfo contains the info for a table creation event.
     340             : type TableCreateInfo struct {
     341             :         JobID int
     342             :         // Reason is the reason for the table creation: "compacting", "flushing", or
     343             :         // "ingesting".
     344             :         Reason  string
     345             :         Path    string
     346             :         FileNum base.DiskFileNum
     347             : }
     348             : 
     349           1 : func (i TableCreateInfo) String() string {
     350           1 :         return redact.StringWithoutMarkers(i)
     351           1 : }
     352             : 
     353             : // SafeFormat implements redact.SafeFormatter.
     354           1 : func (i TableCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     355           1 :         w.Printf("[JOB %d] %s: sstable created %s",
     356           1 :                 redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
     357           1 : }
     358             : 
     359             : // TableDeleteInfo contains the info for a table deletion event.
     360             : type TableDeleteInfo struct {
     361             :         JobID   int
     362             :         Path    string
     363             :         FileNum base.DiskFileNum
     364             :         Err     error
     365             : }
     366             : 
     367           1 : func (i TableDeleteInfo) String() string {
     368           1 :         return redact.StringWithoutMarkers(i)
     369           1 : }
     370             : 
     371             : // SafeFormat implements redact.SafeFormatter.
     372           1 : func (i TableDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     373           1 :         if i.Err != nil {
     374           0 :                 w.Printf("[JOB %d] sstable delete error %s: %s",
     375           0 :                         redact.Safe(i.JobID), i.FileNum, i.Err)
     376           0 :                 return
     377           0 :         }
     378           1 :         w.Printf("[JOB %d] sstable deleted %s", redact.Safe(i.JobID), i.FileNum)
     379             : }
     380             : 
     381             : // TableIngestInfo contains the info for a table ingestion event.
     382             : type TableIngestInfo struct {
     383             :         // JobID is the ID of the job that caused the table to be ingested.
     384             :         JobID  int
     385             :         Tables []struct {
     386             :                 TableInfo
     387             :                 Level int
     388             :         }
     389             :         // GlobalSeqNum is the sequence number that was assigned to all entries in
     390             :         // the ingested table.
     391             :         GlobalSeqNum base.SeqNum
     392             :         // flushable indicates whether the ingested sstable was treated as a
     393             :         // flushable.
     394             :         flushable bool
     395             :         Err       error
     396             : }
     397             : 
     398           1 : func (i TableIngestInfo) String() string {
     399           1 :         return redact.StringWithoutMarkers(i)
     400           1 : }
     401             : 
     402             : // SafeFormat implements redact.SafeFormatter.
     403           1 : func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     404           1 :         if i.Err != nil {
     405           0 :                 w.Printf("[JOB %d] ingest error: %s", redact.Safe(i.JobID), i.Err)
     406           0 :                 return
     407           0 :         }
     408             : 
     409           1 :         if i.flushable {
     410           1 :                 w.Printf("[JOB %d] ingested as flushable", redact.Safe(i.JobID))
     411           1 :         } else {
     412           1 :                 w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
     413           1 :         }
     414             : 
     415           1 :         for j := range i.Tables {
     416           1 :                 t := &i.Tables[j]
     417           1 :                 if j > 0 {
     418           1 :                         w.Printf(",")
     419           1 :                 }
     420           1 :                 levelStr := ""
     421           1 :                 if !i.flushable {
     422           1 :                         levelStr = fmt.Sprintf("L%d:", t.Level)
     423           1 :                 }
     424           1 :                 w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
     425           1 :                         redact.Safe(humanize.Bytes.Uint64(t.Size)))
     426             :         }
     427             : }
     428             : 
     429             : // TableStatsInfo contains the info for a table stats loaded event.
     430             : type TableStatsInfo struct {
     431             :         // JobID is the ID of the job that finished loading the initial tables'
     432             :         // stats.
     433             :         JobID int
     434             : }
     435             : 
     436           1 : func (i TableStatsInfo) String() string {
     437           1 :         return redact.StringWithoutMarkers(i)
     438           1 : }
     439             : 
     440             : // SafeFormat implements redact.SafeFormatter.
     441           1 : func (i TableStatsInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     442           1 :         w.Printf("[JOB %d] all initial table stats loaded", redact.Safe(i.JobID))
     443           1 : }
     444             : 
     445             : // TableValidatedInfo contains information on the result of a validation run
     446             : // on an sstable.
     447             : type TableValidatedInfo struct {
     448             :         JobID int
     449             :         Meta  *fileMetadata
     450             : }
     451             : 
     452           1 : func (i TableValidatedInfo) String() string {
     453           1 :         return redact.StringWithoutMarkers(i)
     454           1 : }
     455             : 
     456             : // SafeFormat implements redact.SafeFormatter.
     457           1 : func (i TableValidatedInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     458           1 :         w.Printf("[JOB %d] validated table: %s", redact.Safe(i.JobID), i.Meta)
     459           1 : }
     460             : 
     461             : // WALCreateInfo contains info about a WAL creation event.
     462             : type WALCreateInfo struct {
     463             :         // JobID is the ID of the job that caused the WAL to be created.
     464             :         JobID int
     465             :         Path  string
     466             :         // The file number of the new WAL.
     467             :         FileNum base.DiskFileNum
     468             :         // The file number of a previous WAL which was recycled to create this
     469             :         // one. Zero if recycling did not take place.
     470             :         RecycledFileNum base.DiskFileNum
     471             :         Err             error
     472             : }
     473             : 
     474           1 : func (i WALCreateInfo) String() string {
     475           1 :         return redact.StringWithoutMarkers(i)
     476           1 : }
     477             : 
     478             : // SafeFormat implements redact.SafeFormatter.
     479           1 : func (i WALCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     480           1 :         if i.Err != nil {
     481           0 :                 w.Printf("[JOB %d] WAL create error: %s", redact.Safe(i.JobID), i.Err)
     482           0 :                 return
     483           0 :         }
     484             : 
     485           1 :         if i.RecycledFileNum == 0 {
     486           1 :                 w.Printf("[JOB %d] WAL created %s", redact.Safe(i.JobID), i.FileNum)
     487           1 :                 return
     488           1 :         }
     489             : 
     490           0 :         w.Printf("[JOB %d] WAL created %s (recycled %s)",
     491           0 :                 redact.Safe(i.JobID), i.FileNum, i.RecycledFileNum)
     492             : }
     493             : 
     494             : // WALDeleteInfo contains the info for a WAL deletion event.
     495             : //
     496             : // TODO(sumeer): extend WALDeleteInfo for the failover case in case the path
     497             : // is insufficient to infer whether primary or secondary.
     498             : type WALDeleteInfo struct {
     499             :         // JobID is the ID of the job that caused the WAL to be deleted.
     500             :         JobID   int
     501             :         Path    string
     502             :         FileNum base.DiskFileNum
     503             :         Err     error
     504             : }
     505             : 
     506           1 : func (i WALDeleteInfo) String() string {
     507           1 :         return redact.StringWithoutMarkers(i)
     508           1 : }
     509             : 
     510             : // SafeFormat implements redact.SafeFormatter.
     511           1 : func (i WALDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     512           1 :         if i.Err != nil {
     513           0 :                 w.Printf("[JOB %d] WAL delete error: %s", redact.Safe(i.JobID), i.Err)
     514           0 :                 return
     515           0 :         }
     516           1 :         w.Printf("[JOB %d] WAL deleted %s", redact.Safe(i.JobID), i.FileNum)
     517             : }
     518             : 
     519             : // WriteStallBeginInfo contains the info for a write stall begin event.
     520             : type WriteStallBeginInfo struct {
     521             :         Reason string
     522             : }
     523             : 
     524           1 : func (i WriteStallBeginInfo) String() string {
     525           1 :         return redact.StringWithoutMarkers(i)
     526           1 : }
     527             : 
     528             : // SafeFormat implements redact.SafeFormatter.
     529           1 : func (i WriteStallBeginInfo) SafeFormat(w redact.SafePrinter, _ rune) {
     530           1 :         w.Printf("write stall beginning: %s", redact.Safe(i.Reason))
     531           1 : }
     532             : 
     533             : // EventListener contains a set of functions that will be invoked when various
     534             : // significant DB events occur. Note that the functions should not run for an
     535             : // excessive amount of time as they are invoked synchronously by the DB and may
     536             : // block continued DB work. For a similar reason it is advisable to not perform
     537             : // any synchronous calls back into the DB.
     538             : type EventListener struct {
     539             :         // BackgroundError is invoked whenever an error occurs during a background
     540             :         // operation such as flush or compaction.
     541             :         BackgroundError func(error)
     542             : 
     543             :         // CompactionBegin is invoked after the inputs to a compaction have been
     544             :         // determined, but before the compaction has produced any output.
     545             :         CompactionBegin func(CompactionInfo)
     546             : 
     547             :         // CompactionEnd is invoked after a compaction has completed and the result
     548             :         // has been installed.
     549             :         CompactionEnd func(CompactionInfo)
     550             : 
     551             :         // DiskSlow is invoked after a disk write operation on a file created with a
     552             :         // disk health checking vfs.FS (see vfs.DefaultWithDiskHealthChecks) is
     553             :         // observed to exceed the specified disk slowness threshold duration. DiskSlow
     554             :         // is called on a goroutine that is monitoring slowness/stuckness. The callee
     555             :         // MUST return without doing any IO, or blocking on anything (like a mutex)
     556             :         // that is waiting on IO. This is imperative in order to reliably monitor for
     557             :         // slowness, since if this goroutine gets stuck, the monitoring will stop
     558             :         // working.
     559             :         DiskSlow func(DiskSlowInfo)
     560             : 
     561             :         // FlushBegin is invoked after the inputs to a flush have been determined,
     562             :         // but before the flush has produced any output.
     563             :         FlushBegin func(FlushInfo)
     564             : 
     565             :         // FlushEnd is invoked after a flush has completed and the result has been
     566             :         // installed.
     567             :         FlushEnd func(FlushInfo)
     568             : 
     569             :         // DownloadBegin is invoked when a db.Download operation starts or restarts
     570             :         // (restarts are caused by new external tables being ingested during the
     571             :         // operation).
     572             :         DownloadBegin func(DownloadInfo)
     573             : 
     574             :         // DownloadEnd is invoked when a db.Download operation completes.
     575             :         DownloadEnd func(DownloadInfo)
     576             : 
     577             :         // FormatUpgrade is invoked after the database's FormatMajorVersion
     578             :         // is upgraded.
     579             :         FormatUpgrade func(FormatMajorVersion)
     580             : 
     581             :         // ManifestCreated is invoked after a manifest has been created.
     582             :         ManifestCreated func(ManifestCreateInfo)
     583             : 
     584             :         // ManifestDeleted is invoked after a manifest has been deleted.
     585             :         ManifestDeleted func(ManifestDeleteInfo)
     586             : 
     587             :         // TableCreated is invoked when a table has been created.
     588             :         TableCreated func(TableCreateInfo)
     589             : 
     590             :         // TableDeleted is invoked after a table has been deleted.
     591             :         TableDeleted func(TableDeleteInfo)
     592             : 
     593             :         // TableIngested is invoked after an externally created table has been
     594             :         // ingested via a call to DB.Ingest().
     595             :         TableIngested func(TableIngestInfo)
     596             : 
     597             :         // TableStatsLoaded is invoked at most once, when the table stats
     598             :         // collector has loaded statistics for all tables that existed at Open.
     599             :         TableStatsLoaded func(TableStatsInfo)
     600             : 
     601             :         // TableValidated is invoked after validation runs on an sstable.
     602             :         TableValidated func(TableValidatedInfo)
     603             : 
     604             :         // WALCreated is invoked after a WAL has been created.
     605             :         WALCreated func(WALCreateInfo)
     606             : 
     607             :         // WALDeleted is invoked after a WAL has been deleted.
     608             :         WALDeleted func(WALDeleteInfo)
     609             : 
     610             :         // WriteStallBegin is invoked when writes are intentionally delayed.
     611             :         WriteStallBegin func(WriteStallBeginInfo)
     612             : 
     613             :         // WriteStallEnd is invoked when delayed writes are released.
     614             :         WriteStallEnd func()
     615             : }
     616             : 
     617             : // EnsureDefaults ensures that background error events are logged to the
     618             : // specified logger if a handler for those events hasn't been otherwise
     619             : // specified. Ensure all handlers are non-nil so that we don't have to check
     620             : // for nil-ness before invoking.
     621           1 : func (l *EventListener) EnsureDefaults(logger Logger) {
     622           1 :         if l.BackgroundError == nil {
     623           1 :                 if logger != nil {
     624           1 :                         l.BackgroundError = func(err error) {
     625           0 :                                 logger.Errorf("background error: %s", err)
     626           0 :                         }
     627           0 :                 } else {
     628           0 :                         l.BackgroundError = func(error) {}
     629             :                 }
     630             :         }
     631           1 :         if l.CompactionBegin == nil {
     632           1 :                 l.CompactionBegin = func(info CompactionInfo) {}
     633             :         }
     634           1 :         if l.CompactionEnd == nil {
     635           1 :                 l.CompactionEnd = func(info CompactionInfo) {}
     636             :         }
     637           1 :         if l.DiskSlow == nil {
     638           1 :                 l.DiskSlow = func(info DiskSlowInfo) {}
     639             :         }
     640           1 :         if l.FlushBegin == nil {
     641           1 :                 l.FlushBegin = func(info FlushInfo) {}
     642             :         }
     643           1 :         if l.FlushEnd == nil {
     644           1 :                 l.FlushEnd = func(info FlushInfo) {}
     645             :         }
     646           1 :         if l.DownloadBegin == nil {
     647           1 :                 l.DownloadBegin = func(info DownloadInfo) {}
     648             :         }
     649           1 :         if l.DownloadEnd == nil {
     650           1 :                 l.DownloadEnd = func(info DownloadInfo) {}
     651             :         }
     652           1 :         if l.FormatUpgrade == nil {
     653           1 :                 l.FormatUpgrade = func(v FormatMajorVersion) {}
     654             :         }
     655           1 :         if l.ManifestCreated == nil {
     656           1 :                 l.ManifestCreated = func(info ManifestCreateInfo) {}
     657             :         }
     658           1 :         if l.ManifestDeleted == nil {
     659           1 :                 l.ManifestDeleted = func(info ManifestDeleteInfo) {}
     660             :         }
     661           1 :         if l.TableCreated == nil {
     662           1 :                 l.TableCreated = func(info TableCreateInfo) {}
     663             :         }
     664           1 :         if l.TableDeleted == nil {
     665           1 :                 l.TableDeleted = func(info TableDeleteInfo) {}
     666             :         }
     667           1 :         if l.TableIngested == nil {
     668           1 :                 l.TableIngested = func(info TableIngestInfo) {}
     669             :         }
     670           1 :         if l.TableStatsLoaded == nil {
     671           1 :                 l.TableStatsLoaded = func(info TableStatsInfo) {}
     672             :         }
     673           1 :         if l.TableValidated == nil {
     674           1 :                 l.TableValidated = func(validated TableValidatedInfo) {}
     675             :         }
     676           1 :         if l.WALCreated == nil {
     677           1 :                 l.WALCreated = func(info WALCreateInfo) {}
     678             :         }
     679           1 :         if l.WALDeleted == nil {
     680           1 :                 l.WALDeleted = func(info WALDeleteInfo) {}
     681             :         }
     682           1 :         if l.WriteStallBegin == nil {
     683           1 :                 l.WriteStallBegin = func(info WriteStallBeginInfo) {}
     684             :         }
     685           1 :         if l.WriteStallEnd == nil {
     686           1 :                 l.WriteStallEnd = func() {}
     687             :         }
     688             : }
     689             : 
     690             : // MakeLoggingEventListener creates an EventListener that logs all events to the
     691             : // specified logger.
     692           1 : func MakeLoggingEventListener(logger Logger) EventListener {
     693           1 :         if logger == nil {
     694           0 :                 logger = DefaultLogger
     695           0 :         }
     696             : 
     697           1 :         return EventListener{
     698           1 :                 BackgroundError: func(err error) {
     699           0 :                         logger.Errorf("background error: %s", err)
     700           0 :                 },
     701           1 :                 CompactionBegin: func(info CompactionInfo) {
     702           1 :                         logger.Infof("%s", info)
     703           1 :                 },
     704           0 :                 CompactionEnd: func(info CompactionInfo) {
     705           0 :                         logger.Infof("%s", info)
     706           0 :                 },
     707           0 :                 DiskSlow: func(info DiskSlowInfo) {
     708           0 :                         logger.Infof("%s", info)
     709           0 :                 },
     710           1 :                 FlushBegin: func(info FlushInfo) {
     711           1 :                         logger.Infof("%s", info)
     712           1 :                 },
     713           0 :                 FlushEnd: func(info FlushInfo) {
     714           0 :                         logger.Infof("%s", info)
     715           0 :                 },
     716           1 :                 DownloadBegin: func(info DownloadInfo) {
     717           1 :                         logger.Infof("%s", info)
     718           1 :                 },
     719           0 :                 DownloadEnd: func(info DownloadInfo) {
     720           0 :                         logger.Infof("%s", info)
     721           0 :                 },
     722           1 :                 FormatUpgrade: func(v FormatMajorVersion) {
     723           1 :                         logger.Infof("upgraded to format version: %s", v)
     724           1 :                 },
     725           0 :                 ManifestCreated: func(info ManifestCreateInfo) {
     726           0 :                         logger.Infof("%s", info)
     727           0 :                 },
     728           0 :                 ManifestDeleted: func(info ManifestDeleteInfo) {
     729           0 :                         logger.Infof("%s", info)
     730           0 :                 },
     731           1 :                 TableCreated: func(info TableCreateInfo) {
     732           1 :                         logger.Infof("%s", info)
     733           1 :                 },
     734           0 :                 TableDeleted: func(info TableDeleteInfo) {
     735           0 :                         logger.Infof("%s", info)
     736           0 :                 },
     737           0 :                 TableIngested: func(info TableIngestInfo) {
     738           0 :                         logger.Infof("%s", info)
     739           0 :                 },
     740           1 :                 TableStatsLoaded: func(info TableStatsInfo) {
     741           1 :                         logger.Infof("%s", info)
     742           1 :                 },
     743           1 :                 TableValidated: func(info TableValidatedInfo) {
     744           1 :                         logger.Infof("%s", info)
     745           1 :                 },
     746           0 :                 WALCreated: func(info WALCreateInfo) {
     747           0 :                         logger.Infof("%s", info)
     748           0 :                 },
     749           0 :                 WALDeleted: func(info WALDeleteInfo) {
     750           0 :                         logger.Infof("%s", info)
     751           0 :                 },
     752           1 :                 WriteStallBegin: func(info WriteStallBeginInfo) {
     753           1 :                         logger.Infof("%s", info)
     754           1 :                 },
     755           1 :                 WriteStallEnd: func() {
     756           1 :                         logger.Infof("write stall ending")
     757           1 :                 },
     758             :         }
     759             : }
     760             : 
     761             : // TeeEventListener wraps two EventListeners, forwarding all events to both.
     762           0 : func TeeEventListener(a, b EventListener) EventListener {
     763           0 :         a.EnsureDefaults(nil)
     764           0 :         b.EnsureDefaults(nil)
     765           0 :         return EventListener{
     766           0 :                 BackgroundError: func(err error) {
     767           0 :                         a.BackgroundError(err)
     768           0 :                         b.BackgroundError(err)
     769           0 :                 },
     770           0 :                 CompactionBegin: func(info CompactionInfo) {
     771           0 :                         a.CompactionBegin(info)
     772           0 :                         b.CompactionBegin(info)
     773           0 :                 },
     774           0 :                 CompactionEnd: func(info CompactionInfo) {
     775           0 :                         a.CompactionEnd(info)
     776           0 :                         b.CompactionEnd(info)
     777           0 :                 },
     778           0 :                 DiskSlow: func(info DiskSlowInfo) {
     779           0 :                         a.DiskSlow(info)
     780           0 :                         b.DiskSlow(info)
     781           0 :                 },
     782           0 :                 FlushBegin: func(info FlushInfo) {
     783           0 :                         a.FlushBegin(info)
     784           0 :                         b.FlushBegin(info)
     785           0 :                 },
     786           0 :                 FlushEnd: func(info FlushInfo) {
     787           0 :                         a.FlushEnd(info)
     788           0 :                         b.FlushEnd(info)
     789           0 :                 },
     790           0 :                 DownloadBegin: func(info DownloadInfo) {
     791           0 :                         a.DownloadBegin(info)
     792           0 :                         b.DownloadBegin(info)
     793           0 :                 },
     794           0 :                 DownloadEnd: func(info DownloadInfo) {
     795           0 :                         a.DownloadEnd(info)
     796           0 :                         b.DownloadEnd(info)
     797           0 :                 },
     798           0 :                 FormatUpgrade: func(v FormatMajorVersion) {
     799           0 :                         a.FormatUpgrade(v)
     800           0 :                         b.FormatUpgrade(v)
     801           0 :                 },
     802           0 :                 ManifestCreated: func(info ManifestCreateInfo) {
     803           0 :                         a.ManifestCreated(info)
     804           0 :                         b.ManifestCreated(info)
     805           0 :                 },
     806           0 :                 ManifestDeleted: func(info ManifestDeleteInfo) {
     807           0 :                         a.ManifestDeleted(info)
     808           0 :                         b.ManifestDeleted(info)
     809           0 :                 },
     810           0 :                 TableCreated: func(info TableCreateInfo) {
     811           0 :                         a.TableCreated(info)
     812           0 :                         b.TableCreated(info)
     813           0 :                 },
     814           0 :                 TableDeleted: func(info TableDeleteInfo) {
     815           0 :                         a.TableDeleted(info)
     816           0 :                         b.TableDeleted(info)
     817           0 :                 },
     818           0 :                 TableIngested: func(info TableIngestInfo) {
     819           0 :                         a.TableIngested(info)
     820           0 :                         b.TableIngested(info)
     821           0 :                 },
     822           0 :                 TableStatsLoaded: func(info TableStatsInfo) {
     823           0 :                         a.TableStatsLoaded(info)
     824           0 :                         b.TableStatsLoaded(info)
     825           0 :                 },
     826           0 :                 TableValidated: func(info TableValidatedInfo) {
     827           0 :                         a.TableValidated(info)
     828           0 :                         b.TableValidated(info)
     829           0 :                 },
     830           0 :                 WALCreated: func(info WALCreateInfo) {
     831           0 :                         a.WALCreated(info)
     832           0 :                         b.WALCreated(info)
     833           0 :                 },
     834           0 :                 WALDeleted: func(info WALDeleteInfo) {
     835           0 :                         a.WALDeleted(info)
     836           0 :                         b.WALDeleted(info)
     837           0 :                 },
     838           0 :                 WriteStallBegin: func(info WriteStallBeginInfo) {
     839           0 :                         a.WriteStallBegin(info)
     840           0 :                         b.WriteStallBegin(info)
     841           0 :                 },
     842           0 :                 WriteStallEnd: func() {
     843           0 :                         a.WriteStallEnd()
     844           0 :                         b.WriteStallEnd()
     845           0 :                 },
     846             :         }
     847             : }

Generated by: LCOV version 1.14