LCOV - code coverage report
Current view: top level - pebble/tool/logs - compaction.go (source / functions)
Test: 2024-07-21 08:15Z 72c3f550 - tests only.lcov
Date: 2024-07-21 08:16:21
                   Hit     Total   Coverage
Lines:             544       694     78.4 %
Functions:           0         0          -

          Line data    Source code
       1             : // Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package logs
       6             : 
       7             : import (
       8             :         "bufio"
       9             :         "bytes"
      10             :         "cmp"
      11             :         "fmt"
      12             :         "math"
      13             :         "os"
      14             :         "path/filepath"
      15             :         "regexp"
      16             :         "slices"
      17             :         "sort"
      18             :         "strconv"
      19             :         "strings"
      20             :         "time"
      21             : 
      22             :         "github.com/cockroachdb/errors"
      23             :         "github.com/cockroachdb/pebble/internal/humanize"
      24             :         "github.com/cockroachdb/pebble/internal/manifest"
      25             :         "github.com/spf13/cobra"
      26             : )
      27             : 
      28             : const numLevels = manifest.NumLevels
      29             : 
      30             : var (
      31             :         // Captures a common logging prefix that can be used as the context for the
      32             :         // surrounding information captured by other expressions. Example:
      33             :         //
      34             :         //   I211215 14:26:56.012382 51831533 3@vendor/github.com/cockroachdb/pebble/compaction.go:1845 ⋮ [T1,n5,pebble,s5] ...
      35             :         //
      36             :         logContextPattern = regexp.MustCompile(
      37             :                 `^.*` +
      38             :                         /* Timestamp        */ `(?P<timestamp>\d{6} \d{2}:\d{2}:\d{2}.\d{6}).*` +
      39             :                         /* Node / Store     */ `\[(T(\d+|\?),)?n(?P<node>\d+|\?).*,s(?P<store>\d+|\?).*?\].*`,
      40             :         )
      41             :         logContextPatternTimestampIdx = logContextPattern.SubexpIndex("timestamp")
      42             :         logContextPatternNodeIdx      = logContextPattern.SubexpIndex("node")
      43             :         logContextPatternStoreIdx     = logContextPattern.SubexpIndex("store")
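                     :
                     :         // A minimal usage sketch (hypothetical; not from the original file),
                     :         // assuming a raw line shaped like the example above:
                     :         //
                     :         //      if m := logContextPattern.FindStringSubmatch(line); m != nil {
                     :         //              ts, _ := time.Parse(timeFmt, m[logContextPatternTimestampIdx])
                     :         //              // node / store may be "?", which strconv.Atoi rejects and
                     :         //              // which is represented as -1 downstream.
                     :         //              node, _ := strconv.Atoi(m[logContextPatternNodeIdx])
                     :         //              store, _ := strconv.Atoi(m[logContextPatternStoreIdx])
                     :         //              _ = logContext{timestamp: ts, node: node, store: store}
                     :         //      }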
      44             : 
       45             :         // Matches a compaction, memtable flush, or ingest log line.
      46             :         //
      47             :         // A compaction start / end line resembles:
      48             :         //   "[JOB X] compact(ed|ing)"
      49             :         //
      50             :         // A memtable flush start / end line resembles:
      51             :         //   "[JOB X] flush(ed|ing)"
      52             :         //
       53             :         // A flush of ingested sstables (a flushable ingest) resembles:
      54             :         //   "[JOB 226] flushed 6 ingested flushables"
      55             :         sentinelPattern          = regexp.MustCompile(`\[JOB.*(?P<prefix>compact|flush|ingest)(?P<suffix>ed|ing)[^:]`)
      56             :         sentinelPatternPrefixIdx = sentinelPattern.SubexpIndex("prefix")
      57             :         sentinelPatternSuffixIdx = sentinelPattern.SubexpIndex("suffix")
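                     :
                     :         // The sentinel is applied first; the first letter of the captured
                     :         // prefix then selects the specialized pattern (this is exactly the
                     :         // dispatch performed by parseLog below):
                     :         //
                     :         //      if m := sentinelPattern.FindStringSubmatch(line); m != nil {
                     :         //              switch m[sentinelPatternPrefixIdx][0] {
                     :         //              case 'c': // compactionPattern
                     :         //              case 'f': // flushPattern / flushableIngestedPattern
                     :         //              case 'i': // ingestedPattern
                     :         //              }
                     :         //      }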
      58             : 
      59             :         // Example compaction start and end log lines:
      60             :         // 23.1 and older:
      61             :         //   I211215 14:26:56.012382 51831533 3@vendor/github.com/cockroachdb/pebble/compaction.go:1845 ⋮ [n5,pebble,s5] 1216510  [JOB 284925] compacting(default) L2 [442555] (4.2 M) + L3 [445853] (8.4 M)
      62             :         //   I211215 14:26:56.318543 51831533 3@vendor/github.com/cockroachdb/pebble/compaction.go:1886 ⋮ [n5,pebble,s5] 1216554  [JOB 284925] compacted(default) L2 [442555] (4.2 M) + L3 [445853] (8.4 M) -> L3 [445883 445887] (13 M), in 0.3s, output rate 42 M/s
      63             :         // current:
      64             :         //   I211215 14:26:56.012382 51831533 3@vendor/github.com/cockroachdb/pebble/compaction.go:1845 ⋮ [n5,pebble,s5] 1216510  [JOB 284925] compacting(default) L2 [442555] (4.2MB) + L3 [445853] (8.4MB)
      65             :         //   I211215 14:26:56.318543 51831533 3@vendor/github.com/cockroachdb/pebble/compaction.go:1886 ⋮ [n5,pebble,s5] 1216554  [JOB 284925] compacted(default) L2 [442555] (4.2MB) + L3 [445853] (8.4MB) -> L3 [445883 445887] (13MB), in 0.3s, output rate 42MB/s
      66             :         //
      67             :         // NOTE: we use the log timestamp to compute the compaction duration rather
      68             :         // than the Pebble log output.
      69             :         compactionPattern = regexp.MustCompile(
      70             :                 `^.*` +
      71             :                         /* Job ID            */ `\[JOB (?P<job>\d+)]\s` +
      72             :                         /* Start / end       */ `compact(?P<suffix>ed|ing)` +
      73             : 
      74             :                         /* Compaction type   */
      75             :                         `\((?P<type>.*?)\)\s` +
       76             :                         /* Optional annotation */ `?(\s*\[(?P<annotations>.*?)\]\s*)?` +
      77             : 
      78             :                         /* Start / end level */
      79             :                         `(?P<levels>L(?P<from>\d).*?(?:.*(?:\+|->)\sL(?P<to>\d))?` +
      80             :                         /* Bytes             */
      81             :                         `(?:.*?\((?P<bytes>[0-9.]+( [BKMGTPE]|[KMGTPE]?B))\))` +
      82             :                         /* Score */
      83             :                         `?(\s*(Score=\d+(\.\d+)))?)`,
      84             :         )
      85             :         compactionPatternJobIdx    = compactionPattern.SubexpIndex("job")
      86             :         compactionPatternSuffixIdx = compactionPattern.SubexpIndex("suffix")
      87             :         compactionPatternTypeIdx   = compactionPattern.SubexpIndex("type")
      88             :         compactionPatternLevels    = compactionPattern.SubexpIndex("levels")
      89             :         compactionPatternFromIdx   = compactionPattern.SubexpIndex("from")
      90             :         compactionPatternToIdx     = compactionPattern.SubexpIndex("to")
      91             :         compactionPatternBytesIdx  = compactionPattern.SubexpIndex("bytes")
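                     :
                     :         // For the example "compacting" line above, the submatches work out to
                     :         // (a hedged walkthrough, not output produced by this file):
                     :         //
                     :         //      job:    "284925"    suffix: "ing"    type: "default"
                     :         //      from:   "2"         to:     "3"
                     :         //      levels: "L2 [442555] (4.2MB) + L3 [445853] (8.4MB)"
                     :         //
                     :         // The per-level sizes inside the levels text are summed separately by
                     :         // sumInputBytes in parseCompactionStart.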
      92             : 
      93             :         // Example memtable flush log lines:
      94             :         // 23.1 and older:
      95             :         //   I211213 16:23:48.903751 21136 3@vendor/github.com/cockroachdb/pebble/event.go:599 ⋮ [n9,pebble,s9] 24 [JOB 10] flushing 2 memtables to L0
      96             :         //   I211213 16:23:49.134464 21136 3@vendor/github.com/cockroachdb/pebble/event.go:603 ⋮ [n9,pebble,s9] 26 [JOB 10] flushed 2 memtables to L0 [1535806] (1.3 M), in 0.2s, output rate 5.8 M/s
      97             :         // current:
      98             :         //   I211213 16:23:48.903751 21136
      99             :         //   3@vendor/github.com/cockroachdb/pebble/event.go:599 ⋮ [n9,pebble,s9] 24 [JOB 10] flushing 2 memtables (1.4MB)  to L0
     100             :         //   I211213 16:23:49.134464 21136
     101             :         //   3@vendor/github.com/cockroachdb/pebble/event.go:603 ⋮ [n9,pebble,s9] 26 [JOB 10] flushed 2 memtables (1.4MB) to L0 [1535806] (1.3MB), in 0.2s, output rate 5.8MB/s
     102             :         //
     103             :         // NOTE: we use the log timestamp to compute the flush duration rather than
     104             :         // the Pebble log output.
     105             :         flushPattern = regexp.MustCompile(
     106             :                 `^..*` +
     107             :                         /* Job ID                       */ `\[JOB (?P<job>\d+)]\s` +
     108             :                         /* Compaction type              */ `flush(?P<suffix>ed|ing)\s` +
     109             :                         /* Memtable count; size (23.2+) */ `\d+ memtables? (\([^)]+\))?` +
     110             :                         /* SSTable Bytes                */ `(?:.*?\((?P<bytes>[0-9.]+( [BKMGTPE]|[KMGTPE]?B))\))?`,
     111             :         )
     112             :         flushPatternSuffixIdx = flushPattern.SubexpIndex("suffix")
     113             :         flushPatternJobIdx    = flushPattern.SubexpIndex("job")
     114             :         flushPatternBytesIdx  = flushPattern.SubexpIndex("bytes")
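                     :
                     :         // A hedged reading of the example lines above: "flushing" yields
                     :         // suffix "ing" and an empty bytes group, while "flushed" yields suffix
                     :         // "ed" and bytes "1.3MB", the size of the resulting sstable (the
                     :         // parenthesized memtable size after "memtables" is skipped over).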
     115             : 
     116             :         // Example ingested log lines:
     117             :         // 23.1 and older:
     118             :         //   I220228 16:01:22.487906 18476248525 3@vendor/github.com/cockroachdb/pebble/ingest.go:637 ⋮ [n24,pebble,s24] 33430782  [JOB 10211226] ingested L0:21818678 (1.8 K), L0:21818683 (1.2 K), L0:21818679 (1.6 K), L0:21818680 (1.1 K), L0:21818681 (1.1 K), L0:21818682 (160 M)
     119             :         // current:
     120             :         //   I220228 16:01:22.487906 18476248525 3@vendor/github.com/cockroachdb/pebble/ingest.go:637 ⋮ [n24,pebble,s24] 33430782  [JOB 10211226] ingested L0:21818678 (1.8KB), L0:21818683 (1.2KB), L0:21818679 (1.6KB), L0:21818680 (1.1KB), L0:21818681 (1.1KB), L0:21818682 (160MB)
     121             :         //
     122             :         ingestedPattern = regexp.MustCompile(
     123             :                 `^.*` +
     124             :                         /* Job ID           */ `\[JOB (?P<job>\d+)]\s` +
     125             :                         /* ingested */ `ingested\s`)
     126             :         ingestedPatternJobIdx = ingestedPattern.SubexpIndex("job")
     127             :         ingestedFilePattern   = regexp.MustCompile(
     128             :                 `L` +
     129             :                         /* Level       */ `(?P<level>\d):` +
     130             :                         /* File number */ `(?P<file>\d+)\s` +
     131             :                         /* Bytes       */ `\((?P<bytes>[0-9.]+( [BKMGTPE]|[KMGTPE]?B))\)`)
     132             :         ingestedFilePatternLevelIdx = ingestedFilePattern.SubexpIndex("level")
     133             :         ingestedFilePatternFileIdx  = ingestedFilePattern.SubexpIndex("file")
     134             :         ingestedFilePatternBytesIdx = ingestedFilePattern.SubexpIndex("bytes")
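                     :
                     :         // A hedged sketch (hypothetical, not in this file) of how the two
                     :         // ingest patterns compose: the job-level pattern gates the line, then
                     :         // the per-file pattern is applied repeatedly:
                     :         //
                     :         //      if ingestedPattern.MatchString(line) {
                     :         //              for _, m := range ingestedFilePattern.FindAllStringSubmatch(line, -1) {
                     :         //                      l, _ := strconv.Atoi(m[ingestedFilePatternLevelIdx])
                     :         //                      n, _ := strconv.Atoi(m[ingestedFilePatternFileIdx])
                     :         //                      size := unHumanize(m[ingestedFilePatternBytesIdx])
                     :         //                      _ = ingestedFile{level: l, fileNum: n, sizeBytes: size}
                     :         //              }
                     :         //      }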
     135             : 
      136             :         // Example flushable ingestion log line:
     137             :         //
     138             :         // I230831 04:13:28.824280 3780 3@pebble/event.go:685 ⋮ [n10,s10,pebble] 365  [JOB 226] flushed 6 ingested flushables L0:024334 (1.5KB) + L0:024339 (1.0KB) + L0:024335 (1.9KB) + L0:024336 (1.1KB) + L0:024337 (1.1KB) + L0:024338 (12KB) in 0.0s (0.0s total), output rate 67MB/s
     139             :         flushableIngestedPattern = regexp.MustCompile(
     140             :                 `^.*` +
     141             :                         /* Job ID           */ `\[JOB (?P<job>\d+)]\s` +
      142             :                         /* match ingested flushable */ `flushed \d+ ingested flushable`)
     143             :         flushableIngestedPatternJobIdx = flushableIngestedPattern.SubexpIndex("job")
     144             : 
     145             :         // Example read-amp log line:
     146             :         // 23.1 and older:
     147             :         //   total     31766   188 G       -   257 G   187 G    48 K   3.6 G     744   536 G    49 K   278 G       5     2.1
     148             :         // current:
     149             :         //   total |     1   639B     0B |     - |   84B |     0     0B |     0     0B |     3  1.9KB | 1.2KB |   1 23.7
     150             :         readAmpPattern = regexp.MustCompile(
     151             :                 /* Read amp */ `(?:^|\+)(?:\s{2}total|total \|).*?\s(?P<value>\d+)\s.{4,7}$`,
     152             :         )
     153             :         readAmpPatternValueIdx = readAmpPattern.SubexpIndex("value")
     154             : )
     155             : 
     156             : const (
     157             :         // timeFmt matches the Cockroach log timestamp format.
     158             :         // See: https://github.com/cockroachdb/cockroach/blob/master/pkg/util/log/format_crdb_v2.go
     159             :         timeFmt = "060102 15:04:05.000000"
     160             : 
     161             :         // timeFmtSlim is similar to timeFmt, except that it strips components with a
     162             :         // lower granularity than a minute.
     163             :         timeFmtSlim = "060102 15:04"
     164             : 
     165             :         // timeFmtHrMinSec prints only the hour, minute and second of the time.
     166             :         timeFmtHrMinSec = "15:04:05"
     167             : )
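                     :
                     : // exampleParseTimestamp is a minimal, hypothetical sketch (not part of the
                     : // original file) showing that timeFmt decodes the timestamps captured by
                     : // logContextPattern above.
                     : func exampleParseTimestamp() (time.Time, error) {
                     :         return time.Parse(timeFmt, "211215 14:26:56.012382")
                     : }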
     168             : 
     169             : // compactionType is the type of compaction. It tracks the types in
     170             : // compaction.go. We copy the values here to avoid exporting the types in
     171             : // compaction.go.
     172             : type compactionType uint8
     173             : 
     174             : const (
     175             :         compactionTypeDefault compactionType = iota
     176             :         compactionTypeFlush
     177             :         compactionTypeMove
     178             :         compactionTypeDeleteOnly
     179             :         compactionTypeElisionOnly
     180             :         compactionTypeRead
     181             : )
     182             : 
     183             : // String implements fmt.Stringer.
     184           1 : func (c compactionType) String() string {
     185           1 :         switch c {
     186           1 :         case compactionTypeDefault:
     187           1 :                 return "default"
                     :         case compactionTypeFlush:
                     :                 return "flush"
      188           0 :         case compactionTypeMove:
     189           0 :                 return "move"
     190           0 :         case compactionTypeDeleteOnly:
     191           0 :                 return "delete-only"
     192           0 :         case compactionTypeElisionOnly:
     193           0 :                 return "elision-only"
     194           0 :         case compactionTypeRead:
     195           0 :                 return "read"
     196           0 :         default:
      197           0 :                 panic(errors.Newf("unknown compaction type: %d", c)) // %d avoids re-entering String.
     198             :         }
     199             : }
     200             : 
     201             : // parseCompactionType parses the given compaction type string and returns a
     202             : // compactionType.
     203           1 : func parseCompactionType(s string) (t compactionType, err error) {
     204           1 :         switch s {
     205           1 :         case "default":
     206           1 :                 t = compactionTypeDefault
     207           1 :         case "move":
     208           1 :                 t = compactionTypeMove
     209           1 :         case "delete-only":
     210           1 :                 t = compactionTypeDeleteOnly
     211           0 :         case "elision-only":
     212           0 :                 t = compactionTypeElisionOnly
     213           0 :         case "read":
     214           0 :                 t = compactionTypeRead
     215           0 :         default:
     216           0 :                 err = errors.Newf("unknown compaction type: %s", s)
     217             :         }
     218           1 :         return
     219             : }
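                     :
                     : // exampleCompactionTypeRoundTrip is a small hedged sketch (hypothetical, not
                     : // part of the original file): String and parseCompactionType round-trip for
                     : // the compaction types both sides support.
                     : func exampleCompactionTypeRoundTrip() bool {
                     :         t, err := parseCompactionType(compactionTypeMove.String())
                     :         return err == nil && t == compactionTypeMove
                     : }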
     220             : 
     221             : // compactionStart is a compaction start event.
     222             : type compactionStart struct {
     223             :         ctx        logContext
     224             :         jobID      int
     225             :         cType      compactionType
     226             :         fromLevel  int
     227             :         toLevel    int
     228             :         inputBytes uint64
     229             : }
     230             : 
     231             : // parseCompactionStart converts the given regular expression sub-matches for a
     232             : // compaction start log line into a compactionStart event.
     233           1 : func parseCompactionStart(matches []string) (compactionStart, error) {
     234           1 :         var start compactionStart
     235           1 : 
     236           1 :         // Parse job ID.
     237           1 :         jobID, err := strconv.Atoi(matches[compactionPatternJobIdx])
     238           1 :         if err != nil {
     239           0 :                 return start, errors.Newf("could not parse jobID: %s", err)
     240           0 :         }
     241             : 
     242             :         // Parse compaction type.
     243           1 :         cType, err := parseCompactionType(matches[compactionPatternTypeIdx])
     244           1 :         if err != nil {
     245           0 :                 return start, err
     246           0 :         }
     247             : 
     248             :         // Parse input bytes.
     249           1 :         inputBytes, err := sumInputBytes(matches[compactionPatternLevels])
     250           1 :         if err != nil {
     251           0 :                 return start, errors.Newf("could not sum input bytes: %s", err)
     252           0 :         }
     253             : 
     254             :         // Parse from-level.
     255           1 :         from, err := strconv.Atoi(matches[compactionPatternFromIdx])
     256           1 :         if err != nil {
     257           0 :                 return start, errors.Newf("could not parse from-level: %s", err)
     258           0 :         }
     259             : 
      260             :         // Parse to-level. For delete-only and elision-only compactions, the to-level equals the from-level.
     261           1 :         to := from
     262           1 :         if cType != compactionTypeElisionOnly && cType != compactionTypeDeleteOnly {
     263           1 :                 to, err = strconv.Atoi(matches[compactionPatternToIdx])
     264           1 :                 if err != nil {
     265           0 :                         return start, errors.Newf("could not parse to-level: %s", err)
     266           0 :                 }
     267             :         }
     268             : 
     269           1 :         start = compactionStart{
     270           1 :                 jobID:      jobID,
     271           1 :                 cType:      cType,
     272           1 :                 fromLevel:  from,
     273           1 :                 toLevel:    to,
     274           1 :                 inputBytes: inputBytes,
     275           1 :         }
     276           1 : 
     277           1 :         return start, nil
     278             : }
     279             : 
     280             : // compactionEnd is a compaction end event.
     281             : type compactionEnd struct {
     282             :         jobID        int
     283             :         writtenBytes uint64
     284             :         // TODO(jackson): Parse and include the aggregate size of input
     285             :         // sstables. It may be instructive, because compactions that drop
     286             :         // keys write less data than they remove from the input level.
     287             : }
     288             : 
     289             : // parseCompactionEnd converts the given regular expression sub-matches for a
     290             : // compaction end log line into a compactionEnd event.
     291           1 : func parseCompactionEnd(matches []string) (compactionEnd, error) {
     292           1 :         var end compactionEnd
     293           1 : 
     294           1 :         // Parse job ID.
     295           1 :         jobID, err := strconv.Atoi(matches[compactionPatternJobIdx])
     296           1 :         if err != nil {
     297           0 :                 return end, errors.Newf("could not parse jobID: %s", err)
     298           0 :         }
     299           1 :         end = compactionEnd{jobID: jobID}
     300           1 : 
      301           1 :         // Parse the compacted output bytes, if present.
     302           1 :         if matches[compactionPatternBytesIdx] != "" {
     303           1 :                 end.writtenBytes = unHumanize(matches[compactionPatternBytesIdx])
     304           1 :         }
     305             : 
     306           1 :         return end, nil
     307             : }
     308             : 
     309             : // parseFlushStart converts the given regular expression sub-matches for a
     310             : // memtable flush start log line into a compactionStart event.
     311           1 : func parseFlushStart(matches []string) (compactionStart, error) {
     312           1 :         var start compactionStart
     313           1 :         // Parse job ID.
     314           1 :         jobID, err := strconv.Atoi(matches[flushPatternJobIdx])
     315           1 :         if err != nil {
     316           0 :                 return start, errors.Newf("could not parse jobID: %s", err)
     317           0 :         }
     318           1 :         c := compactionStart{
     319           1 :                 jobID:     jobID,
     320           1 :                 cType:     compactionTypeFlush,
     321           1 :                 fromLevel: -1,
     322           1 :                 toLevel:   0,
     323           1 :         }
     324           1 :         return c, nil
     325             : }
     326             : 
     327             : // parseFlushEnd converts the given regular expression sub-matches for a
     328             : // memtable flush end log line into a compactionEnd event.
     329           1 : func parseFlushEnd(matches []string) (compactionEnd, error) {
     330           1 :         var end compactionEnd
     331           1 : 
     332           1 :         // Parse job ID.
     333           1 :         jobID, err := strconv.Atoi(matches[flushPatternJobIdx])
     334           1 :         if err != nil {
     335           0 :                 return end, errors.Newf("could not parse jobID: %s", err)
     336           0 :         }
     337           1 :         end = compactionEnd{jobID: jobID}
     338           1 : 
      339           1 :         // Parse the flushed output bytes, if present.
     340           1 :         if matches[flushPatternBytesIdx] != "" {
     341           1 :                 end.writtenBytes = unHumanize(matches[flushPatternBytesIdx])
     342           1 :         }
     343             : 
     344           1 :         return end, nil
     345             : }
     346             : 
      347             : // event describes an aggregated event (e.g., matching start and end events
      348             : // combined into a single record).
     349             : type event struct {
     350             :         nodeID     int
     351             :         storeID    int
     352             :         jobID      int
     353             :         timeStart  time.Time
     354             :         timeEnd    time.Time
     355             :         compaction *compaction
     356             :         ingest     *ingest
     357             : }
     358             : 
     359             : // compaction represents an aggregated compaction event (i.e. the combination of
     360             : // a start and end event).
     361             : type compaction struct {
     362             :         cType       compactionType
     363             :         fromLevel   int
     364             :         toLevel     int
     365             :         inputBytes  uint64
     366             :         outputBytes uint64
     367             : }
     368             : 
     369             : // ingest describes the completion of an ingest.
     370             : type ingest struct {
     371             :         files []ingestedFile
     372             : }
     373             : 
     374             : type ingestedFile struct {
     375             :         level     int
     376             :         fileNum   int
     377             :         sizeBytes uint64
     378             : }
     379             : 
     380             : // readAmp represents a read-amp event.
     381             : type readAmp struct {
     382             :         ctx     logContext
     383             :         readAmp int
     384             : }
     385             : 
     386             : type nodeStoreJob struct {
     387             :         node, store, job int
     388             : }
     389             : 
     390           0 : func (n nodeStoreJob) String() string {
     391           0 :         return fmt.Sprintf("(node=%d,store=%d,job=%d)", n.node, n.store, n.job)
     392           0 : }
     393             : 
     394             : type errorEvent struct {
     395             :         path string
     396             :         line string
     397             :         err  error
     398             : }
     399             : 
     400             : // logEventCollector keeps track of open compaction events and read-amp events
     401             : // over the course of parsing log line events. Completed compaction events are
     402             : // added to the collector once a matching start and end pair are encountered.
      403             : // Read-amp events are added as they are encountered (they have no start / end
     404             : // concept).
     405             : type logEventCollector struct {
     406             :         ctx      logContext
     407             :         m        map[nodeStoreJob]compactionStart
     408             :         events   []event
     409             :         readAmps []readAmp
     410             :         errors   []errorEvent
     411             : }
     412             : 
     413             : // newEventCollector instantiates a new logEventCollector.
     414           1 : func newEventCollector() *logEventCollector {
     415           1 :         return &logEventCollector{
     416           1 :                 m: make(map[nodeStoreJob]compactionStart),
     417           1 :         }
     418           1 : }
     419             : 
     420             : // addError records an error encountered during log parsing.
     421           0 : func (c *logEventCollector) addError(path, line string, err error) {
     422           0 :         c.errors = append(c.errors, errorEvent{path: path, line: line, err: err})
     423           0 : }
     424             : 
     425             : // addCompactionStart adds a new compactionStart to the collector. The event is
     426             : // tracked by its job ID.
     427           1 : func (c *logEventCollector) addCompactionStart(start compactionStart) error {
     428           1 :         key := nodeStoreJob{c.ctx.node, c.ctx.store, start.jobID}
     429           1 :         if _, ok := c.m[key]; ok {
     430           0 :                 return errors.Newf("start event already seen for %s", key)
     431           0 :         }
     432           1 :         start.ctx = c.ctx
     433           1 :         c.m[key] = start
     434           1 :         return nil
     435             : }
     436             : 
     437             : // addCompactionEnd completes the compaction event for the given compactionEnd.
     438           1 : func (c *logEventCollector) addCompactionEnd(end compactionEnd) {
     439           1 :         key := nodeStoreJob{c.ctx.node, c.ctx.store, end.jobID}
     440           1 :         start, ok := c.m[key]
     441           1 :         if !ok {
     442           0 :                 _, _ = fmt.Fprintf(
     443           0 :                         os.Stderr,
     444           0 :                         "compaction end event missing start event for %s; skipping\n", key,
     445           0 :                 )
     446           0 :                 return
     447           0 :         }
     448             : 
     449             :         // Remove the job from the collector once it has been matched.
     450           1 :         delete(c.m, key)
     451           1 : 
     452           1 :         c.events = append(c.events, event{
     453           1 :                 nodeID:    start.ctx.node,
     454           1 :                 storeID:   start.ctx.store,
     455           1 :                 jobID:     start.jobID,
     456           1 :                 timeStart: start.ctx.timestamp,
     457           1 :                 timeEnd:   c.ctx.timestamp,
     458           1 :                 compaction: &compaction{
     459           1 :                         cType:       start.cType,
     460           1 :                         fromLevel:   start.fromLevel,
     461           1 :                         toLevel:     start.toLevel,
     462           1 :                         inputBytes:  start.inputBytes,
     463           1 :                         outputBytes: end.writtenBytes,
     464           1 :                 },
     465           1 :         })
     466             : }
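                     :
                     : // examplePairing is a hedged sketch (hypothetical, not part of the original
                     : // file) of the pairing flow: a start is held in the map until an end arrives
                     : // under the same (node, store, job) key, at which point one event is emitted.
                     : func examplePairing(c *logEventCollector, start compactionStart, end compactionEnd) int {
                     :         _ = c.addCompactionStart(start) // held in c.m, stamped with c.ctx
                     :         c.addCompactionEnd(end)         // looks up (node, store, job); appends to c.events
                     :         return len(c.events)
                     : }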
     467             : 
     468             : // addReadAmp adds the readAmp event to the collector.
     469           1 : func (c *logEventCollector) addReadAmp(ra readAmp) {
     470           1 :         ra.ctx = c.ctx
     471           1 :         c.readAmps = append(c.readAmps, ra)
     472           1 : }
     473             : 
     474             : // logContext captures the metadata of log lines.
     475             : type logContext struct {
     476             :         timestamp   time.Time
     477             :         node, store int
     478             : }
     479             : 
     480             : // saveContext saves the given logContext in the collector.
     481           1 : func (c *logEventCollector) saveContext(ctx logContext) {
     482           1 :         c.ctx = ctx
     483           1 : }
     484             : 
     485             : // level is a level in the LSM. The WAL is level -1.
     486             : type level int
     487             : 
     488             : // String implements fmt.Stringer.
     489           1 : func (l level) String() string {
     490           1 :         if l == -1 {
     491           0 :                 return "WAL"
     492           0 :         }
     493           1 :         return "L" + strconv.Itoa(int(l))
     494             : }
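                     :
                     : // exampleLevelNames is a hedged illustration (hypothetical, not in the
                     : // original file): flushes are recorded with fromLevel == -1, which renders
                     : // as the WAL.
                     : func exampleLevelNames() (string, string) {
                     :         return level(-1).String(), level(0).String() // "WAL", "L0"
                     : }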
     495             : 
     496             : // fromTo is a map key for (from, to) level tuples.
     497             : type fromTo struct {
     498             :         from, to level
     499             : }
     500             : 
     501             : // compactionTypeCount is a mapping from compaction type to count.
     502             : type compactionTypeCount map[compactionType]int
     503             : 
     504             : // windowSummary summarizes events in a window of time between a start and end
     505             : // time. The window tracks:
     506             : // - for each compaction type: counts, total bytes compacted, and total duration.
     507             : // - total ingested bytes for each level
     508             : // - read amp magnitudes
     509             : type windowSummary struct {
     510             :         nodeID, storeID      int
     511             :         tStart, tEnd         time.Time
     512             :         eventCount           int
     513             :         flushedCount         int
     514             :         flushedBytes         uint64
     515             :         flushedTime          time.Duration
     516             :         compactionCounts     map[fromTo]compactionTypeCount
     517             :         compactionBytesIn    map[fromTo]uint64
     518             :         compactionBytesOut   map[fromTo]uint64
     519             :         compactionBytesMoved map[fromTo]uint64
     520             :         compactionBytesDel   map[fromTo]uint64
     521             :         compactionTime       map[fromTo]time.Duration
     522             :         ingestedCount        [numLevels]int
     523             :         ingestedBytes        [numLevels]uint64
     524             :         readAmps             []readAmp
     525             :         longRunning          []event
     526             : }
     527             : 
     528             : // String implements fmt.Stringer, returning a formatted window summary.
     529           1 : func (s windowSummary) String() string {
     530           1 :         type fromToCount struct {
     531           1 :                 ft         fromTo
     532           1 :                 counts     compactionTypeCount
     533           1 :                 bytesIn    uint64
     534           1 :                 bytesOut   uint64
     535           1 :                 bytesMoved uint64
     536           1 :                 bytesDel   uint64
     537           1 :                 duration   time.Duration
     538           1 :         }
     539           1 :         var pairs []fromToCount
     540           1 :         for k, v := range s.compactionCounts {
     541           1 :                 pairs = append(pairs, fromToCount{
     542           1 :                         ft:         k,
     543           1 :                         counts:     v,
     544           1 :                         bytesIn:    s.compactionBytesIn[k],
     545           1 :                         bytesOut:   s.compactionBytesOut[k],
     546           1 :                         bytesMoved: s.compactionBytesMoved[k],
     547           1 :                         bytesDel:   s.compactionBytesDel[k],
     548           1 :                         duration:   s.compactionTime[k],
     549           1 :                 })
     550           1 :         }
     551           1 :         slices.SortFunc(pairs, func(l, r fromToCount) int {
     552           1 :                 if v := cmp.Compare(l.ft.from, r.ft.from); v != 0 {
     553           1 :                         return v
     554           1 :                 }
     555           0 :                 return cmp.Compare(l.ft.to, r.ft.to)
     556             :         })
     557             : 
     558           1 :         nodeID, storeID := "?", "?"
     559           1 :         if s.nodeID != -1 {
     560           1 :                 nodeID = strconv.Itoa(s.nodeID)
     561           1 :         }
     562           1 :         if s.storeID != -1 {
     563           1 :                 storeID = strconv.Itoa(s.storeID)
     564           1 :         }
     565             : 
     566           1 :         var sb strings.Builder
     567           1 :         sb.WriteString(fmt.Sprintf("node: %s, store: %s\n", nodeID, storeID))
     568           1 :         sb.WriteString(fmt.Sprintf("   from: %s\n", s.tStart.Format(timeFmtSlim)))
     569           1 :         sb.WriteString(fmt.Sprintf("     to: %s\n", s.tEnd.Format(timeFmtSlim)))
     570           1 :         var count, sum int
     571           1 :         for _, ra := range s.readAmps {
     572           1 :                 count++
     573           1 :                 sum += ra.readAmp
     574           1 :         }
     575           1 :         sb.WriteString(fmt.Sprintf("  r-amp: %.1f\n", float64(sum)/float64(count)))
     576           1 : 
     577           1 :         // Print flush+ingest statistics.
     578           1 :         {
     579           1 :                 var headerWritten bool
     580           1 :                 maybeWriteHeader := func() {
     581           1 :                         if !headerWritten {
     582           1 :                                 sb.WriteString("_kind______from______to_____________________________________count___bytes______time\n")
     583           1 :                                 headerWritten = true
     584           1 :                         }
     585             :                 }
     586             : 
     587           1 :                 if s.flushedCount > 0 {
     588           1 :                         maybeWriteHeader()
     589           1 :                         fmt.Fprintf(&sb, "%-7s         %7s                                   %7d %7s %9s\n",
     590           1 :                                 "flush", "L0", s.flushedCount, humanize.Bytes.Uint64(s.flushedBytes),
     591           1 :                                 s.flushedTime.Truncate(time.Second))
     592           1 :                 }
     593             : 
     594           1 :                 count := s.flushedCount
     595           1 :                 sum := s.flushedBytes
     596           1 :                 totalTime := s.flushedTime
     597           1 :                 for l := 0; l < len(s.ingestedBytes); l++ {
     598           1 :                         if s.ingestedCount[l] == 0 {
     599           1 :                                 continue
     600             :                         }
     601           1 :                         maybeWriteHeader()
     602           1 :                         fmt.Fprintf(&sb, "%-7s         %7s                                   %7d %7s\n",
     603           1 :                                 "ingest", fmt.Sprintf("L%d", l), s.ingestedCount[l], humanize.Bytes.Uint64(s.ingestedBytes[l]))
     604           1 :                         count += s.ingestedCount[l]
     605           1 :                         sum += s.ingestedBytes[l]
     606             :                 }
     607           1 :                 if headerWritten {
     608           1 :                         fmt.Fprintf(&sb, "total                                                     %7d %7s %9s\n",
     609           1 :                                 count, humanize.Bytes.Uint64(sum), totalTime.Truncate(time.Second),
     610           1 :                         )
     611           1 :                 }
     612             :         }
     613             : 
      614             :         // Print compaction statistics.
     615           1 :         if len(s.compactionCounts) > 0 {
     616           1 :                 sb.WriteString("_kind______from______to___default____move___elide__delete___count___in(B)__out(B)__mov(B)__del(B)______time\n")
     617           1 :                 var totalDef, totalMove, totalElision, totalDel int
     618           1 :                 var totalBytesIn, totalBytesOut, totalBytesMoved, totalBytesDel uint64
     619           1 :                 var totalTime time.Duration
     620           1 :                 for _, p := range pairs {
     621           1 :                         def := p.counts[compactionTypeDefault]
     622           1 :                         move := p.counts[compactionTypeMove]
     623           1 :                         elision := p.counts[compactionTypeElisionOnly]
     624           1 :                         del := p.counts[compactionTypeDeleteOnly]
     625           1 :                         total := def + move + elision + del
     626           1 : 
     627           1 :                         str := fmt.Sprintf("%-7s %7s %7s   %7d %7d %7d %7d %7d %7s %7s %7s %7s %9s\n",
     628           1 :                                 "compact", p.ft.from, p.ft.to, def, move, elision, del, total,
     629           1 :                                 humanize.Bytes.Uint64(p.bytesIn), humanize.Bytes.Uint64(p.bytesOut),
     630           1 :                                 humanize.Bytes.Uint64(p.bytesMoved), humanize.Bytes.Uint64(p.bytesDel),
     631           1 :                                 p.duration.Truncate(time.Second))
     632           1 :                         sb.WriteString(str)
     633           1 : 
     634           1 :                         totalDef += def
     635           1 :                         totalMove += move
     636           1 :                         totalElision += elision
     637           1 :                         totalDel += del
     638           1 :                         totalBytesIn += p.bytesIn
     639           1 :                         totalBytesOut += p.bytesOut
     640           1 :                         totalBytesMoved += p.bytesMoved
     641           1 :                         totalBytesDel += p.bytesDel
     642           1 :                         totalTime += p.duration
     643           1 :                 }
     644           1 :                 sb.WriteString(fmt.Sprintf("total         %19d %7d %7d %7d %7d %7s %7s %7s %7s %9s\n",
     645           1 :                         totalDef, totalMove, totalElision, totalDel, s.eventCount,
     646           1 :                         humanize.Bytes.Uint64(totalBytesIn), humanize.Bytes.Uint64(totalBytesOut),
     647           1 :                         humanize.Bytes.Uint64(totalBytesMoved), humanize.Bytes.Uint64(totalBytesDel),
     648           1 :                         totalTime.Truncate(time.Second)))
     649             :         }
     650             : 
      651             :         // (Optional) Long-running events.
     652           1 :         if len(s.longRunning) > 0 {
     653           1 :                 sb.WriteString("long-running events (descending runtime):\n")
     654           1 :                 sb.WriteString("_kind________from________to_______job______type_____start_______end____dur(s)_____bytes:\n")
     655           1 :                 for _, e := range s.longRunning {
     656           1 :                         c := e.compaction
     657           1 :                         kind := "compact"
     658           1 :                         if c.fromLevel == -1 {
     659           0 :                                 kind = "flush"
     660           0 :                         }
     661           1 :                         sb.WriteString(fmt.Sprintf("%-7s %9s %9s %9d %9s %9s %9s %9.0f %9s\n",
     662           1 :                                 kind, level(c.fromLevel), level(c.toLevel), e.jobID, c.cType,
     663           1 :                                 e.timeStart.Format(timeFmtHrMinSec), e.timeEnd.Format(timeFmtHrMinSec),
     664           1 :                                 e.timeEnd.Sub(e.timeStart).Seconds(), humanize.Bytes.Uint64(c.outputBytes)))
     665             :                 }
     666             :         }
     667             : 
     668           1 :         return sb.String()
     669             : }
     670             : 
      671             : // windowsSummarySlice is a slice of windowSummary that sorts in order of start
     672             : // time, node, then store.
     673             : type windowsSummarySlice []windowSummary
     674             : 
     675           1 : func (s windowsSummarySlice) Len() int {
     676           1 :         return len(s)
     677           1 : }
     678             : 
     679           1 : func (s windowsSummarySlice) Less(i, j int) bool {
     680           1 :         if !s[i].tStart.Equal(s[j].tStart) {
     681           1 :                 return s[i].tStart.Before(s[j].tStart)
     682           1 :         }
     683           1 :         if s[i].nodeID != s[j].nodeID {
     684           1 :                 return s[i].nodeID < s[j].nodeID
     685           1 :         }
     686           1 :         return s[i].storeID < s[j].storeID
     687             : }
     688             : 
     689           1 : func (s windowsSummarySlice) Swap(i, j int) {
     690           1 :         s[i], s[j] = s[j], s[i]
     691           1 : }
     692             : 
     693             : // eventSlice is a slice of events that sorts in order of node, store,
     694             : // then event start time.
     695             : type eventSlice []event
     696             : 
     697           1 : func (s eventSlice) Len() int {
     698           1 :         return len(s)
     699           1 : }
     700             : 
     701           1 : func (s eventSlice) Less(i, j int) bool {
     702           1 :         if s[i].nodeID != s[j].nodeID {
     703           1 :                 return s[i].nodeID < s[j].nodeID
     704           1 :         }
     705           1 :         if s[i].storeID != s[j].storeID {
     706           1 :                 return s[i].storeID < s[j].storeID
     707           1 :         }
     708           1 :         return s[i].timeStart.Before(s[j].timeStart)
     709             : }
     710             : 
     711           1 : func (s eventSlice) Swap(i, j int) {
     712           1 :         s[i], s[j] = s[j], s[i]
     713           1 : }
     714             : 
     715             : // readAmpSlice is a slice of readAmp events that sorts in order of node, store,
     716             : // then read amp event start time.
     717             : type readAmpSlice []readAmp
     718             : 
     719           1 : func (r readAmpSlice) Len() int {
     720           1 :         return len(r)
     721           1 : }
     722             : 
     723           1 : func (r readAmpSlice) Less(i, j int) bool {
     724           1 :         // Sort by node, store, then read-amp.
     725           1 :         if r[i].ctx.node != r[j].ctx.node {
     726           0 :                 return r[i].ctx.node < r[j].ctx.node
     727           0 :         }
     728           1 :         if r[i].ctx.store != r[j].ctx.store {
     729           0 :                 return r[i].ctx.store < r[j].ctx.store
     730           0 :         }
     731           1 :         return r[i].ctx.timestamp.Before(r[j].ctx.timestamp)
     732             : }
     733             : 
     734           0 : func (r readAmpSlice) Swap(i, j int) {
     735           0 :         r[i], r[j] = r[j], r[i]
     736           0 : }
     737             : 
     738             : // aggregator combines compaction and read-amp events within windows of fixed
     739             : // duration and returns one aggregated windowSummary struct per window.
     740             : type aggregator struct {
     741             :         window           time.Duration
     742             :         events           []event
     743             :         readAmps         []readAmp
     744             :         longRunningLimit time.Duration
     745             : }
     746             : 
     747             : // newAggregator returns a new aggregator.
     748             : func newAggregator(
     749             :         window, longRunningLimit time.Duration, events []event, readAmps []readAmp,
     750           1 : ) *aggregator {
     751           1 :         return &aggregator{
     752           1 :                 window:           window,
     753           1 :                 events:           events,
     754           1 :                 readAmps:         readAmps,
     755           1 :                 longRunningLimit: longRunningLimit,
     756           1 :         }
     757           1 : }
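                     :
                     : // exampleAggregate is a hedged usage sketch (hypothetical, not part of the
                     : // original file): bucket events into 10-minute windows and flag anything
                     : // that runs longer than one minute as long-running.
                     : func exampleAggregate(events []event, readAmps []readAmp) []windowSummary {
                     :         a := newAggregator(10*time.Minute, time.Minute, events, readAmps)
                     :         return a.aggregate()
                     : }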
     758             : 
     759             : // aggregate aggregates the events into windows, returning the windowSummary for
     760             : // each interval.
     761           1 : func (a *aggregator) aggregate() []windowSummary {
     762           1 :         if len(a.events) == 0 {
     763           0 :                 return nil
     764           0 :         }
     765             : 
      766             :         // Sort the event and read-amp slices by node, store, then start time.
     767           1 :         sort.Sort(eventSlice(a.events))
     768           1 :         sort.Sort(readAmpSlice(a.readAmps))
     769           1 : 
     770           1 :         initWindow := func(e event) *windowSummary {
     771           1 :                 start := e.timeStart.Truncate(a.window)
     772           1 :                 return &windowSummary{
     773           1 :                         nodeID:               e.nodeID,
     774           1 :                         storeID:              e.storeID,
     775           1 :                         tStart:               start,
     776           1 :                         tEnd:                 start.Add(a.window),
     777           1 :                         compactionCounts:     make(map[fromTo]compactionTypeCount),
     778           1 :                         compactionBytesIn:    make(map[fromTo]uint64),
     779           1 :                         compactionBytesOut:   make(map[fromTo]uint64),
     780           1 :                         compactionBytesMoved: make(map[fromTo]uint64),
     781           1 :                         compactionBytesDel:   make(map[fromTo]uint64),
     782           1 :                         compactionTime:       make(map[fromTo]time.Duration),
     783           1 :                 }
     784           1 :         }
     785             : 
     786           1 :         var windows []windowSummary
     787           1 :         var j int // index for read-amps
     788           1 :         finishWindow := func(cur *windowSummary) {
     789           1 :                 // Collect read-amp values for the previous window.
     790           1 :                 var readAmps []readAmp
     791           1 :                 for j < len(a.readAmps) {
     792           1 :                         ra := a.readAmps[j]
     793           1 : 
     794           1 :                         // Skip values before the current window.
     795           1 :                         if ra.ctx.node < cur.nodeID ||
     796           1 :                                 ra.ctx.store < cur.storeID ||
     797           1 :                                 ra.ctx.timestamp.Before(cur.tStart) {
     798           0 :                                 j++
     799           0 :                                 continue
     800             :                         }
     801             : 
     802             :                         // We've passed over the current window. Stop.
     803           1 :                         if ra.ctx.node > cur.nodeID ||
     804           1 :                                 ra.ctx.store > cur.storeID ||
     805           1 :                                 ra.ctx.timestamp.After(cur.tEnd) {
     806           1 :                                 break
     807             :                         }
     808             : 
     809             :                         // Collect this read-amp value.
     810           1 :                         readAmps = append(readAmps, ra)
     811           1 :                         j++
     812             :                 }
     813           1 :                 cur.readAmps = readAmps
     814           1 : 
     815           1 :                 // Sort long running compactions in descending order of duration.
     816           1 :                 slices.SortFunc(cur.longRunning, func(l, r event) int {
      817           0 :                 return cmp.Compare(r.timeEnd.Sub(r.timeStart), l.timeEnd.Sub(l.timeStart))
     818           0 :                 })
     819             : 
     820             :                 // Add the completed window to the set of windows.
     821           1 :                 windows = append(windows, *cur)
     822             :         }
     823             : 
      824             :         // Move through the events, collecting related events into the same
      825             :         // window. A window covers a single node and store, and the events whose
      826             :         // start times fall within its time range.
     827           1 :         i := 0
     828           1 :         curWindow := initWindow(a.events[i])
     829           1 :         for ; ; i++ {
      830           1 :                 // No more events. Complete the current window.
     831           1 :                 if i == len(a.events) {
     832           1 :                         finishWindow(curWindow)
     833           1 :                         break
     834             :                 }
     835           1 :                 e := a.events[i]
     836           1 : 
     837           1 :                 // If we're at the start of a new interval, finalize the current window and
     838           1 :                 // start a new one.
     839           1 :                 if curWindow.nodeID != e.nodeID ||
     840           1 :                         curWindow.storeID != e.storeID ||
     841           1 :                         e.timeStart.After(curWindow.tEnd) {
     842           1 :                         finishWindow(curWindow)
     843           1 :                         curWindow = initWindow(e)
     844           1 :                 }
     845             : 
     846           1 :                 switch {
     847           1 :                 case e.ingest != nil:
     848           1 :                         // Update ingest stats.
     849           1 :                         for _, f := range e.ingest.files {
     850           1 :                                 curWindow.ingestedCount[f.level]++
     851           1 :                                 curWindow.ingestedBytes[f.level] += f.sizeBytes
     852           1 :                         }
     853           1 :                 case e.compaction != nil && e.compaction.cType == compactionTypeFlush:
     854           1 :                         // Update flush stats.
     855           1 :                         f := e.compaction
     856           1 :                         curWindow.flushedCount++
     857           1 :                         curWindow.flushedBytes += f.outputBytes
     858           1 :                         curWindow.flushedTime += e.timeEnd.Sub(e.timeStart)
     859           1 :                 case e.compaction != nil:
     860           1 :                         // Update compaction stats.
     861           1 :                         c := e.compaction
     862           1 :                         // Update compaction counts.
     863           1 :                         ft := fromTo{level(c.fromLevel), level(c.toLevel)}
     864           1 :                         m, ok := curWindow.compactionCounts[ft]
     865           1 :                         if !ok {
     866           1 :                                 m = make(compactionTypeCount)
     867           1 :                                 curWindow.compactionCounts[ft] = m
     868           1 :                         }
     869           1 :                         m[c.cType]++
     870           1 :                         curWindow.eventCount++
     871           1 : 
     872           1 :                         // Update compacted bytes in / out / moved / deleted.
     873           1 :                         switch c.cType {
     874           1 :                         case compactionTypeMove:
     875           1 :                                 curWindow.compactionBytesMoved[ft] += c.inputBytes
     876           1 :                         case compactionTypeDeleteOnly:
     877           1 :                                 curWindow.compactionBytesDel[ft] += c.inputBytes
     878           1 :                         default:
     879           1 :                                 curWindow.compactionBytesIn[ft] += c.inputBytes
     880           1 :                                 curWindow.compactionBytesOut[ft] += c.outputBytes
     881             :                         }
     882             : 
     883             :                         // Update compaction time.
     884           1 :                         _, ok = curWindow.compactionTime[ft]
     885           1 :                         if !ok {
     886           1 :                                 curWindow.compactionTime[ft] = 0
     887           1 :                         }
     888           1 :                         curWindow.compactionTime[ft] += e.timeEnd.Sub(e.timeStart)
     889             : 
     890             :                 }
      891             :                 // Add "long-running" events: those that start in this window
      892             :                 // and whose duration exceeds the long-running limit.
     893           1 :                 if e.timeEnd.Sub(e.timeStart) > a.longRunningLimit {
     894           1 :                         curWindow.longRunning = append(curWindow.longRunning, e)
     895           1 :                 }
     896             :         }
     897             : 
     898             :         // Windows are added in order of (node, store, time). Re-sort the windows by
     899             :         // (time, node, store) for better presentation.
     900           1 :         sort.Sort(windowsSummarySlice(windows))
     901           1 : 
     902           1 :         return windows
     903             : }
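The re-sort above presents windows by (time, node, store). A minimal standalone sketch of that ordering, assuming windowSummary exposes tStart, nodeID and storeID fields as the surrounding code suggests; the comparator is illustrative, not the actual windowsSummarySlice implementation:

    package main

    import (
        "fmt"
        "sort"
        "time"
    )

    // windowSummary carries only the three fields the ordering needs;
    // the real type in this package has many more.
    type windowSummary struct {
        tStart  time.Time
        nodeID  int
        storeID int
    }

    // sortWindows orders summaries by (time, node, store).
    func sortWindows(ws []windowSummary) {
        sort.Slice(ws, func(i, j int) bool {
            if !ws[i].tStart.Equal(ws[j].tStart) {
                return ws[i].tStart.Before(ws[j].tStart)
            }
            if ws[i].nodeID != ws[j].nodeID {
                return ws[i].nodeID < ws[j].nodeID
            }
            return ws[i].storeID < ws[j].storeID
        })
    }

    func main() {
        t := time.Date(2021, 12, 15, 14, 0, 0, 0, time.UTC)
        ws := []windowSummary{
            {tStart: t, nodeID: 5, storeID: 5},
            {tStart: t, nodeID: 1, storeID: 2},
            {tStart: t.Add(-time.Hour), nodeID: 9, storeID: 9},
        }
        sortWindows(ws)
        fmt.Println(ws) // earlier window first, then by node, then store
    }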
     904             : 
      905             : // parseLog parses the log file at the given path, dispatching each line
      906             : // to the appropriate parse function and collecting the resulting events
      907             : // in the given logEventCollector. parseLog returns a non-nil error only
      908             : // if an I/O error was encountered while reading the log file; parsing
      909             : // errors are instead accumulated in the logEventCollector.
     910           1 : func parseLog(path string, b *logEventCollector) error {
     911           1 :         f, err := os.Open(path)
     912           1 :         if err != nil {
     913           0 :                 return err
     914           0 :         }
     915           1 :         defer f.Close()
     916           1 : 
     917           1 :         s := bufio.NewScanner(f)
     918           1 :         for s.Scan() {
     919           1 :                 line := s.Text()
     920           1 :                 // Store the log context for the current line, if we have one.
     921           1 :                 if err := parseLogContext(line, b); err != nil {
     922           0 :                         return err
     923           0 :                 }
     924             : 
      925             :                 // First check for a compaction, flush or ingest line.
     926           1 :                 matches := sentinelPattern.FindStringSubmatch(line)
     927           1 :                 if matches != nil {
     928           1 :                         // Determine which regexp to apply by testing the first letter of the prefix.
     929           1 :                         var err error
     930           1 :                         switch matches[sentinelPatternPrefixIdx][0] {
     931           1 :                         case 'c':
     932           1 :                                 err = parseCompaction(line, b)
     933           1 :                         case 'f':
     934           1 :                                 err = parseFlush(line, b)
     935           1 :                         case 'i':
     936           1 :                                 err = parseIngest(line, b)
     937           0 :                         default:
      938           0 :                                 err = errors.Newf("unexpected line: not a compaction, flush, or ingest: %s", line)
     939             :                         }
     940           1 :                         if err != nil {
     941           0 :                                 b.addError(path, line, err)
     942           0 :                         }
     943           1 :                         continue
     944             :                 }
     945             : 
      946             :                 // Otherwise, check for an LSM read-amplification debug line.
     947           1 :                 if err = parseReadAmp(line, b); err != nil {
     948           0 :                         b.addError(path, line, err)
     949           0 :                         continue
     950             :                 }
     951             :         }
     952           1 :         return s.Err()
     953             : }
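For reference, a self-contained sketch of parseLog's scan-and-dispatch pattern, with a simplified sentinel regexp and print statements standing in for the real parse functions; the sample log lines are hypothetical:

    package main

    import (
        "bufio"
        "fmt"
        "regexp"
        "strings"
    )

    // A simplified stand-in for the sentinel pattern; the real one also
    // guards against trailing colons and uses named capture groups.
    var sentinel = regexp.MustCompile(`\[JOB \d+\] (compact|flush|ingest)(ed|ing)`)

    func main() {
        log := "[JOB 5] compacting L0 -> L2\n" +
            "[JOB 5] compacted L0 -> L2\n" +
            "[JOB 6] flushed to L0"
        s := bufio.NewScanner(strings.NewReader(log))
        for s.Scan() {
            m := sentinel.FindStringSubmatch(s.Text())
            if m == nil {
                continue // not a sentinel line; try other patterns
            }
            // Dispatch on the first letter of the matched prefix,
            // exactly as parseLog does above.
            switch m[1][0] {
            case 'c':
                fmt.Println("compaction line, suffix:", m[2])
            case 'f':
                fmt.Println("flush line, suffix:", m[2])
            case 'i':
                fmt.Println("ingest line, suffix:", m[2])
            }
        }
    }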
     954             : 
     955             : // parseLogContext extracts contextual information from the log line (e.g. the
     956             : // timestamp, node and store).
     957           1 : func parseLogContext(line string, b *logEventCollector) error {
     958           1 :         matches := logContextPattern.FindStringSubmatch(line)
     959           1 :         if matches == nil {
     960           1 :                 return nil
     961           1 :         }
     962             : 
     963             :         // Parse start time.
     964           1 :         t, err := time.Parse(timeFmt, matches[logContextPatternTimestampIdx])
     965           1 :         if err != nil {
     966           0 :                 return errors.Newf("could not parse timestamp: %s", err)
     967           0 :         }
     968             : 
     969             :         // Parse node and store.
     970           1 :         nodeID, err := strconv.Atoi(matches[logContextPatternNodeIdx])
     971           1 :         if err != nil {
     972           1 :                 if matches[logContextPatternNodeIdx] != "?" {
     973           0 :                         return errors.Newf("could not parse node ID: %s", err)
     974           0 :                 }
     975           1 :                 nodeID = -1
     976             :         }
     977             : 
     978           1 :         storeID, err := strconv.Atoi(matches[logContextPatternStoreIdx])
     979           1 :         if err != nil {
     980           1 :                 if matches[logContextPatternStoreIdx] != "?" {
     981           0 :                         return errors.Newf("could not parse store ID: %s", err)
     982           0 :                 }
     983           1 :                 storeID = -1
     984             :         }
     985             : 
     986           1 :         b.saveContext(logContext{
     987           1 :                 timestamp: t,
     988           1 :                 node:      nodeID,
     989           1 :                 store:     storeID,
     990           1 :         })
     991           1 :         return nil
     992             : }
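The node and store fallbacks above merit a small worked example: IDs logged as "?" parse to -1 rather than failing the whole line. A standalone sketch of that logic, with atoiOrUnknown as a hypothetical helper name:

    package main

    import (
        "fmt"
        "strconv"
    )

    // atoiOrUnknown mirrors the fallback in parseLogContext: "?" becomes
    // -1, while any other non-numeric value remains an error.
    func atoiOrUnknown(s string) (int, error) {
        n, err := strconv.Atoi(s)
        if err != nil {
            if s != "?" {
                return 0, err
            }
            return -1, nil
        }
        return n, nil
    }

    func main() {
        for _, s := range []string{"5", "?", "12"} {
            n, err := atoiOrUnknown(s)
            fmt.Printf("%q -> %d (err: %v)\n", s, n, err)
        }
    }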
     993             : 
     994             : // parseCompaction parses and collects Pebble compaction events.
     995           1 : func parseCompaction(line string, b *logEventCollector) error {
     996           1 :         matches := compactionPattern.FindStringSubmatch(line)
     997           1 :         if matches == nil {
     998           0 :                 return nil
     999           0 :         }
    1000             : 
    1001             :         // "compacting": implies start line.
    1002           1 :         if matches[compactionPatternSuffixIdx] == "ing" {
    1003           1 :                 start, err := parseCompactionStart(matches)
    1004           1 :                 if err != nil {
    1005           0 :                         return err
    1006           0 :                 }
    1007           1 :                 if err := b.addCompactionStart(start); err != nil {
    1008           0 :                         return err
    1009           0 :                 }
    1010           1 :                 return nil
    1011             :         }
    1012             : 
    1013             :         // "compacted": implies end line.
    1014           1 :         end, err := parseCompactionEnd(matches)
    1015           1 :         if err != nil {
    1016           0 :                 return err
    1017           0 :         }
    1018             : 
    1019           1 :         b.addCompactionEnd(end)
    1020           1 :         return nil
    1021             : }
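addCompactionStart and addCompactionEnd (defined elsewhere in this file) presumably pair "compacting" and "compacted" lines for the same job. A hedged sketch of one way such pairing can work, keyed on (node, store, job); the key shape and collector type here are assumptions, not the package's actual logEventCollector:

    package main

    import (
        "fmt"
        "time"
    )

    type jobKey struct{ node, store, job int }

    type collector struct {
        starts map[jobKey]time.Time
    }

    func (c *collector) addStart(k jobKey, t time.Time) error {
        if _, ok := c.starts[k]; ok {
            // A duplicate start for the same job suggests a malformed log.
            return fmt.Errorf("job %d already has a start event", k.job)
        }
        c.starts[k] = t
        return nil
    }

    func (c *collector) addEnd(k jobKey, t time.Time) {
        if start, ok := c.starts[k]; ok {
            fmt.Printf("job %d ran for %s\n", k.job, t.Sub(start))
            delete(c.starts, k)
        }
        // An end without a start (e.g. log rotated mid-job) is dropped.
    }

    func main() {
        c := &collector{starts: map[jobKey]time.Time{}}
        k := jobKey{node: 5, store: 5, job: 10}
        t0 := time.Date(2021, 12, 15, 14, 26, 56, 0, time.UTC)
        _ = c.addStart(k, t0)
        c.addEnd(k, t0.Add(3*time.Second))
    }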
    1022             : 
    1023             : // parseFlush parses and collects Pebble memtable flush events.
    1024           1 : func parseFlush(line string, b *logEventCollector) error {
    1025           1 :         matches := flushPattern.FindStringSubmatch(line)
    1026           1 :         if matches == nil {
    1027           0 :                 return nil
    1028           0 :         }
    1029             : 
    1030           1 :         if matches[flushPatternSuffixIdx] == "ing" {
    1031           1 :                 start, err := parseFlushStart(matches)
    1032           1 :                 if err != nil {
    1033           0 :                         return err
    1034           0 :                 }
    1035           1 :                 return b.addCompactionStart(start)
    1036             :         }
    1037             : 
    1038           1 :         end, err := parseFlushEnd(matches)
    1039           1 :         if err != nil {
    1040           0 :                 return err
    1041           0 :         }
    1042             : 
    1043           1 :         b.addCompactionEnd(end)
    1044           1 :         return nil
    1045             : }
    1046             : 
    1047           1 : func parseIngestDuringFlush(line string, b *logEventCollector) error {
    1048           1 :         matches := flushableIngestedPattern.FindStringSubmatch(line)
    1049           1 :         if matches == nil {
    1050           0 :                 return nil
    1051           0 :         }
    1052             :         // Parse job ID.
    1053           1 :         jobID, err := strconv.Atoi(matches[flushableIngestedPatternJobIdx])
    1054           1 :         if err != nil {
    1055           0 :                 return errors.Newf("could not parse jobID: %s", err)
    1056           0 :         }
    1057           1 :         return parseRemainingIngestLogLine(jobID, line, b)
    1058             : }
    1059             : 
    1060             : // parseIngest parses and collects Pebble ingest complete events.
    1061           1 : func parseIngest(line string, b *logEventCollector) error {
    1062           1 :         matches := ingestedPattern.FindStringSubmatch(line)
    1063           1 :         if matches == nil {
     1064           1 :                 // Try to parse the other kind of ingest (one logged as part of a flush).
    1065           1 :                 return parseIngestDuringFlush(line, b)
    1066           1 :         }
    1067             :         // Parse job ID.
    1068           1 :         jobID, err := strconv.Atoi(matches[ingestedPatternJobIdx])
    1069           1 :         if err != nil {
    1070           0 :                 return errors.Newf("could not parse jobID: %s", err)
    1071           0 :         }
    1072           1 :         return parseRemainingIngestLogLine(jobID, line, b)
    1073             : }
    1074             : 
     1075             : // parseRemainingIngestLogLine parses the level, file number and bytes of each ingested file.
    1076           1 : func parseRemainingIngestLogLine(jobID int, line string, b *logEventCollector) error {
    1077           1 :         fileMatches := ingestedFilePattern.FindAllStringSubmatch(line, -1)
    1078           1 :         files := make([]ingestedFile, len(fileMatches))
    1079           1 :         for i := range fileMatches {
    1080           1 :                 level, err := strconv.Atoi(fileMatches[i][ingestedFilePatternLevelIdx])
    1081           1 :                 if err != nil {
    1082           0 :                         return errors.Newf("could not parse level: %s", err)
    1083           0 :                 }
    1084           1 :                 fileNum, err := strconv.Atoi(fileMatches[i][ingestedFilePatternFileIdx])
    1085           1 :                 if err != nil {
    1086           0 :                         return errors.Newf("could not parse file number: %s", err)
    1087           0 :                 }
    1088           1 :                 files[i] = ingestedFile{
    1089           1 :                         level:     level,
    1090           1 :                         fileNum:   fileNum,
    1091           1 :                         sizeBytes: unHumanize(fileMatches[i][ingestedFilePatternBytesIdx]),
    1092           1 :                 }
    1093             :         }
    1094           1 :         b.events = append(b.events, event{
    1095           1 :                 nodeID:    b.ctx.node,
    1096           1 :                 storeID:   b.ctx.store,
    1097           1 :                 jobID:     jobID,
    1098           1 :                 timeStart: b.ctx.timestamp,
    1099           1 :                 timeEnd:   b.ctx.timestamp,
    1100           1 :                 ingest: &ingest{
    1101           1 :                         files: files,
    1102           1 :                 },
    1103           1 :         })
    1104           1 :         return nil
    1105             : }
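FindAllStringSubmatch is what lets a single ingest line yield several ingestedFile entries. A standalone sketch using a simplified stand-in for ingestedFilePattern; the sample line is hypothetical but follows the shape of the ingest lines described above:

    package main

    import (
        "fmt"
        "regexp"
    )

    // A simplified stand-in for the per-file ingest pattern.
    var filePat = regexp.MustCompile(`L(?P<level>\d+):(?P<file>\d+) \((?P<bytes>[^)]+)\)`)

    func main() {
        line := "[JOB 226] ingested L0:024334 (1.5KB), L6:024339 (1.0MB)"
        levelIdx := filePat.SubexpIndex("level")
        fileIdx := filePat.SubexpIndex("file")
        bytesIdx := filePat.SubexpIndex("bytes")
        // FindAllStringSubmatch yields one submatch slice per file.
        for _, m := range filePat.FindAllStringSubmatch(line, -1) {
            fmt.Printf("level=%s file=%s size=%s\n",
                m[levelIdx], m[fileIdx], m[bytesIdx])
        }
    }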
    1106             : 
     1107             : // parseReadAmp attempts to parse the current line as a read amplification value.
    1108           1 : func parseReadAmp(line string, b *logEventCollector) error {
    1109           1 :         matches := readAmpPattern.FindStringSubmatch(line)
    1110           1 :         if matches == nil {
    1111           1 :                 return nil
    1112           1 :         }
    1113           1 :         val, err := strconv.Atoi(matches[readAmpPatternValueIdx])
    1114           1 :         if err != nil {
    1115           0 :                 return errors.Newf("could not parse read amp: %s", err)
    1116           0 :         }
    1117           1 :         b.addReadAmp(readAmp{
    1118           1 :                 readAmp: val,
    1119           1 :         })
    1120           1 :         return nil
    1121             : }
    1122             : 
     1123             : // runCompactionLogs is the runnable function of the top-level cobra.Command
     1124             : // that parses Pebble compaction events and LSM information from log files.
    1125           0 : func runCompactionLogs(cmd *cobra.Command, args []string) error {
    1126           0 :         // The args contain a list of log files to read.
    1127           0 :         files := args
    1128           0 : 
    1129           0 :         // Scan the log files collecting start and end compaction lines.
    1130           0 :         b := newEventCollector()
    1131           0 :         for _, file := range files {
    1132           0 :                 err := parseLog(file, b)
     1133           0 :         // parseLog returns an error only on I/O errors; if one
     1134           0 :         // occurs, exit immediately.
    1135           0 :                 if err != nil {
    1136           0 :                         return err
    1137           0 :                 }
    1138             :         }
    1139             : 
    1140           0 :         window, err := cmd.Flags().GetDuration("window")
    1141           0 :         if err != nil {
    1142           0 :                 return err
    1143           0 :         }
    1144             : 
    1145           0 :         longRunningLimit, err := cmd.Flags().GetDuration("long-running-limit")
    1146           0 :         if err != nil {
    1147           0 :                 return err
    1148           0 :         }
    1149           0 :         if longRunningLimit == 0 {
    1150           0 :                 // Off by default. Set to infinite duration.
    1151           0 :                 longRunningLimit = time.Duration(math.MaxInt64)
    1152           0 :         }
    1153             : 
    1154             :         // Aggregate the lines.
    1155           0 :         a := newAggregator(window, longRunningLimit, b.events, b.readAmps)
    1156           0 :         summaries := a.aggregate()
    1157           0 :         for _, s := range summaries {
    1158           0 :                 fmt.Printf("%s\n", s)
    1159           0 :         }
    1160             : 
    1161             :         // After the summaries, print accumulated parsing errors to stderr.
    1162           0 :         for _, e := range b.errors {
    1163           0 :                 fmt.Fprintf(os.Stderr, "-\n%s: %s\nError: %s\n", filepath.Base(e.path), e.line, e.err)
    1164           0 :         }
    1165           0 :         return nil
    1166             : }
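The command reads its two duration flags with cmd.Flags().GetDuration, so they must be registered on the command elsewhere in this package. A hedged sketch of what that registration could look like; the constructor name, defaults and help strings are assumptions:

    package main

    import (
        "time"

        "github.com/spf13/cobra"
    )

    // newCompactionLogsCmd is a hypothetical constructor; the actual
    // command wiring lives elsewhere in this package.
    func newCompactionLogsCmd(runE func(*cobra.Command, []string) error) *cobra.Command {
        cmd := &cobra.Command{
            Use:  "compactions <log file> [<log file>...]",
            RunE: runE,
        }
        cmd.Flags().Duration("window", 10*time.Minute, "size of the aggregation window")
        cmd.Flags().Duration("long-running-limit", 0,
            "threshold for flagging long-running events; 0 disables the check")
        return cmd
    }

    func main() {
        cmd := newCompactionLogsCmd(func(*cobra.Command, []string) error { return nil })
        _ = cmd.Execute()
    }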
    1167             : 
     1168             : // unHumanize performs the opposite of humanize.Bytes.Uint64 (e.g. "10B",
     1169             : // "10MB") or the CockroachDB 23.1 humanize.IEC.Uint64 (e.g. "10 B", "10 M"),
     1170             : // converting a human-readable value into a raw number of bytes.
    1171           1 : func unHumanize(s string) uint64 {
    1172           1 :         if len(s) < 2 || !(s[0] >= '0' && s[0] <= '9') {
    1173           0 :                 panic(errors.Newf("invalid bytes value %q", s))
    1174             :         }
    1175           1 :         if s[len(s)-1] == 'B' {
    1176           1 :                 s = s[:len(s)-1]
    1177           1 :         }
    1178             : 
    1179           1 :         multiplier := uint64(1)
    1180           1 :         switch s[len(s)-1] {
    1181           1 :         case 'K':
    1182           1 :                 multiplier = 1 << 10
    1183           1 :         case 'M':
    1184           1 :                 multiplier = 1 << 20
    1185           0 :         case 'G':
    1186           0 :                 multiplier = 1 << 30
    1187           0 :         case 'T':
    1188           0 :                 multiplier = 1 << 40
    1189           0 :         case 'P':
    1190           0 :                 multiplier = 1 << 50
    1191           0 :         case 'E':
    1192           0 :                 multiplier = 1 << 60
    1193             :         }
    1194           1 :         if multiplier != 1 {
    1195           1 :                 s = s[:len(s)-1]
    1196           1 :         }
    1197           1 :         if s[len(s)-1] == ' ' {
    1198           1 :                 s = s[:len(s)-1]
    1199           1 :         }
    1200           1 :         val, err := strconv.ParseFloat(s, 64)
    1201           1 :         if err != nil {
    1202           0 :                 panic(fmt.Sprintf("parsing %s: %v", s, err))
    1203             :         }
    1204             : 
    1205           1 :         return uint64(val * float64(multiplier))
    1206             : }
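To make the suffix handling concrete, here is a condensed, standalone copy of the logic above (errors.Newf swapped for fmt.Sprintf, and the T/P/E cases elided for brevity) together with a few worked inputs:

    package main

    import (
        "fmt"
        "strconv"
    )

    func unHumanize(s string) uint64 {
        if len(s) < 2 || !(s[0] >= '0' && s[0] <= '9') {
            panic(fmt.Sprintf("invalid bytes value %q", s))
        }
        if s[len(s)-1] == 'B' {
            s = s[:len(s)-1] // "1.5KB" -> "1.5K", "10 B" -> "10 "
        }
        multiplier := uint64(1)
        switch s[len(s)-1] {
        case 'K':
            multiplier = 1 << 10
        case 'M':
            multiplier = 1 << 20
        case 'G':
            multiplier = 1 << 30
        }
        if multiplier != 1 {
            s = s[:len(s)-1]
        }
        if s[len(s)-1] == ' ' {
            s = s[:len(s)-1] // drop the space in the "10 M" style
        }
        val, err := strconv.ParseFloat(s, 64)
        if err != nil {
            panic(err)
        }
        return uint64(val * float64(multiplier))
    }

    func main() {
        fmt.Println(unHumanize("1.5KB")) // 1536
        fmt.Println(unHumanize("10 M"))  // 10485760
        fmt.Println(unHumanize("512B"))  // 512
    }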
    1207             : 
     1208             : // sumInputBytes sums the parenthesized human-readable sizes in the input
     1209             : // string, returning the total as an integer number of bytes.
    1210           1 : func sumInputBytes(s string) (total uint64, _ error) {
    1211           1 :         var (
    1212           1 :                 open bool
    1213           1 :                 b    bytes.Buffer
    1214           1 :         )
    1215           1 :         for _, c := range s {
    1216           1 :                 switch c {
    1217           1 :                 case '(':
    1218           1 :                         open = true
    1219           1 :                 case ')':
    1220           1 :                         total += unHumanize(b.String())
    1221           1 :                         b.Reset()
    1222           1 :                         open = false
    1223           1 :                 default:
    1224           1 :                         if open {
    1225           1 :                                 b.WriteRune(c)
    1226           1 :                         }
    1227             :                 }
    1228             :         }
    1229           1 :         return
    1230             : }
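As a worked example (the input string is hypothetical), sumInputBytes("L0 (1.5KB) + L6 (1.0MB)") buffers only the runes between parentheses, unHumanizes each, and returns 1536 + 1048576 = 1050112 bytes.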

Generated by: LCOV version 1.14