Line data Source code
1 : // Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package logs
6 :
7 : import (
8 : "bufio"
9 : "bytes"
10 : "cmp"
11 : "fmt"
12 : "math"
13 : "os"
14 : "path/filepath"
15 : "regexp"
16 : "slices"
17 : "sort"
18 : "strconv"
19 : "strings"
20 : "time"
21 :
22 : "github.com/cockroachdb/errors"
23 : "github.com/cockroachdb/pebble/internal/humanize"
24 : "github.com/cockroachdb/pebble/internal/manifest"
25 : "github.com/spf13/cobra"
26 : )
27 :
// numLevels mirrors manifest.NumLevels so this file can declare fixed-size
// per-level arrays (see windowSummary.ingestedCount / ingestedBytes).
const numLevels = manifest.NumLevels
29 :
var (
	// Captures a common logging prefix that can be used as the context for the
	// surrounding information captured by other expressions. Example:
	//
	//   I211215 14:26:56.012382 51831533 3@vendor/github.com/cockroachdb/pebble/compaction.go:1845 ⋮ [T1,n5,pebble,s5] ...
	//
	logContextPattern = regexp.MustCompile(
		`^.*` +
			/* Timestamp */ `(?P<timestamp>\d{6} \d{2}:\d{2}:\d{2}.\d{6}).*` +
			/* Node / Store */ `\[(T(\d+|\?),)?n(?P<node>\d+|\?).*,s(?P<store>\d+|\?).*?\].*`,
	)
	// Indices of the named capture groups above, resolved once at init time.
	logContextPatternTimestampIdx = logContextPattern.SubexpIndex("timestamp")
	logContextPatternNodeIdx      = logContextPattern.SubexpIndex("node")
	logContextPatternStoreIdx     = logContextPattern.SubexpIndex("store")

	// Matches either a compaction or a memtable flush log line.
	//
	// A compaction start / end line resembles:
	//   "[JOB X] compact(ed|ing)"
	//
	// A memtable flush start / end line resembles:
	//   "[JOB X] flush(ed|ing)"
	//
	// An ingested sstable flush looks like:
	//   "[JOB 226] flushed 6 ingested flushables"
	//
	// The trailing [^:] appears intended to exclude lines such as
	// "compacted: ..." that are not job start/end events — TODO confirm.
	sentinelPattern          = regexp.MustCompile(`\[JOB.*(?P<prefix>compact|flush|ingest)(?P<suffix>ed|ing)[^:]`)
	sentinelPatternPrefixIdx = sentinelPattern.SubexpIndex("prefix")
	sentinelPatternSuffixIdx = sentinelPattern.SubexpIndex("suffix")

	// Example compaction start and end log lines:
	// 23.1 and older:
	//   I211215 14:26:56.012382 51831533 3@vendor/github.com/cockroachdb/pebble/compaction.go:1845 ⋮ [n5,pebble,s5] 1216510 [JOB 284925] compacting(default) L2 [442555] (4.2 M) + L3 [445853] (8.4 M)
	//   I211215 14:26:56.318543 51831533 3@vendor/github.com/cockroachdb/pebble/compaction.go:1886 ⋮ [n5,pebble,s5] 1216554 [JOB 284925] compacted(default) L2 [442555] (4.2 M) + L3 [445853] (8.4 M) -> L3 [445883 445887] (13 M), in 0.3s, output rate 42 M/s
	// current:
	//   I211215 14:26:56.012382 51831533 3@vendor/github.com/cockroachdb/pebble/compaction.go:1845 ⋮ [n5,pebble,s5] 1216510 [JOB 284925] compacting(default) L2 [442555] (4.2MB) + L3 [445853] (8.4MB)
	//   I211215 14:26:56.318543 51831533 3@vendor/github.com/cockroachdb/pebble/compaction.go:1886 ⋮ [n5,pebble,s5] 1216554 [JOB 284925] compacted(default) L2 [442555] (4.2MB) + L3 [445853] (8.4MB) -> L3 [445883 445887] (13MB), in 0.3s, output rate 42MB/s
	//
	// NOTE: we use the log timestamp to compute the compaction duration rather
	// than the Pebble log output.
	compactionPattern = regexp.MustCompile(
		`^.*` +
			/* Job ID */ `\[JOB (?P<job>\d+)]\s` +
			/* Start / end */ `compact(?P<suffix>ed|ing)` +

			/* Compaction type */
			`\((?P<type>.*?)\)\s` +
			/* Optional annotation*/ `?(\s*\[(?P<annotations>.*?)\]\s*)?` +

			/* Start / end level */
			`(?P<levels>L(?P<from>\d).*?(?:.*(?:\+|->)\sL(?P<to>\d))?` +
			/* Bytes */
			`(?:.*?\((?P<bytes>[0-9.]+( [BKMGTPE]|[KMGTPE]?B))\))` +
			/* Score */
			`?(\s*(Score=\d+(\.\d+)))?)`,
	)
	compactionPatternJobIdx    = compactionPattern.SubexpIndex("job")
	compactionPatternSuffixIdx = compactionPattern.SubexpIndex("suffix")
	compactionPatternTypeIdx   = compactionPattern.SubexpIndex("type")
	compactionPatternLevels    = compactionPattern.SubexpIndex("levels")
	compactionPatternFromIdx   = compactionPattern.SubexpIndex("from")
	compactionPatternToIdx     = compactionPattern.SubexpIndex("to")
	compactionPatternBytesIdx  = compactionPattern.SubexpIndex("bytes")

	// Example memtable flush log lines:
	// 23.1 and older:
	//   I211213 16:23:48.903751 21136 3@vendor/github.com/cockroachdb/pebble/event.go:599 ⋮ [n9,pebble,s9] 24 [JOB 10] flushing 2 memtables to L0
	//   I211213 16:23:49.134464 21136 3@vendor/github.com/cockroachdb/pebble/event.go:603 ⋮ [n9,pebble,s9] 26 [JOB 10] flushed 2 memtables to L0 [1535806] (1.3 M), in 0.2s, output rate 5.8 M/s
	// current:
	//   I211213 16:23:48.903751 21136
	//   3@vendor/github.com/cockroachdb/pebble/event.go:599 ⋮ [n9,pebble,s9] 24 [JOB 10] flushing 2 memtables (1.4MB) to L0
	//   I211213 16:23:49.134464 21136
	//   3@vendor/github.com/cockroachdb/pebble/event.go:603 ⋮ [n9,pebble,s9] 26 [JOB 10] flushed 2 memtables (1.4MB) to L0 [1535806] (1.3MB), in 0.2s, output rate 5.8MB/s
	//
	// NOTE: we use the log timestamp to compute the flush duration rather than
	// the Pebble log output.
	flushPattern = regexp.MustCompile(
		`^..*` +
			/* Job ID */ `\[JOB (?P<job>\d+)]\s` +
			/* Compaction type */ `flush(?P<suffix>ed|ing)\s` +
			/* Memtable count; size (23.2+) */ `\d+ memtables? (\([^)]+\))?` +
			/* SSTable Bytes */ `(?:.*?\((?P<bytes>[0-9.]+( [BKMGTPE]|[KMGTPE]?B))\))?`,
	)
	flushPatternSuffixIdx = flushPattern.SubexpIndex("suffix")
	flushPatternJobIdx    = flushPattern.SubexpIndex("job")
	flushPatternBytesIdx  = flushPattern.SubexpIndex("bytes")

	// Example ingested log lines:
	// 23.1 and older:
	//   I220228 16:01:22.487906 18476248525 3@vendor/github.com/cockroachdb/pebble/ingest.go:637 ⋮ [n24,pebble,s24] 33430782 [JOB 10211226] ingested L0:21818678 (1.8 K), L0:21818683 (1.2 K), L0:21818679 (1.6 K), L0:21818680 (1.1 K), L0:21818681 (1.1 K), L0:21818682 (160 M)
	// current:
	//   I220228 16:01:22.487906 18476248525 3@vendor/github.com/cockroachdb/pebble/ingest.go:637 ⋮ [n24,pebble,s24] 33430782 [JOB 10211226] ingested L0:21818678 (1.8KB), L0:21818683 (1.2KB), L0:21818679 (1.6KB), L0:21818680 (1.1KB), L0:21818681 (1.1KB), L0:21818682 (160MB)
	//
	// ingestedPattern matches the line prefix; the per-file details are then
	// extracted by repeatedly applying ingestedFilePattern to the same line.
	ingestedPattern = regexp.MustCompile(
		`^.*` +
			/* Job ID */ `\[JOB (?P<job>\d+)]\s` +
			/* ingested */ `ingested\s`)
	ingestedPatternJobIdx = ingestedPattern.SubexpIndex("job")
	ingestedFilePattern   = regexp.MustCompile(
		`L` +
			/* Level */ `(?P<level>\d):` +
			/* File number */ `(?P<file>\d+)\s` +
			/* Bytes */ `\((?P<bytes>[0-9.]+( [BKMGTPE]|[KMGTPE]?B))\)`)
	ingestedFilePatternLevelIdx = ingestedFilePattern.SubexpIndex("level")
	ingestedFilePatternFileIdx  = ingestedFilePattern.SubexpIndex("file")
	ingestedFilePatternBytesIdx = ingestedFilePattern.SubexpIndex("bytes")

	// flushable ingestions
	//
	//   I230831 04:13:28.824280 3780 3@pebble/event.go:685 ⋮ [n10,s10,pebble] 365 [JOB 226] flushed 6 ingested flushables L0:024334 (1.5KB) + L0:024339 (1.0KB) + L0:024335 (1.9KB) + L0:024336 (1.1KB) + L0:024337 (1.1KB) + L0:024338 (12KB) in 0.0s (0.0s total), output rate 67MB/s
	flushableIngestedPattern = regexp.MustCompile(
		`^.*` +
			/* Job ID */ `\[JOB (?P<job>\d+)]\s` +
			/* match ingested flushable */ `flushed \d ingested flushable`)
	flushableIngestedPatternJobIdx = flushableIngestedPattern.SubexpIndex("job")

	// Example read-amp log line:
	// 23.1 and older:
	//   total 31766 188 G - 257 G 187 G 48 K 3.6 G 744 536 G 49 K 278 G 5 2.1
	// current:
	//   total | 1 639B 0B | - | 84B | 0 0B | 0 0B | 3 1.9KB | 1.2KB | 1 23.7
	//
	// The two leading alternatives match the "total" row of the old and new
	// metrics-table formats respectively.
	readAmpPattern = regexp.MustCompile(
		/* Read amp */ `(?:^|\+)(?:\s{2}total|total \|).*?\s(?P<value>\d+)\s.{4,7}$`,
	)
	readAmpPatternValueIdx = readAmpPattern.SubexpIndex("value")
)
155 :
const (
	// timeFmt matches the Cockroach log timestamp format, expressed as a Go
	// reference-time layout (year/month/day hour:minute:second.micros).
	// See: https://github.com/cockroachdb/cockroach/blob/master/pkg/util/log/format_crdb_v2.go
	timeFmt = "060102 15:04:05.000000"

	// timeFmtSlim is similar to timeFmt, except that it strips components with a
	// lower granularity than a minute.
	timeFmtSlim = "060102 15:04"

	// timeFmtHrMinSec prints only the hour, minute and second of the time.
	timeFmtHrMinSec = "15:04:05"
)
168 :
// compactionType is the type of compaction. It tracks the types in
// compaction.go. We copy the values here to avoid exporting the types in
// compaction.go.
type compactionType uint8

