Line data Source code
1 : package main
2 :
3 : import (
4 : "bufio"
5 : "bytes"
6 : "cmp"
7 : "compress/bzip2"
8 : "compress/gzip"
9 : "encoding/json"
10 : "fmt"
11 : "io"
12 : "math"
13 : "os"
14 : "path/filepath"
15 : "slices"
16 : "sort"
17 : "strings"
18 : "time"
19 :
20 : "github.com/cockroachdb/errors/oserror"
21 : "github.com/spf13/cobra"
22 : )
23 :
24 : // A note to the reader on nomenclature used in this command.
25 : //
26 : // The write-throughput benchmark is generated by a roachtest with a number of
27 : // independent worker VMs running the same benchmark (to allow for an average
28 : // value to be recorded).
29 : //
30 : // An instance of the roachtest on a given day, for a given workload type (e.g.
31 : // values of size 1024B, values of size 64B, etc.), is modelled as a `writeRun`.
32 : // Each worker VM in a `writeRun` produces data modelled as a `rawWriteRun`.
33 : // Each `rawWriteRun` contains the raw data points emitted periodically by the
34 : // VM, each of which is modelled as a `writePoint`.
35 : //
36 : // A `writeWorkload` (i.e. singular) models all data for a particular type of
37 : // benchmark run (e.g. values of size 1024B), across all days. It is a mapping
38 : // of day to `writeRun`, which is a collection of `rawWriteRun`s.
39 : //
40 : // The `writeWorkloads` (i.e. plural) is a mapping from workload name to its
41 : // `writeWorkload`.
42 : //
43 : // The data can be thought of as being modelled as follows:
44 : //
45 : // `writeWorkloads`---------\
46 : // - workload-name-A: `writeWorkload`-------\ |
47 : // - day-1: `writeRun`---------\ | |
48 : // - VM-1: `rawWriteRun`----\ | | |
49 : // [ ... raw data point ... ] `writePoint` x | | |
50 : // ... | | |
51 : // - VM-N: | | |
52 : // [ ... raw data point ... ] x | |
53 : // ... | |
54 : // - day-N: | |
55 : // - VM-1: | |
56 : // [ ... raw data point ... ] | |
57 : // ... | |
58 : // - VM-N: | |
59 : // [ ... raw data point ... ] x |
60 : // ... |
61 : // - workload-name-Z: |
62 : // - day-1: |
63 : // - VM-1: |
64 : // [ ... raw data point ... ] |
65 : // ... |
66 : // - VM-N: |
67 : // [ ... raw data point ... ] |
68 : // ... |
69 : // - day-N: |
70 : // - VM-1: |
71 : // [ ... raw data point ... ] |
72 : // ... |
73 : // - VM-N: |
74 : // [ ... raw data point ... ] x
75 :
76 : const (
77 : // summaryFilename is the filename for the top-level summary output.
78 : summaryFilename = "summary.json"
79 :
80 : // rawRunFmt is the format string for raw benchmark data.
81 : rawRunFmt = "BenchmarkRaw%s %d ops/sec %v pass %s elapsed %d bytes %d levels %f writeAmp"
82 : )
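: 
: // As a concrete illustration, a raw data line matching rawRunFmt would look
: // something like the following (all values here are hypothetical):
: //
: //	BenchmarkRawvalues=1024 16666 ops/sec true pass 1m30s elapsed 1073741824 bytes 5 levels 1.42 writeAmp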
83 :
84 1 : func getWriteCommand() *cobra.Command {
85 1 : c := &cobra.Command{
86 1 : Use: "write",
87 1 : Short: "parse write throughput benchmark data",
88 1 : Long: `
89 1 : Parses write-throughput benchmark data into two sets of JSON "summary" files:
90 1 :
91 1 : 1. A top-level summary.json file. Data in this file is reported per-day, per
92 1 : workload (i.e. values=1024, etc.), and is responsible for the top-level
93 1 : write-throughput visualizations on the Pebble benchmarks page.
94 1 :
95 1 : Each data point in a time-series contains an ops/sec figure (measured as a
96 1 : simple average over all data points for that workload run), and a relative path
97 1 : to a per-run summary JSON file containing the raw data for the run.
98 1 :
99 1 : 2. A per-run *-summary.json file. This file contains the raw data for
100 1 : each of the benchmark instances participating in the workload run on the given
101 1 : day. Each key in the file is the relative path to the original raw data file.
102 1 : Each data point contains the calculated optimal ops/sec for the instance of the
103 1 : run (see split.go for more detail on the algorithm), in addition to the raw data
104 1 : in CSV format.
105 1 :
106 1 : This command can be run without flags at the root of the directory containing
107 1 : the raw data. By default the raw data will be pulled from "data", and the
108 1 : resulting top-level and per-run summary files are written to "write-throughput".
109 1 : Both locations can be overridden with the --data-dir and --summary-dir flags,
110 1 : respectively.
111 1 : `,
112 1 : RunE: func(cmd *cobra.Command, args []string) error {
113 0 : dataDir, err := cmd.Flags().GetString("data-dir")
114 0 : if err != nil {
115 0 : return err
116 0 : }
117 :
118 0 : summaryDir, err := cmd.Flags().GetString("summary-dir")
119 0 : if err != nil {
120 0 : return err
121 0 : }
122 :
123 0 : return parseWrite(dataDir, summaryDir)
124 : },
125 : }
126 :
127 1 : c.Flags().String("data-dir", "data", "path to the raw data directory")
128 1 : c.Flags().String("summary-dir", "write-throughput", "output directory containing the summary files")
129 1 : c.SilenceUsage = true
130 1 :
131 1 : return c
132 : }
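: 
: // A typical invocation of the command defined above might look like the
: // following, where <binary> stands in for the enclosing tool's executable:
: //
: //	<binary> write --data-dir data --summary-dir write-throughput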
133 :
134 : // writePoint is a raw datapoint from an individual write-throughput benchmark
135 : // run.
136 : type writePoint struct {
137 : elapsedSecs int
138 : opsSec int
139 : passed bool
140 : size uint64
141 : levels int
142 : writeAmp float64
143 : }
144 :
145 : // formatCSV returns a comma-separated string representation of the datapoint.
146 1 : func (p writePoint) formatCSV() string {
147 1 : return fmt.Sprintf(
148 1 : "%d,%d,%v,%d,%d,%.2f",
149 1 : p.elapsedSecs, p.opsSec, p.passed, p.size, p.levels, p.writeAmp)
150 1 : }
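: 
: // For example, a writePoint with elapsedSecs=90, opsSec=16666, passed=true,
: // size=1073741824, levels=5 and writeAmp=1.42 (hypothetical values) would
: // format as:
: //
: //	90,16666,true,1073741824,5,1.42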
151 :
152 : // rawWriteRun is a collection of datapoints from a single worker VM
153 : // participating in a benchmark run (i.e. the datapoints emitted by one VM in a
154 : // write-throughput roachtest).
155 : type rawWriteRun struct {
156 : points []writePoint
157 : split int // memoized
158 : }
159 :
160 : // opsPerSecSplit returns the optimal split point (in ops/sec) that separates
161 : // the passing datapoints from the failing datapoints in a rawWriteRun.
162 1 : func (r *rawWriteRun) opsPerSecSplit() int {
163 1 : if r.split > 0 {
164 0 : return r.split
165 0 : }
166 :
167 : // Pre-process by partitioning the datapoints into passes and fails.
168 1 : var passes, fails []int
169 1 : for _, p := range r.points {
170 1 : if p.passed {
171 1 : passes = append(passes, p.opsSec)
172 1 : } else {
173 1 : fails = append(fails, p.opsSec)
174 1 : }
175 : }
176 :
177 : // Compute and cache the split point as we only need to calculate it once.
178 1 : split := findOptimalSplit(passes, fails)
179 1 : r.split = split
180 1 :
181 1 : return split
182 : }
183 :
184 : // writeAmp returns the value of the write-amplification at the end of the run.
185 1 : func (r *rawWriteRun) writeAmp() float64 {
186 1 : return r.points[len(r.points)-1].writeAmp
187 1 : }
188 :
189 : // formatCSV returns a comma-separated string representation of the rawWriteRun.
190 : // The value is a newline-delimited string composed of the CSV representations
191 : // of the individual writePoints.
192 1 : func (r rawWriteRun) formatCSV() string {
193 1 : var b bytes.Buffer
194 1 : for _, p := range r.points {
195 1 : _, _ = fmt.Fprintf(&b, "%s\n", p.formatCSV())
196 1 : }
197 1 : return b.String()
198 : }
199 :
200 : // writeRunSummary represents a single summary datapoint across all rawWriteRuns
201 : // that comprise a writeRun. The datapoint contains a summary ops-per-second
202 : // value, in addition to a path to the summary.json file with the combined data
203 : // for the run.
204 : type writeRunSummary struct {
205 : Name string `json:"name"`
206 : Date string `json:"date"`
207 : OpsSec int `json:"opsSec"`
208 : WriteAmp float64 `json:"writeAmp"`
209 : SummaryPath string `json:"summaryPath"`
210 : }
211 :
212 : // writeWorkloadSummary is an alias for a slice of writeRunSummaries.
213 : type writeWorkloadSummary []writeRunSummary
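: 
: // When marshalled into the top-level summary.json (see cookSummary), each
: // writeWorkloadSummary is keyed by its workload name and has roughly the
: // following shape (field values elided; "values=1024" is an example name):
: //
: //	"values=1024": [
: //	  {
: //	    "name": "values=1024",
: //	    "date": ...,
: //	    "opsSec": ...,
: //	    "writeAmp": ...,
: //	    "summaryPath": ...
: //	  },
: //	  ...
: //	]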
214 :
215 : // writeRun is a collection of one or more rawWriteRuns (i.e. the union of all
216 : // rawWriteRuns from each worker participating in the roachtest cluster used for
217 : // running the write-throughput benchmarks).
218 : type writeRun struct {
219 : // name is the benchmark workload name (i.e. "values=1024").
220 : name string
221 :
222 : // date is the date on which the writeRun took place.
223 : date string
224 :
225 : // dir is the path to the directory containing the raw data. The path is
226 : // relative to the data-dir.
227 : dir string
228 :
229 : // rawRuns is a map from input data filename to its rawWriteRun data.
230 : rawRuns map[string]rawWriteRun
231 : }
232 :
233 : // summaryFilename returns the filename to be used for storing the summary
234 : // output for the writeRun. The filename preserves the original data source path
235 : // for ease of debugging / data-provenance.
236 1 : func (r writeRun) summaryFilename() string {
237 1 : parts := strings.Split(r.dir, string(os.PathSeparator))
238 1 : parts = append(parts, summaryFilename)
239 1 : return strings.Join(parts, "-")
240 1 : }
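: 
: // For example, a writeRun whose dir is "20220112/pebble/write/values=1024/1"
: // (a hypothetical path) would yield the summary filename
: // "20220112-pebble-write-values=1024-1-summary.json".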
241 :
242 : // summarize computes a writeRunSummary datapoint for the writeRun.
243 1 : func (r writeRun) summarize() writeRunSummary {
244 1 : var (
245 1 : sumOpsSec int
246 1 : sumWriteAmp float64
247 1 : )
248 1 : for _, rr := range r.rawRuns {
249 1 : sumOpsSec += rr.opsPerSecSplit()
250 1 : sumWriteAmp += rr.writeAmp()
251 1 : }
252 1 : l := len(r.rawRuns)
253 1 :
254 1 : return writeRunSummary{
255 1 : Name: r.name,
256 1 : Date: r.date,
257 1 : SummaryPath: r.summaryFilename(),
258 1 : // Calculate an average across all raw runs in this run.
259 1 : // TODO(travers): test how this works in practice, after we have
260 1 : // gathered enough data.
261 1 : OpsSec: sumOpsSec / l,
262 1 : WriteAmp: math.Round(100*sumWriteAmp/float64(l)) / 100, // round to 2dp.
263 1 : }
264 : }
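: 
: // As a hypothetical example, a writeRun containing two rawWriteRuns with split
: // ops/sec values of 10000 and 20000 and final write-amps of 1.20 and 1.30
: // would summarize to OpsSec=15000 and WriteAmp=1.25.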
265 :
266 : // cookedWriteRun is a representation of a previously parsed (or "cooked")
267 : // writeRun.
268 : type cookedWriteRun struct {
269 : OpsSec int `json:"opsSec"`
270 : Raw string `json:"rawData"`
271 : }
272 :
273 : // formatSummaryJSON returns a JSON representation of the combined raw data from
274 : // all rawWriteRuns that comprise the writeRun. It has the form:
275 : //
276 : // {
277 : // "original-raw-write-run-log-file-1.gz": {
278 : // "opsSec": ...,
279 : // "raw": ...,
280 : // },
281 : // ...
282 : // "original-raw-write-run-log-file-N.gz": {
283 : // "opsSec": ...,
284 : // "raw": ...,
285 : // },
286 : // }
287 1 : func (r writeRun) formatSummaryJSON() ([]byte, error) {
288 1 : m := make(map[string]cookedWriteRun)
289 1 : for name, data := range r.rawRuns {
290 1 : m[name] = cookedWriteRun{
291 1 : OpsSec: data.opsPerSecSplit(),
292 1 : Raw: data.formatCSV(),
293 1 : }
294 1 : }
295 1 : return prettyJSON(&m), nil
296 : }
297 :
298 : // writeWorkload is a map from "day" to the corresponding writeRun, for a given
299 : // write-throughput benchmark workload (i.e. values=1024).
300 : type writeWorkload struct {
301 : days map[string]*writeRun // map from day to runs for the given workload
302 : }
303 :
304 : // writeWorkloads is an alias for a map from workload name to its corresponding
305 : // writeWorkload (i.e. its map from day to writeRun).
306 : type writeWorkloads map[string]*writeWorkload
307 :
308 : // nameDay is a (name, day) tuple, used as a map key.
309 : type nameDay struct {
310 : name, day string
311 : }
312 :
313 : type writeLoader struct {
314 : // dataDir is the path to the root directory containing the raw data.
315 : dataDir string
316 :
317 : // summaryDir is the path to the output directory for the summary files.
318 : summaryDir string
319 :
320 : // workloads is a map from workload name to its corresponding data.
321 : workloads writeWorkloads
322 :
323 : // cooked is a "set" of (workload, day) tuples representing whether
324 : // previously parsed data was present for the (workload, day).
325 : cooked map[nameDay]bool
326 :
327 : // cookedSummaries is a map from workload name to previously generated data
328 : // for the workload. This data is "mixed-in" with new data when the summary
329 : // files are written out.
330 : cookedSummaries map[string]writeWorkloadSummary
331 : }
332 :
333 : // newWriteLoader returns a new writeLoader that can be used to generate the
334 : // summary files for write-throughput benchmarking data.
335 1 : func newWriteLoader(dataDir, summaryDir string) *writeLoader {
336 1 : return &writeLoader{
337 1 : dataDir: dataDir,
338 1 : summaryDir: summaryDir,
339 1 : workloads: make(writeWorkloads),
340 1 : cooked: make(map[nameDay]bool),
341 1 : cookedSummaries: make(map[string]writeWorkloadSummary),
342 1 : }
343 1 : }
344 :
345 : // loadCooked loads previously summarized write throughput benchmark data.
346 1 : func (l *writeLoader) loadCooked() error {
347 1 : b, err := os.ReadFile(filepath.Join(l.summaryDir, summaryFilename))
348 1 : if err != nil {
349 1 : // The first ever run will not find the summary file. Return early in
350 1 : // this case, and we'll start afresh.
351 1 : if oserror.IsNotExist(err) {
352 1 : return nil
353 1 : }
354 0 : return err
355 : }
356 :
357 : // Reconstruct the summary.
358 1 : summaries := make(map[string]writeWorkloadSummary)
359 1 : err = json.Unmarshal(b, &summaries)
360 1 : if err != nil {
361 0 : return err
362 0 : }
363 :
364 : // Populate the cooked map.
365 1 : l.cookedSummaries = summaries
366 1 :
367 1 : // Populate the set used for determining whether we can skip a raw file.
368 1 : for name, workloadSummary := range summaries {
369 1 : for _, runSummary := range workloadSummary {
370 1 : l.cooked[nameDay{name, runSummary.Date}] = true
371 1 : }
372 : }
373 :
374 1 : return nil
375 : }
376 :
377 : // loadRaw loads the raw data from the root data directory.
378 1 : func (l *writeLoader) loadRaw() error {
379 1 : walkFn := func(path, pathRel string, info os.FileInfo) error {
380 1 : // The relative directory structure is of the form:
381 1 : // $day/pebble/write/$name/$run/$file
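: // (e.g. "20220112/pebble/write/values=1024/1/run.log.gz", a hypothetical path)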
382 1 : parts := strings.Split(pathRel, string(os.PathSeparator))
383 1 : if len(parts) < 6 {
384 1 : return nil // stumble forward on invalid paths
385 1 : }
386 :
387 : // Filter out files that aren't in write benchmark directories.
388 1 : if parts[2] != "write" {
389 1 : return nil
390 1 : }
391 1 : day := parts[0]
392 1 :
393 1 : f, err := os.Open(path)
394 1 : if err != nil {
395 0 : _, _ = fmt.Fprintf(os.Stderr, "%+v\n", err)
396 0 : return nil // stumble forward on error
397 0 : }
398 1 : defer func() { _ = f.Close() }()
399 :
400 1 : rd := io.Reader(f)
401 1 : if strings.HasSuffix(path, ".bz2") {
402 0 : rd = bzip2.NewReader(f)
403 1 : } else if strings.HasSuffix(path, ".gz") {
404 1 : var err error
405 1 : rd, err = gzip.NewReader(f)
406 1 : if err != nil {
407 0 : _, _ = fmt.Fprintf(os.Stderr, "%+v\n", err)
408 0 : return nil // stumble forward on error
409 0 : }
410 : }
411 :
412 : // Parse the data for this file and add to the appropriate workload.
413 1 : s := bufio.NewScanner(rd)
414 1 : r := rawWriteRun{}
415 1 : var name string
416 1 : for s.Scan() {
417 1 : line := s.Text()
418 1 : if !strings.HasPrefix(line, "BenchmarkRaw") {
419 0 : continue
420 : }
421 :
422 1 : var p writePoint
423 1 : var nameInner, elapsed string
424 1 : n, err := fmt.Sscanf(line, rawRunFmt,
425 1 : &nameInner, &p.opsSec, &p.passed, &elapsed, &p.size, &p.levels, &p.writeAmp)
426 1 : if err != nil || n != 7 {
427 0 : // Stumble forward on error.
428 0 : _, _ = fmt.Fprintf(os.Stderr, "%s: %v\n", s.Text(), err)
429 0 : continue
430 : }
431 :
432 : // The benchmark name from the first datapoint we see in the file is
433 : // assumed to be the same for all subsequent datapoints.
434 1 : if name == "" {
435 1 : name = nameInner
436 1 :
437 1 : // Skip files for (workload, day) pairs that have been parsed
438 1 : // previously. Note that this relies on loadCooked having been
439 1 : // called previously to seed the map with cooked data.
440 1 : if ok := l.cooked[nameDay{name, day}]; ok {
441 1 : _, _ = fmt.Fprintf(os.Stderr,
442 1 : "skipping previously cooked data in file %s (workload=%q, day=%q)\n",
443 1 : pathRel, name, day)
444 1 : return nil
445 1 : }
446 1 : } else if name != nameInner {
447 0 : _, _ = fmt.Fprintf(os.Stderr,
448 0 : "WARN: benchmark name %q differs from previously seen name %q: %s",
449 0 : nameInner, name, s.Text())
450 0 : }
451 :
452 : // Convert the elapsed time into seconds.
453 1 : secs, err := time.ParseDuration(elapsed)
454 1 : if err != nil {
455 0 : // Stumble forward on error.
456 0 : _, _ = fmt.Fprintf(os.Stderr, "%s: %v\n", s.Text(), err)
457 0 : continue
458 : }
459 1 : p.elapsedSecs = int(secs.Seconds())
460 1 :
461 1 : // Add this data point to the collection of points for this run.
462 1 : r.points = append(r.points, p)
463 : }
464 :
465 : // Add the raw run to the map.
466 1 : l.addRawRun(name, day, pathRel, r)
467 1 :
468 1 : return nil
469 : }
470 1 : return walkDir(l.dataDir, walkFn)
471 : }
472 :
473 : // addRawRun adds a rawWriteRun to the corresponding data structures by looking
474 : // up the workload name (i.e. "values=1024") and day, then storing the rawWriteRun
475 : // in the corresponding writeRun's map of rawWriteRuns, keyed by file path.
476 1 : func (l *writeLoader) addRawRun(name, day, path string, raw rawWriteRun) {
477 1 : // Skip files with no points (i.e. files that couldn't be parsed).
478 1 : if len(raw.points) == 0 {
479 0 : return
480 0 : }
481 :
482 1 : _, _ = fmt.Fprintf(
483 1 : os.Stderr, "adding raw run: (workload=%q, day=%q); nPoints=%d; file=%s\n",
484 1 : name, day, len(raw.points), path)
485 1 :
486 1 : w := l.workloads[name]
487 1 : if w == nil {
488 1 : w = &writeWorkload{days: make(map[string]*writeRun)}
489 1 : l.workloads[name] = w
490 1 : }
491 :
492 1 : r := w.days[day]
493 1 : if r == nil {
494 1 : r = &writeRun{
495 1 : name: name,
496 1 : date: day,
497 1 : dir: filepath.Dir(path),
498 1 : rawRuns: make(map[string]rawWriteRun),
499 1 : }
500 1 : w.days[day] = r
501 1 : }
502 1 : r.rawRuns[path] = raw
503 : }
504 :
505 : // cookSummary writes out the data in the loader to the summary file (new or
506 : // existing).
507 1 : func (l *writeLoader) cookSummary() error {
508 1 : summary := make(map[string]writeWorkloadSummary)
509 1 : for name, w := range l.workloads {
510 1 : summary[name] = cookWriteSummary(w)
511 1 : }
512 :
513 : // Mix in the previously cooked values.
514 1 : for name, cooked := range l.cookedSummaries {
515 1 : existing, ok := summary[name]
516 1 : if !ok {
517 0 : summary[name] = cooked
518 1 : } else {
519 1 : // We must merge and re-sort by date.
520 1 : existing = append(existing, cooked...)
521 1 : slices.SortFunc(existing, func(a, b writeRunSummary) int {
522 1 : return cmp.Compare(a.Date, b.Date)
523 1 : })
524 1 : summary[name] = existing
525 : }
526 : }
527 1 : b := prettyJSON(&summary)
528 1 : b = append(b, '\n')
529 1 :
530 1 : outputPath := filepath.Join(l.summaryDir, summaryFilename)
531 1 : err := os.WriteFile(outputPath, b, 0644)
532 1 : if err != nil {
533 0 : return err
534 0 : }
535 :
536 1 : return nil
537 : }
538 :
539 : // cookWriteSummary is a helper that generates the summary for a write workload
540 : // by computing the per-day summaries across all runs.
541 1 : func cookWriteSummary(w *writeWorkload) writeWorkloadSummary {
542 1 : days := make([]string, 0, len(w.days))
543 1 : for day := range w.days {
544 1 : days = append(days, day)
545 1 : }
546 1 : sort.Strings(days)
547 1 :
548 1 : var summary writeWorkloadSummary
549 1 : for _, day := range days {
550 1 : r := w.days[day]
551 1 : summary = append(summary, r.summarize())
552 1 : }
553 :
554 1 : return summary
555 : }
556 :
557 : // cookWriteRunSummaries writes out the per-run summary files.
558 1 : func (l *writeLoader) cookWriteRunSummaries() error {
559 1 : for _, w := range l.workloads {
560 1 : for _, r := range w.days {
561 1 : // Write out files preserving the original directory structure for
562 1 : // ease of understanding / debugging.
563 1 : outputPath := filepath.Join(l.summaryDir, r.summaryFilename())
564 1 : if err := outputWriteRunSummary(r, outputPath); err != nil {
565 0 : return err
566 0 : }
567 : }
568 : }
569 1 : return nil
570 : }
571 :
572 : // outputWriteRunSummary is a helper that generates the summary JSON for the
573 : // writeRun and writes it to the given output path.
574 1 : func outputWriteRunSummary(r *writeRun, outputPath string) error {
575 1 : f, err := os.OpenFile(outputPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
576 1 : if err != nil {
577 0 : return err
578 0 : }
579 1 : defer func() { _ = f.Close() }()
580 :
581 1 : b, err := r.formatSummaryJSON()
582 1 : if err != nil {
583 0 : return err
584 0 : }
585 1 : b = append(b, '\n')
586 1 :
587 1 : _, err = f.Write(b)
588 1 : return err
589 : }
590 :
591 : // parseWrite parses the raw write-throughput benchmark data and writes out the
592 : // summary files.
593 1 : func parseWrite(dataDir, summaryDir string) error {
594 1 : l := newWriteLoader(dataDir, summaryDir)
595 1 : if err := l.loadCooked(); err != nil {
596 0 : return err
597 0 : }
598 :
599 1 : if err := l.loadRaw(); err != nil {
600 0 : return err
601 0 : }
602 :
603 1 : if err := l.cookSummary(); err != nil {
604 0 : return err
605 0 : }
606 :
607 1 : return l.cookWriteRunSummaries()
608 : }