const (
	compactionTypeDefault compactionType = iota
	// compactionTypeFlush marks a memtable flush; it is assigned by
	// parseFlushStart rather than parsed from the log line itself.
	compactionTypeFlush
	compactionTypeMove
	compactionTypeDeleteOnly
	compactionTypeElisionOnly
	compactionTypeRead
)
182 :
183 : // String implements fmt.Stringer.
184 1 : func (c compactionType) String() string {
185 1 : switch c {
186 1 : case compactionTypeDefault:
187 1 : return "default"
188 0 : case compactionTypeMove:
189 0 : return "move"
190 0 : case compactionTypeDeleteOnly:
191 0 : return "delete-only"
192 0 : case compactionTypeElisionOnly:
193 0 : return "elision-only"
194 0 : case compactionTypeRead:
195 0 : return "read"
196 0 : default:
197 0 : panic(errors.Newf("unknown compaction type: %s", c))
198 : }
199 : }
200 :
201 : // parseCompactionType parses the given compaction type string and returns a
202 : // compactionType.
203 1 : func parseCompactionType(s string) (t compactionType, err error) {
204 1 : switch s {
205 1 : case "default":
206 1 : t = compactionTypeDefault
207 1 : case "move":
208 1 : t = compactionTypeMove
209 1 : case "delete-only":
210 1 : t = compactionTypeDeleteOnly
211 0 : case "elision-only":
212 0 : t = compactionTypeElisionOnly
213 0 : case "read":
214 0 : t = compactionTypeRead
215 0 : default:
216 0 : err = errors.Newf("unknown compaction type: %s", s)
217 : }
218 1 : return
219 : }
220 :
// compactionStart is a compaction start event.
type compactionStart struct {
	ctx   logContext // log context (timestamp, node, store) of the start line
	jobID int
	cType compactionType
	// fromLevel and toLevel are the input and output LSM levels. For flushes
	// fromLevel is -1 (the WAL); for delete-only / elision-only compactions
	// the two are equal (see parseCompactionStart).
	fromLevel  int
	toLevel    int
	inputBytes uint64 // summed sizes of the input sstables
}
230 :
231 : // parseCompactionStart converts the given regular expression sub-matches for a
232 : // compaction start log line into a compactionStart event.
233 1 : func parseCompactionStart(matches []string) (compactionStart, error) {
234 1 : var start compactionStart
235 1 :
236 1 : // Parse job ID.
237 1 : jobID, err := strconv.Atoi(matches[compactionPatternJobIdx])
238 1 : if err != nil {
239 0 : return start, errors.Newf("could not parse jobID: %s", err)
240 0 : }
241 :
242 : // Parse compaction type.
243 1 : cType, err := parseCompactionType(matches[compactionPatternTypeIdx])
244 1 : if err != nil {
245 0 : return start, err
246 0 : }
247 :
248 : // Parse input bytes.
249 1 : inputBytes, err := sumInputBytes(matches[compactionPatternLevels])
250 1 : if err != nil {
251 0 : return start, errors.Newf("could not sum input bytes: %s", err)
252 0 : }
253 :
254 : // Parse from-level.
255 1 : from, err := strconv.Atoi(matches[compactionPatternFromIdx])
256 1 : if err != nil {
257 0 : return start, errors.Newf("could not parse from-level: %s", err)
258 0 : }
259 :
260 : // Parse to-level. For deletion and elision compactions, set the same level.
261 1 : to := from
262 1 : if cType != compactionTypeElisionOnly && cType != compactionTypeDeleteOnly {
263 1 : to, err = strconv.Atoi(matches[compactionPatternToIdx])
264 1 : if err != nil {
265 0 : return start, errors.Newf("could not parse to-level: %s", err)
266 0 : }
267 : }
268 :
269 1 : start = compactionStart{
270 1 : jobID: jobID,
271 1 : cType: cType,
272 1 : fromLevel: from,
273 1 : toLevel: to,
274 1 : inputBytes: inputBytes,
275 1 : }
276 1 :
277 1 : return start, nil
278 : }
279 :
// compactionEnd is a compaction end event.
type compactionEnd struct {
	jobID int
	// writtenBytes is the size of the compaction output, parsed from the
	// optional bytes group of compactionPattern / flushPattern; zero when the
	// log line carried no size.
	writtenBytes uint64
	// TODO(jackson): Parse and include the aggregate size of input
	// sstables. It may be instructive, because compactions that drop
	// keys write less data than they remove from the input level.
}
288 :
289 : // parseCompactionEnd converts the given regular expression sub-matches for a
290 : // compaction end log line into a compactionEnd event.
291 1 : func parseCompactionEnd(matches []string) (compactionEnd, error) {
292 1 : var end compactionEnd
293 1 :
294 1 : // Parse job ID.
295 1 : jobID, err := strconv.Atoi(matches[compactionPatternJobIdx])
296 1 : if err != nil {
297 0 : return end, errors.Newf("could not parse jobID: %s", err)
298 0 : }
299 1 : end = compactionEnd{jobID: jobID}
300 1 :
301 1 : // Optionally, if we have compacted bytes.
302 1 : if matches[compactionPatternBytesIdx] != "" {
303 1 : end.writtenBytes = unHumanize(matches[compactionPatternBytesIdx])
304 1 : }
305 :
306 1 : return end, nil
307 : }
308 :
309 : // parseFlushStart converts the given regular expression sub-matches for a
310 : // memtable flush start log line into a compactionStart event.
311 1 : func parseFlushStart(matches []string) (compactionStart, error) {
312 1 : var start compactionStart
313 1 : // Parse job ID.
314 1 : jobID, err := strconv.Atoi(matches[flushPatternJobIdx])
315 1 : if err != nil {
316 0 : return start, errors.Newf("could not parse jobID: %s", err)
317 0 : }
318 1 : c := compactionStart{
319 1 : jobID: jobID,
320 1 : cType: compactionTypeFlush,
321 1 : fromLevel: -1,
322 1 : toLevel: 0,
323 1 : }
324 1 : return c, nil
325 : }
326 :
327 : // parseFlushEnd converts the given regular expression sub-matches for a
328 : // memtable flush end log line into a compactionEnd event.
329 1 : func parseFlushEnd(matches []string) (compactionEnd, error) {
330 1 : var end compactionEnd
331 1 :
332 1 : // Parse job ID.
333 1 : jobID, err := strconv.Atoi(matches[flushPatternJobIdx])
334 1 : if err != nil {
335 0 : return end, errors.Newf("could not parse jobID: %s", err)
336 0 : }
337 1 : end = compactionEnd{jobID: jobID}
338 1 :
339 1 : // Optionally, if we have flushed bytes.
340 1 : if matches[flushPatternBytesIdx] != "" {
341 1 : end.writtenBytes = unHumanize(matches[flushPatternBytesIdx])
342 1 : }
343 :
344 1 : return end, nil
345 : }
346 :
// event describes an aggregated event (eg, start and end events
// combined if necessary).
type event struct {
	nodeID    int
	storeID   int
	jobID     int
	timeStart time.Time
	timeEnd   time.Time
	// Exactly one of the following is expected to be non-nil, selecting the
	// event's payload — compaction events are built in addCompactionEnd;
	// ingest events are built elsewhere. TODO confirm both are never set.
	compaction *compaction
	ingest     *ingest
}
358 :
// compaction represents an aggregated compaction event (i.e. the combination of
// a start and end event).
type compaction struct {
	cType compactionType
	// fromLevel is -1 for flushes (the WAL); see parseFlushStart.
	fromLevel   int
	toLevel     int
	inputBytes  uint64 // from the start event
	outputBytes uint64 // from the end event's writtenBytes
}
368 :
// ingest describes the completion of an ingest.
type ingest struct {
	files []ingestedFile
}

// ingestedFile describes a single sstable ingested into the LSM, as parsed
// from ingestedFilePattern.
type ingestedFile struct {
	level     int
	fileNum   int
	sizeBytes uint64
}
379 :
// readAmp represents a read-amp event.
type readAmp struct {
	ctx     logContext // log context (timestamp, node, store) of the line
	readAmp int        // read amplification parsed via readAmpPattern
}
385 :
// nodeStoreJob keys an in-flight job by the (node, store) that logged it plus
// its job ID, since job IDs are only unique within one Pebble instance.
type nodeStoreJob struct {
	node, store, job int
}

// String implements fmt.Stringer.
func (n nodeStoreJob) String() string {
	return "(node=" + strconv.Itoa(n.node) +
		",store=" + strconv.Itoa(n.store) +
		",job=" + strconv.Itoa(n.job) + ")"
}
393 :
// errorEvent records a single line that failed to parse, along with the file
// it came from and the underlying parse error.
type errorEvent struct {
	path string // log file path
	line string // the offending log line, verbatim
	err  error
}
399 :
// logEventCollector keeps track of open compaction events and read-amp events
// over the course of parsing log line events. Completed compaction events are
// added to the collector once a matching start and end pair are encountered.
// Read-amp events are added as they are encountered (they have no start / end
// concept).
type logEventCollector struct {
	ctx logContext // context (timestamp, node, store) of the line being parsed
	// m holds start events that have not yet been matched with an end event.
	m        map[nodeStoreJob]compactionStart
	events   []event      // completed (matched) events
	readAmps []readAmp    // read-amp samples, in encounter order
	errors   []errorEvent // accumulated parse errors
}
412 :
413 : // newEventCollector instantiates a new logEventCollector.
414 1 : func newEventCollector() *logEventCollector {
415 1 : return &logEventCollector{
416 1 : m: make(map[nodeStoreJob]compactionStart),
417 1 : }
418 1 : }
419 :
420 : // addError records an error encountered during log parsing.
421 0 : func (c *logEventCollector) addError(path, line string, err error) {
422 0 : c.errors = append(c.errors, errorEvent{path: path, line: line, err: err})
423 0 : }
424 :
425 : // addCompactionStart adds a new compactionStart to the collector. The event is
426 : // tracked by its job ID.
427 1 : func (c *logEventCollector) addCompactionStart(start compactionStart) error {
428 1 : key := nodeStoreJob{c.ctx.node, c.ctx.store, start.jobID}
429 1 : if _, ok := c.m[key]; ok {
430 0 : return errors.Newf("start event already seen for %s", key)
431 0 : }
432 1 : start.ctx = c.ctx
433 1 : c.m[key] = start
434 1 : return nil
435 : }
436 :
437 : // addCompactionEnd completes the compaction event for the given compactionEnd.
438 1 : func (c *logEventCollector) addCompactionEnd(end compactionEnd) {
439 1 : key := nodeStoreJob{c.ctx.node, c.ctx.store, end.jobID}
440 1 : start, ok := c.m[key]
441 1 : if !ok {
442 0 : _, _ = fmt.Fprintf(
443 0 : os.Stderr,
444 0 : "compaction end event missing start event for %s; skipping\n", key,
445 0 : )
446 0 : return
447 0 : }
448 :
449 : // Remove the job from the collector once it has been matched.
450 1 : delete(c.m, key)
451 1 :
452 1 : c.events = append(c.events, event{
453 1 : nodeID: start.ctx.node,
454 1 : storeID: start.ctx.store,
455 1 : jobID: start.jobID,
456 1 : timeStart: start.ctx.timestamp,
457 1 : timeEnd: c.ctx.timestamp,
458 1 : compaction: &compaction{
459 1 : cType: start.cType,
460 1 : fromLevel: start.fromLevel,
461 1 : toLevel: start.toLevel,
462 1 : inputBytes: start.inputBytes,
463 1 : outputBytes: end.writtenBytes,
464 1 : },
465 1 : })
466 : }
467 :
// addReadAmp adds the readAmp event to the collector, stamping it with the
// current log context (timestamp, node, store).
func (c *logEventCollector) addReadAmp(ra readAmp) {
	ra.ctx = c.ctx
	c.readAmps = append(c.readAmps, ra)
}
473 :
// logContext captures the metadata of log lines: the timestamp and the node
// and store IDs, as extracted by logContextPattern.
type logContext struct {
	timestamp   time.Time
	node, store int
}
479 :
// saveContext saves the given logContext in the collector. Subsequent events
// parsed from the same line inherit this context.
func (c *logEventCollector) saveContext(ctx logContext) {
	c.ctx = ctx
}
484 :
// level is a level in the LSM. The WAL is level -1.
type level int

// String implements fmt.Stringer.
func (l level) String() string {
	switch l {
	case -1:
		// Level -1 denotes the WAL (see parseFlushStart).
		return "WAL"
	default:
		return fmt.Sprintf("L%d", int(l))
	}
}
495 :
// fromTo is a map key for (from, to) level tuples.
type fromTo struct {
	from, to level
}

// compactionTypeCount is a mapping from compaction type to count.
type compactionTypeCount map[compactionType]int
503 :
// windowSummary summarizes events in a window of time between a start and end
// time. The window tracks:
// - for each compaction type: counts, total bytes compacted, and total duration.
// - total ingested bytes for each level
// - read amp magnitudes
type windowSummary struct {
	nodeID, storeID int
	tStart, tEnd    time.Time
	eventCount      int // number of (non-flush) compactions in the window
	// Flush statistics (flushes are not keyed by fromTo; they always target L0).
	flushedCount int
	flushedBytes uint64
	flushedTime  time.Duration
	// Per-(from,to)-level compaction statistics.
	compactionCounts     map[fromTo]compactionTypeCount
	compactionBytesIn    map[fromTo]uint64
	compactionBytesOut   map[fromTo]uint64
	compactionBytesMoved map[fromTo]uint64 // move compactions only
	compactionBytesDel   map[fromTo]uint64 // delete-only compactions only
	compactionTime       map[fromTo]time.Duration
	// Per-level ingest statistics.
	ingestedCount [numLevels]int
	ingestedBytes [numLevels]uint64
	readAmps      []readAmp // read-amp samples falling inside the window
	longRunning   []event   // events exceeding the long-running threshold
}
527 :
// String implements fmt.Stringer, returning a formatted window summary.
func (s windowSummary) String() string {
	// Flatten the per-(from,to) maps into a slice so the rows can be printed
	// in a deterministic (from, to) order.
	type fromToCount struct {
		ft         fromTo
		counts     compactionTypeCount
		bytesIn    uint64
		bytesOut   uint64
		bytesMoved uint64
		bytesDel   uint64
		duration   time.Duration
	}
	var pairs []fromToCount
	for k, v := range s.compactionCounts {
		pairs = append(pairs, fromToCount{
			ft:         k,
			counts:     v,
			bytesIn:    s.compactionBytesIn[k],
			bytesOut:   s.compactionBytesOut[k],
			bytesMoved: s.compactionBytesMoved[k],
			bytesDel:   s.compactionBytesDel[k],
			duration:   s.compactionTime[k],
		})
	}
	slices.SortFunc(pairs, func(l, r fromToCount) int {
		if v := cmp.Compare(l.ft.from, r.ft.from); v != 0 {
			return v
		}
		return cmp.Compare(l.ft.to, r.ft.to)
	})

	// -1 node/store IDs represent unknown ("?") values.
	nodeID, storeID := "?", "?"
	if s.nodeID != -1 {
		nodeID = strconv.Itoa(s.nodeID)
	}
	if s.storeID != -1 {
		storeID = strconv.Itoa(s.storeID)
	}

	var sb strings.Builder
	sb.WriteString(fmt.Sprintf("node: %s, store: %s\n", nodeID, storeID))
	sb.WriteString(fmt.Sprintf(" from: %s\n", s.tStart.Format(timeFmtSlim)))
	sb.WriteString(fmt.Sprintf(" to: %s\n", s.tEnd.Format(timeFmtSlim)))
	// Mean read-amp over the window. NOTE(review): when the window has no
	// read-amp samples this divides 0 by 0 and prints NaN — confirm intended.
	var count, sum int
	for _, ra := range s.readAmps {
		count++
		sum += ra.readAmp
	}
	sb.WriteString(fmt.Sprintf(" r-amp: %.1f\n", float64(sum)/float64(count)))

	// Print flush+ingest statistics.
	{
		var headerWritten bool
		// The header is written lazily so it only appears if there was at
		// least one flush or ingest in the window.
		maybeWriteHeader := func() {
			if !headerWritten {
				sb.WriteString("_kind______from______to_____________________________________count___bytes______time\n")
				headerWritten = true
			}
		}

		if s.flushedCount > 0 {
			maybeWriteHeader()
			fmt.Fprintf(&sb, "%-7s %7s %7d %7s %9s\n",
				"flush", "L0", s.flushedCount, humanize.Bytes.Uint64(s.flushedBytes),
				s.flushedTime.Truncate(time.Second))
		}

		// These deliberately shadow the outer count/sum used for read-amps.
		count := s.flushedCount
		sum := s.flushedBytes
		totalTime := s.flushedTime
		for l := 0; l < len(s.ingestedBytes); l++ {
			if s.ingestedCount[l] == 0 {
				continue
			}
			maybeWriteHeader()
			fmt.Fprintf(&sb, "%-7s %7s %7d %7s\n",
				"ingest", fmt.Sprintf("L%d", l), s.ingestedCount[l], humanize.Bytes.Uint64(s.ingestedBytes[l]))
			count += s.ingestedCount[l]
			sum += s.ingestedBytes[l]
		}
		if headerWritten {
			fmt.Fprintf(&sb, "total %7d %7s %9s\n",
				count, humanize.Bytes.Uint64(sum), totalTime.Truncate(time.Second),
			)
		}
	}

	// Print compactions statistics.
	if len(s.compactionCounts) > 0 {
		sb.WriteString("_kind______from______to___default____move___elide__delete___count___in(B)__out(B)__mov(B)__del(B)______time\n")
		var totalDef, totalMove, totalElision, totalDel int
		var totalBytesIn, totalBytesOut, totalBytesMoved, totalBytesDel uint64
		var totalTime time.Duration
		for _, p := range pairs {
			def := p.counts[compactionTypeDefault]
			move := p.counts[compactionTypeMove]
			elision := p.counts[compactionTypeElisionOnly]
			del := p.counts[compactionTypeDeleteOnly]
			total := def + move + elision + del

			str := fmt.Sprintf("%-7s %7s %7s %7d %7d %7d %7d %7d %7s %7s %7s %7s %9s\n",
				"compact", p.ft.from, p.ft.to, def, move, elision, del, total,
				humanize.Bytes.Uint64(p.bytesIn), humanize.Bytes.Uint64(p.bytesOut),
				humanize.Bytes.Uint64(p.bytesMoved), humanize.Bytes.Uint64(p.bytesDel),
				p.duration.Truncate(time.Second))
			sb.WriteString(str)

			totalDef += def
			totalMove += move
			totalElision += elision
			totalDel += del
			totalBytesIn += p.bytesIn
			totalBytesOut += p.bytesOut
			totalBytesMoved += p.bytesMoved
			totalBytesDel += p.bytesDel
			totalTime += p.duration
		}
		sb.WriteString(fmt.Sprintf("total %19d %7d %7d %7d %7d %7s %7s %7s %7s %9s\n",
			totalDef, totalMove, totalElision, totalDel, s.eventCount,
			humanize.Bytes.Uint64(totalBytesIn), humanize.Bytes.Uint64(totalBytesOut),
			humanize.Bytes.Uint64(totalBytesMoved), humanize.Bytes.Uint64(totalBytesDel),
			totalTime.Truncate(time.Second)))
	}

	// (Optional) Long running events.
	if len(s.longRunning) > 0 {
		sb.WriteString("long-running events (descending runtime):\n")
		sb.WriteString("_kind________from________to_______job______type_____start_______end____dur(s)_____bytes:\n")
		for _, e := range s.longRunning {
			c := e.compaction
			// fromLevel -1 marks a flush (see parseFlushStart).
			kind := "compact"
			if c.fromLevel == -1 {
				kind = "flush"
			}
			sb.WriteString(fmt.Sprintf("%-7s %9s %9s %9d %9s %9s %9s %9.0f %9s\n",
				kind, level(c.fromLevel), level(c.toLevel), e.jobID, c.cType,
				e.timeStart.Format(timeFmtHrMinSec), e.timeEnd.Format(timeFmtHrMinSec),
				e.timeEnd.Sub(e.timeStart).Seconds(), humanize.Bytes.Uint64(c.outputBytes)))
		}
	}

	return sb.String()
}
670 :
671 : // windowSummarySlice is a slice of windowSummary that sorts in order of start
672 : // time, node, then store.
673 : type windowsSummarySlice []windowSummary
674 :
675 1 : func (s windowsSummarySlice) Len() int {
676 1 : return len(s)
677 1 : }
678 :
679 1 : func (s windowsSummarySlice) Less(i, j int) bool {
680 1 : if !s[i].tStart.Equal(s[j].tStart) {
681 1 : return s[i].tStart.Before(s[j].tStart)
682 1 : }
683 1 : if s[i].nodeID != s[j].nodeID {
684 1 : return s[i].nodeID < s[j].nodeID
685 1 : }
686 1 : return s[i].storeID < s[j].storeID
687 : }
688 :
689 1 : func (s windowsSummarySlice) Swap(i, j int) {
690 1 : s[i], s[j] = s[j], s[i]
691 1 : }
692 :
693 : // eventSlice is a slice of events that sorts in order of node, store,
694 : // then event start time.
695 : type eventSlice []event
696 :
697 1 : func (s eventSlice) Len() int {
698 1 : return len(s)
699 1 : }
700 :
701 1 : func (s eventSlice) Less(i, j int) bool {
702 1 : if s[i].nodeID != s[j].nodeID {
703 1 : return s[i].nodeID < s[j].nodeID
704 1 : }
705 1 : if s[i].storeID != s[j].storeID {
706 1 : return s[i].storeID < s[j].storeID
707 1 : }
708 1 : return s[i].timeStart.Before(s[j].timeStart)
709 : }
710 :
711 1 : func (s eventSlice) Swap(i, j int) {
712 1 : s[i], s[j] = s[j], s[i]
713 1 : }
714 :
715 : // readAmpSlice is a slice of readAmp events that sorts in order of node, store,
716 : // then read amp event start time.
717 : type readAmpSlice []readAmp
718 :
719 1 : func (r readAmpSlice) Len() int {
720 1 : return len(r)
721 1 : }
722 :
723 1 : func (r readAmpSlice) Less(i, j int) bool {
724 1 : // Sort by node, store, then read-amp.
725 1 : if r[i].ctx.node != r[j].ctx.node {
726 0 : return r[i].ctx.node < r[j].ctx.node
727 0 : }
728 1 : if r[i].ctx.store != r[j].ctx.store {
729 0 : return r[i].ctx.store < r[j].ctx.store
730 0 : }
731 1 : return r[i].ctx.timestamp.Before(r[j].ctx.timestamp)
732 : }
733 :
734 0 : func (r readAmpSlice) Swap(i, j int) {
735 0 : r[i], r[j] = r[j], r[i]
736 0 : }
737 :
// aggregator combines compaction and read-amp events within windows of fixed
// duration and returns one aggregated windowSummary struct per window.
type aggregator struct {
	window   time.Duration // fixed width of each aggregation window
	events   []event
	readAmps []readAmp
	// longRunningLimit is the duration above which an event is additionally
	// reported in the window's long-running list.
	longRunningLimit time.Duration
}
746 :
747 : // newAggregator returns a new aggregator.
748 : func newAggregator(
749 : window, longRunningLimit time.Duration, events []event, readAmps []readAmp,
750 1 : ) *aggregator {
751 1 : return &aggregator{
752 1 : window: window,
753 1 : events: events,
754 1 : readAmps: readAmps,
755 1 : longRunningLimit: longRunningLimit,
756 1 : }
757 1 : }
758 :
759 : // aggregate aggregates the events into windows, returning the windowSummary for
760 : // each interval.
761 1 : func (a *aggregator) aggregate() []windowSummary {
762 1 : if len(a.events) == 0 {
763 0 : return nil
764 0 : }
765 :
766 : // Sort the event and read-amp slices by start time.
767 1 : sort.Sort(eventSlice(a.events))
768 1 : sort.Sort(readAmpSlice(a.readAmps))
769 1 :
770 1 : initWindow := func(e event) *windowSummary {
771 1 : start := e.timeStart.Truncate(a.window)
772 1 : return &windowSummary{
773 1 : nodeID: e.nodeID,
774 1 : storeID: e.storeID,
775 1 : tStart: start,
776 1 : tEnd: start.Add(a.window),
777 1 : compactionCounts: make(map[fromTo]compactionTypeCount),
778 1 : compactionBytesIn: make(map[fromTo]uint64),
779 1 : compactionBytesOut: make(map[fromTo]uint64),
780 1 : compactionBytesMoved: make(map[fromTo]uint64),
781 1 : compactionBytesDel: make(map[fromTo]uint64),
782 1 : compactionTime: make(map[fromTo]time.Duration),
783 1 : }
784 1 : }
785 :
786 1 : var windows []windowSummary
787 1 : var j int // index for read-amps
788 1 : finishWindow := func(cur *windowSummary) {
789 1 : // Collect read-amp values for the previous window.
790 1 : var readAmps []readAmp
791 1 : for j < len(a.readAmps) {
792 1 : ra := a.readAmps[j]
793 1 :
794 1 : // Skip values before the current window.
795 1 : if ra.ctx.node < cur.nodeID ||
796 1 : ra.ctx.store < cur.storeID ||
797 1 : ra.ctx.timestamp.Before(cur.tStart) {
798 0 : j++
799 0 : continue
800 : }
801 :
802 : // We've passed over the current window. Stop.
803 1 : if ra.ctx.node > cur.nodeID ||
804 1 : ra.ctx.store > cur.storeID ||
805 1 : ra.ctx.timestamp.After(cur.tEnd) {
806 1 : break
807 : }
808 :
809 : // Collect this read-amp value.
810 1 : readAmps = append(readAmps, ra)
811 1 : j++
812 : }
813 1 : cur.readAmps = readAmps
814 1 :
815 1 : // Sort long running compactions in descending order of duration.
816 1 : slices.SortFunc(cur.longRunning, func(l, r event) int {
817 0 : return cmp.Compare(l.timeEnd.Sub(l.timeStart), r.timeEnd.Sub(r.timeStart))
818 0 : })
819 :
820 : // Add the completed window to the set of windows.
821 1 : windows = append(windows, *cur)
822 : }
823 :
824 : // Move through the compactions, collecting relevant compactions into the same
825 : // window. Windows have the same node and store, and a compaction start time
826 : // within a given range.
827 1 : i := 0
828 1 : curWindow := initWindow(a.events[i])
829 1 : for ; ; i++ {
830 1 : // No more windows. Complete the current window.
831 1 : if i == len(a.events) {
832 1 : finishWindow(curWindow)
833 1 : break
834 : }
835 1 : e := a.events[i]
836 1 :
837 1 : // If we're at the start of a new interval, finalize the current window and
838 1 : // start a new one.
839 1 : if curWindow.nodeID != e.nodeID ||
840 1 : curWindow.storeID != e.storeID ||
841 1 : e.timeStart.After(curWindow.tEnd) {
842 1 : finishWindow(curWindow)
843 1 : curWindow = initWindow(e)
844 1 : }
845 :
846 1 : switch {
847 1 : case e.ingest != nil:
848 1 : // Update ingest stats.
849 1 : for _, f := range e.ingest.files {
850 1 : curWindow.ingestedCount[f.level]++
851 1 : curWindow.ingestedBytes[f.level] += f.sizeBytes
852 1 : }
853 1 : case e.compaction != nil && e.compaction.cType == compactionTypeFlush:
854 1 : // Update flush stats.
855 1 : f := e.compaction
856 1 : curWindow.flushedCount++
857 1 : curWindow.flushedBytes += f.outputBytes
858 1 : curWindow.flushedTime += e.timeEnd.Sub(e.timeStart)
859 1 : case e.compaction != nil:
860 1 : // Update compaction stats.
861 1 : c := e.compaction
862 1 : // Update compaction counts.
863 1 : ft := fromTo{level(c.fromLevel), level(c.toLevel)}
864 1 : m, ok := curWindow.compactionCounts[ft]
865 1 : if !ok {
866 1 : m = make(compactionTypeCount)
867 1 : curWindow.compactionCounts[ft] = m
868 1 : }
869 1 : m[c.cType]++
870 1 : curWindow.eventCount++
871 1 :
872 1 : // Update compacted bytes in / out / moved / deleted.
873 1 : switch c.cType {
874 1 : case compactionTypeMove:
875 1 : curWindow.compactionBytesMoved[ft] += c.inputBytes
876 1 : case compactionTypeDeleteOnly:
877 1 : curWindow.compactionBytesDel[ft] += c.inputBytes
878 1 : default:
879 1 : curWindow.compactionBytesIn[ft] += c.inputBytes
880 1 : curWindow.compactionBytesOut[ft] += c.outputBytes
881 : }
882 :
883 : // Update compaction time.
884 1 : _, ok = curWindow.compactionTime[ft]
885 1 : if !ok {
886 1 : curWindow.compactionTime[ft] = 0
887 1 : }
888 1 : curWindow.compactionTime[ft] += e.timeEnd.Sub(e.timeStart)
889 :
890 : }
891 : // Add "long-running" events. Those that start in this window
892 : // that have duration longer than the window interval.
893 1 : if e.timeEnd.Sub(e.timeStart) > a.longRunningLimit {
894 1 : curWindow.longRunning = append(curWindow.longRunning, e)
895 1 : }
896 : }
897 :
898 : // Windows are added in order of (node, store, time). Re-sort the windows by
899 : // (time, node, store) for better presentation.
900 1 : sort.Sort(windowsSummarySlice(windows))
901 1 :
902 1 : return windows
903 : }
904 :
905 : // parseLog parses the log file with the given path, using the given parse
906 : // function to collect events in the given logEventCollector. parseLog
907 : // returns a non-nil error if an I/O error was encountered while reading
908 : // the log file. Parsing errors are accumulated in the
909 : // logEventCollector.
910 1 : func parseLog(path string, b *logEventCollector) error {
911 1 : f, err := os.Open(path)
912 1 : if err != nil {
913 0 : return err
914 0 : }
915 1 : defer f.Close()
916 1 :
917 1 : s := bufio.NewScanner(f)
918 1 : for s.Scan() {
919 1 : line := s.Text()
920 1 : // Store the log context for the current line, if we have one.
921 1 : if err := parseLogContext(line, b); err != nil {
922 0 : return err
923 0 : }
924 :
925 : // First check for a flush or compaction.
926 1 : matches := sentinelPattern.FindStringSubmatch(line)
927 1 : if matches != nil {
928 1 : // Determine which regexp to apply by testing the first letter of the prefix.
929 1 : var err error
930 1 : switch matches[sentinelPatternPrefixIdx][0] {
931 1 : case 'c':
932 1 : err = parseCompaction(line, b)
933 1 : case 'f':
934 1 : err = parseFlush(line, b)
935 1 : case 'i':
936 1 : err = parseIngest(line, b)
937 0 : default:
938 0 : err = errors.Newf("unexpected line: neither compaction nor flush: %s", line)
939 : }
940 1 : if err != nil {
941 0 : b.addError(path, line, err)
942 0 : }
943 1 : continue
944 : }
945 :
946 : // Else check for an LSM debug line.
947 1 : if err = parseReadAmp(line, b); err != nil {
948 0 : b.addError(path, line, err)
949 0 : continue
950 : }
951 : }
952 1 : return s.Err()
953 : }
954 :
955 : // parseLogContext extracts contextual information from the log line (e.g. the
956 : // timestamp, node and store).
957 1 : func parseLogContext(line string, b *logEventCollector) error {
958 1 : matches := logContextPattern.FindStringSubmatch(line)
959 1 : if matches == nil {
960 1 : return nil
961 1 : }
962 :
963 : // Parse start time.
964 1 : t, err := time.Parse(timeFmt, matches[logContextPatternTimestampIdx])
965 1 : if err != nil {
966 0 : return errors.Newf("could not parse timestamp: %s", err)
967 0 : }
968 :
969 : // Parse node and store.
970 1 : nodeID, err := strconv.Atoi(matches[logContextPatternNodeIdx])
971 1 : if err != nil {
972 1 : if matches[logContextPatternNodeIdx] != "?" {
973 0 : return errors.Newf("could not parse node ID: %s", err)
974 0 : }
975 1 : nodeID = -1
976 : }
977 :
978 1 : storeID, err := strconv.Atoi(matches[logContextPatternStoreIdx])
979 1 : if err != nil {
980 1 : if matches[logContextPatternStoreIdx] != "?" {
981 0 : return errors.Newf("could not parse store ID: %s", err)
982 0 : }
983 1 : storeID = -1
984 : }
985 :
986 1 : b.saveContext(logContext{
987 1 : timestamp: t,
988 1 : node: nodeID,
989 1 : store: storeID,
990 1 : })
991 1 : return nil
992 : }
993 :
994 : // parseCompaction parses and collects Pebble compaction events.
995 1 : func parseCompaction(line string, b *logEventCollector) error {
996 1 : matches := compactionPattern.FindStringSubmatch(line)
997 1 : if matches == nil {
998 0 : return nil
999 0 : }
1000 :
1001 : // "compacting": implies start line.
1002 1 : if matches[compactionPatternSuffixIdx] == "ing" {
1003 1 : start, err := parseCompactionStart(matches)
1004 1 : if err != nil {
1005 0 : return err
1006 0 : }
1007 1 : if err := b.addCompactionStart(start); err != nil {
1008 0 : return err
1009 0 : }
1010 1 : return nil
1011 : }
1012 :
1013 : // "compacted": implies end line.
1014 1 : end, err := parseCompactionEnd(matches)
1015 1 : if err != nil {
1016 0 : return err
1017 0 : }
1018 :
1019 1 : b.addCompactionEnd(end)
1020 1 : return nil
1021 : }
1022 :
1023 : // parseFlush parses and collects Pebble memtable flush events.
1024 1 : func parseFlush(line string, b *logEventCollector) error {
1025 1 : matches := flushPattern.FindStringSubmatch(line)
1026 1 : if matches == nil {
1027 0 : return nil
1028 0 : }
1029 :
1030 1 : if matches[flushPatternSuffixIdx] == "ing" {
1031 1 : start, err := parseFlushStart(matches)
1032 1 : if err != nil {
1033 0 : return err
1034 0 : }
1035 1 : return b.addCompactionStart(start)
1036 : }
1037 :
1038 1 : end, err := parseFlushEnd(matches)
1039 1 : if err != nil {
1040 0 : return err
1041 0 : }
1042 :
1043 1 : b.addCompactionEnd(end)
1044 1 : return nil
1045 : }
1046 :
1047 1 : func parseIngestDuringFlush(line string, b *logEventCollector) error {
1048 1 : matches := flushableIngestedPattern.FindStringSubmatch(line)
1049 1 : if matches == nil {
1050 0 : return nil
1051 0 : }
1052 : // Parse job ID.
1053 1 : jobID, err := strconv.Atoi(matches[flushableIngestedPatternJobIdx])
1054 1 : if err != nil {
1055 0 : return errors.Newf("could not parse jobID: %s", err)
1056 0 : }
1057 1 : return parseRemainingIngestLogLine(jobID, line, b)
1058 : }
1059 :
1060 : // parseIngest parses and collects Pebble ingest complete events.
1061 1 : func parseIngest(line string, b *logEventCollector) error {
1062 1 : matches := ingestedPattern.FindStringSubmatch(line)
1063 1 : if matches == nil {
1064 1 : // Try and parse the other kind of ingest.
1065 1 : return parseIngestDuringFlush(line, b)
1066 1 : }
1067 : // Parse job ID.
1068 1 : jobID, err := strconv.Atoi(matches[ingestedPatternJobIdx])
1069 1 : if err != nil {
1070 0 : return errors.Newf("could not parse jobID: %s", err)
1071 0 : }
1072 1 : return parseRemainingIngestLogLine(jobID, line, b)
1073 : }
1074 :
1075 : // parses the level, filenum, and bytes for the files which were ingested.
1076 1 : func parseRemainingIngestLogLine(jobID int, line string, b *logEventCollector) error {
1077 1 : fileMatches := ingestedFilePattern.FindAllStringSubmatch(line, -1)
1078 1 : files := make([]ingestedFile, len(fileMatches))
1079 1 : for i := range fileMatches {
1080 1 : level, err := strconv.Atoi(fileMatches[i][ingestedFilePatternLevelIdx])
1081 1 : if err != nil {
1082 0 : return errors.Newf("could not parse level: %s", err)
1083 0 : }
1084 1 : fileNum, err := strconv.Atoi(fileMatches[i][ingestedFilePatternFileIdx])
1085 1 : if err != nil {
1086 0 : return errors.Newf("could not parse file number: %s", err)
1087 0 : }
1088 1 : files[i] = ingestedFile{
1089 1 : level: level,
1090 1 : fileNum: fileNum,
1091 1 : sizeBytes: unHumanize(fileMatches[i][ingestedFilePatternBytesIdx]),
1092 1 : }
1093 : }
1094 1 : b.events = append(b.events, event{
1095 1 : nodeID: b.ctx.node,
1096 1 : storeID: b.ctx.store,
1097 1 : jobID: jobID,
1098 1 : timeStart: b.ctx.timestamp,
1099 1 : timeEnd: b.ctx.timestamp,
1100 1 : ingest: &ingest{
1101 1 : files: files,
1102 1 : },
1103 1 : })
1104 1 : return nil
1105 : }
1106 :
1107 : // parseReadAmp attempts to parse the current line as a read amp value
1108 1 : func parseReadAmp(line string, b *logEventCollector) error {
1109 1 : matches := readAmpPattern.FindStringSubmatch(line)
1110 1 : if matches == nil {
1111 1 : return nil
1112 1 : }
1113 1 : val, err := strconv.Atoi(matches[readAmpPatternValueIdx])
1114 1 : if err != nil {
1115 0 : return errors.Newf("could not parse read amp: %s", err)
1116 0 : }
1117 1 : b.addReadAmp(readAmp{
1118 1 : readAmp: val,
1119 1 : })
1120 1 : return nil
1121 : }
1122 :
1123 : // runCompactionLogs is runnable function of the top-level cobra.Command that
1124 : // parses and collects Pebble compaction events and LSM information.
1125 0 : func runCompactionLogs(cmd *cobra.Command, args []string) error {
1126 0 : // The args contain a list of log files to read.
1127 0 : files := args
1128 0 :
1129 0 : // Scan the log files collecting start and end compaction lines.
1130 0 : b := newEventCollector()
1131 0 : for _, file := range files {
1132 0 : err := parseLog(file, b)
1133 0 : // parseLog returns an error only on I/O errors, which we
1134 0 : // immediately exit with.
1135 0 : if err != nil {
1136 0 : return err
1137 0 : }
1138 : }
1139 :
1140 0 : window, err := cmd.Flags().GetDuration("window")
1141 0 : if err != nil {
1142 0 : return err
1143 0 : }
1144 :
1145 0 : longRunningLimit, err := cmd.Flags().GetDuration("long-running-limit")
1146 0 : if err != nil {
1147 0 : return err
1148 0 : }
1149 0 : if longRunningLimit == 0 {
1150 0 : // Off by default. Set to infinite duration.
1151 0 : longRunningLimit = time.Duration(math.MaxInt64)
1152 0 : }
1153 :
1154 : // Aggregate the lines.
1155 0 : a := newAggregator(window, longRunningLimit, b.events, b.readAmps)
1156 0 : summaries := a.aggregate()
1157 0 : for _, s := range summaries {
1158 0 : fmt.Printf("%s\n", s)
1159 0 : }
1160 :
1161 : // After the summaries, print accumulated parsing errors to stderr.
1162 0 : for _, e := range b.errors {
1163 0 : fmt.Fprintf(os.Stderr, "-\n%s: %s\nError: %s\n", filepath.Base(e.path), e.line, e.err)
1164 0 : }
1165 0 : return nil
1166 : }
1167 :
1168 : // unHumanize performs the opposite of humanize.Bytes.Uint64 (e.g. "10B",
1169 : // "10MB") or the 23.1 humanize.IEC.Uint64 (e.g. "10 B", "10 M"), converting a
1170 : // human-readable value into a raw number of bytes.
1171 1 : func unHumanize(s string) uint64 {
1172 1 : if len(s) < 2 || !(s[0] >= '0' && s[0] <= '9') {
1173 0 : panic(errors.Newf("invalid bytes value %q", s))
1174 : }
1175 1 : if s[len(s)-1] == 'B' {
1176 1 : s = s[:len(s)-1]
1177 1 : }
1178 :
1179 1 : multiplier := uint64(1)
1180 1 : switch s[len(s)-1] {
1181 1 : case 'K':
1182 1 : multiplier = 1 << 10
1183 1 : case 'M':
1184 1 : multiplier = 1 << 20
1185 0 : case 'G':
1186 0 : multiplier = 1 << 30
1187 0 : case 'T':
1188 0 : multiplier = 1 << 40
1189 0 : case 'P':
1190 0 : multiplier = 1 << 50
1191 0 : case 'E':
1192 0 : multiplier = 1 << 60
1193 : }
1194 1 : if multiplier != 1 {
1195 1 : s = s[:len(s)-1]
1196 1 : }
1197 1 : if s[len(s)-1] == ' ' {
1198 1 : s = s[:len(s)-1]
1199 1 : }
1200 1 : val, err := strconv.ParseFloat(s, 64)
1201 1 : if err != nil {
1202 0 : panic(fmt.Sprintf("parsing %s: %v", s, err))
1203 : }
1204 :
1205 1 : return uint64(val * float64(multiplier))
1206 : }
1207 :
1208 : // sumInputBytes takes a string as input and returns the sum of the
1209 : // human-readable sizes, as an integer number of bytes.
1210 1 : func sumInputBytes(s string) (total uint64, _ error) {
1211 1 : var (
1212 1 : open bool
1213 1 : b bytes.Buffer
1214 1 : )
1215 1 : for _, c := range s {
1216 1 : switch c {
1217 1 : case '(':
1218 1 : open = true
1219 1 : case ')':
1220 1 : total += unHumanize(b.String())
1221 1 : b.Reset()
1222 1 : open = false
1223 1 : default:
1224 1 : if open {
1225 1 : b.WriteRune(c)
1226 1 : }
1227 : }
1228 : }
1229 1 : return
1230 : }
|