LCOV - code coverage report
Current view: top level - pebble/replay - replay.go (source / functions) Hit Total Coverage
Test: 2024-01-10 08:16Z d50db878 - tests only.lcov Lines: 582 741 78.5 %
Date: 2024-01-10 08:16:40 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : // Package replay implements collection and replaying of compaction benchmarking
       6             : // workloads. A workload is a collection of flushed and ingested sstables, along
       7             : // with the corresponding manifests describing the order and grouping with which
       8             : // they were applied. Replaying a workload flushes and ingests the same keys and
       9             : // sstables to reproduce the write workload for the purpose of evaluating
      10             : // compaction heuristics.
      11             : package replay
      12             : 
      13             : import (
      14             :         "context"
      15             :         "encoding/binary"
      16             :         "fmt"
      17             :         "io"
      18             :         "os"
      19             :         "sort"
      20             :         "strings"
      21             :         "sync"
      22             :         "sync/atomic"
      23             :         "time"
      24             : 
      25             :         "github.com/cockroachdb/errors"
      26             :         "github.com/cockroachdb/pebble"
      27             :         "github.com/cockroachdb/pebble/internal/base"
      28             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      29             :         "github.com/cockroachdb/pebble/internal/manifest"
      30             :         "github.com/cockroachdb/pebble/internal/rangedel"
      31             :         "github.com/cockroachdb/pebble/internal/rangekey"
      32             :         "github.com/cockroachdb/pebble/record"
      33             :         "github.com/cockroachdb/pebble/sstable"
      34             :         "github.com/cockroachdb/pebble/vfs"
      35             :         "golang.org/x/perf/benchfmt"
      36             :         "golang.org/x/sync/errgroup"
      37             : )
      38             : 
      39             : // A Pacer paces replay of a workload, determining when to apply the next
      40             : // incoming write.
      41             : type Pacer interface {
      42             :         pace(r *Runner, step workloadStep) time.Duration
      43             : }
      44             : 
      45             : // computeReadAmp calculates the read amplification from a manifest.Version
      46           0 : func computeReadAmp(v *manifest.Version) int {
      47           0 :         refRAmp := v.L0Sublevels.ReadAmplification()
      48           0 :         for _, lvl := range v.Levels[1:] {
      49           0 :                 if !lvl.Empty() {
      50           0 :                         refRAmp++
      51           0 :                 }
      52             :         }
      53           0 :         return refRAmp
      54             : }
      55             : 
      56             : // waitForReadAmpLE is a common function used by PaceByReferenceReadAmp and
      57             : // PaceByFixedReadAmp to wait on the dbMetricsNotifier condition variable if the
      58             : // read amplification observed is greater than the specified target (refRAmp).
      59           1 : func waitForReadAmpLE(r *Runner, rAmp int) {
      60           1 :         r.dbMetricsCond.L.Lock()
      61           1 :         m := r.dbMetrics
      62           1 :         ra := m.ReadAmp()
      63           1 :         for ra > rAmp {
      64           1 :                 r.dbMetricsCond.Wait()
      65           1 :                 ra = r.dbMetrics.ReadAmp()
      66           1 :         }
      67           1 :         r.dbMetricsCond.L.Unlock()
      68             : }
      69             : 
      70             : // Unpaced implements Pacer by applying each new write as soon as possible. It
      71             : // may be useful for examining performance under high read amplification.
      72             : type Unpaced struct{}
      73             : 
      74           1 : func (Unpaced) pace(*Runner, workloadStep) (d time.Duration) { return }
      75             : 
      76             : // PaceByReferenceReadAmp implements Pacer by applying each new write following
      77             : // the collected workloads read amplification.
      78             : type PaceByReferenceReadAmp struct{}
      79             : 
      80           0 : func (PaceByReferenceReadAmp) pace(r *Runner, w workloadStep) time.Duration {
      81           0 :         startTime := time.Now()
      82           0 :         refRAmp := computeReadAmp(w.pv)
      83           0 :         waitForReadAmpLE(r, refRAmp)
      84           0 :         return time.Since(startTime)
      85           0 : }
      86             : 
      87             : // PaceByFixedReadAmp implements Pacer by applying each new write following a
      88             : // fixed read amplification.
      89             : type PaceByFixedReadAmp int
      90             : 
      91           1 : func (pra PaceByFixedReadAmp) pace(r *Runner, _ workloadStep) time.Duration {
      92           1 :         startTime := time.Now()
      93           1 :         waitForReadAmpLE(r, int(pra))
      94           1 :         return time.Since(startTime)
      95           1 : }
      96             : 
      97             : // Metrics holds the various statistics on a replay run and its performance.
      98             : type Metrics struct {
      99             :         CompactionCounts struct {
     100             :                 Total       int64
     101             :                 Default     int64
     102             :                 DeleteOnly  int64
     103             :                 ElisionOnly int64
     104             :                 Move        int64
     105             :                 Read        int64
     106             :                 Rewrite     int64
     107             :                 MultiLevel  int64
     108             :         }
     109             :         EstimatedDebt SampledMetric
     110             :         Final         *pebble.Metrics
     111             :         Ingest        struct {
     112             :                 BytesIntoL0 uint64
     113             :                 // BytesWeightedByLevel is calculated as the number of bytes ingested
     114             :                 // into a level multiplied by the level's distance from the bottommost
     115             :                 // level (L6), summed across all levels. It can be used to guage how
     116             :                 // effective heuristics are at ingesting files into lower levels, saving
     117             :                 // write amplification.
     118             :                 BytesWeightedByLevel uint64
     119             :         }
     120             :         // PaceDuration is the time waiting for the pacer to allow the workload to
     121             :         // continue.
     122             :         PaceDuration time.Duration
     123             :         ReadAmp      SampledMetric
     124             :         // QuiesceDuration is the time between completing application of the workload and
     125             :         // compactions quiescing.
     126             :         QuiesceDuration time.Duration
     127             :         TombstoneCount  SampledMetric
     128             :         // TotalSize holds the total size of the database, sampled after each
     129             :         // workload step.
     130             :         TotalSize           SampledMetric
     131             :         TotalWriteAmp       float64
     132             :         WorkloadDuration    time.Duration
     133             :         WriteBytes          uint64
     134             :         WriteStalls         map[string]int
     135             :         WriteStallsDuration map[string]time.Duration
     136             :         WriteThroughput     SampledMetric
     137             : }
     138             : 
     139             : // Plot holds an ascii plot and its name.
     140             : type Plot struct {
     141             :         Name string
     142             :         Plot string
     143             : }
     144             : 
     145             : // Plots returns a slice of ascii plots describing metrics change over time.
     146           0 : func (m *Metrics) Plots(width, height int) []Plot {
     147           0 :         const scaleMB = 1.0 / float64(1<<20)
     148           0 :         return []Plot{
     149           0 :                 {Name: "Write throughput (MB/s)", Plot: m.WriteThroughput.PlotIncreasingPerSec(width, height, scaleMB)},
     150           0 :                 {Name: "Estimated compaction debt (MB)", Plot: m.EstimatedDebt.Plot(width, height, scaleMB)},
     151           0 :                 {Name: "Total database size (MB)", Plot: m.TotalSize.Plot(width, height, scaleMB)},
     152           0 :                 {Name: "ReadAmp", Plot: m.ReadAmp.Plot(width, height, 1.0)},
     153           0 :         }
     154           0 : }
     155             : 
     156             : // WriteBenchmarkString writes the metrics in the form of a series of
     157             : // 'Benchmark' lines understandable by benchstat.
     158           1 : func (m *Metrics) WriteBenchmarkString(name string, w io.Writer) error {
     159           1 :         type benchmarkSection struct {
     160           1 :                 label  string
     161           1 :                 values []benchfmt.Value
     162           1 :         }
     163           1 :         groups := []benchmarkSection{
     164           1 :                 {label: "CompactionCounts", values: []benchfmt.Value{
     165           1 :                         {Value: float64(m.CompactionCounts.Total), Unit: "compactions"},
     166           1 :                         {Value: float64(m.CompactionCounts.Default), Unit: "default"},
     167           1 :                         {Value: float64(m.CompactionCounts.DeleteOnly), Unit: "delete"},
     168           1 :                         {Value: float64(m.CompactionCounts.ElisionOnly), Unit: "elision"},
     169           1 :                         {Value: float64(m.CompactionCounts.Move), Unit: "move"},
     170           1 :                         {Value: float64(m.CompactionCounts.Read), Unit: "read"},
     171           1 :                         {Value: float64(m.CompactionCounts.Rewrite), Unit: "rewrite"},
     172           1 :                         {Value: float64(m.CompactionCounts.MultiLevel), Unit: "multilevel"},
     173           1 :                 }},
     174           1 :                 // Total database sizes sampled after every workload step and
     175           1 :                 // compaction. This can be used to evaluate the relative LSM space
     176           1 :                 // amplification between runs of the same workload. Calculating the true
     177           1 :                 // space amplification continuously is prohibitvely expensive (it
     178           1 :                 // requires totally compacting a copy of the LSM).
     179           1 :                 {label: "DatabaseSize/mean", values: []benchfmt.Value{
     180           1 :                         {Value: m.TotalSize.Mean(), Unit: "bytes"},
     181           1 :                 }},
     182           1 :                 {label: "DatabaseSize/max", values: []benchfmt.Value{
     183           1 :                         {Value: float64(m.TotalSize.Max()), Unit: "bytes"},
     184           1 :                 }},
     185           1 :                 // Time applying the workload and time waiting for compactions to
     186           1 :                 // quiesce after the workload has completed.
     187           1 :                 {label: "DurationWorkload", values: []benchfmt.Value{
     188           1 :                         {Value: m.WorkloadDuration.Seconds(), Unit: "sec/op"},
     189           1 :                 }},
     190           1 :                 {label: "DurationQuiescing", values: []benchfmt.Value{
     191           1 :                         {Value: m.QuiesceDuration.Seconds(), Unit: "sec/op"},
     192           1 :                 }},
     193           1 :                 {label: "DurationPaceDelay", values: []benchfmt.Value{
     194           1 :                         {Value: m.PaceDuration.Seconds(), Unit: "sec/op"},
     195           1 :                 }},
     196           1 :                 // Estimated compaction debt, sampled after every workload step and
     197           1 :                 // compaction.
     198           1 :                 {label: "EstimatedDebt/mean", values: []benchfmt.Value{
     199           1 :                         {Value: m.EstimatedDebt.Mean(), Unit: "bytes"},
     200           1 :                 }},
     201           1 :                 {label: "EstimatedDebt/max", values: []benchfmt.Value{
     202           1 :                         {Value: float64(m.EstimatedDebt.Max()), Unit: "bytes"},
     203           1 :                 }},
     204           1 :                 {label: "FlushUtilization", values: []benchfmt.Value{
     205           1 :                         {Value: m.Final.Flush.WriteThroughput.Utilization(), Unit: "util"},
     206           1 :                 }},
     207           1 :                 {label: "IngestedIntoL0", values: []benchfmt.Value{
     208           1 :                         {Value: float64(m.Ingest.BytesIntoL0), Unit: "bytes"},
     209           1 :                 }},
     210           1 :                 {label: "IngestWeightedByLevel", values: []benchfmt.Value{
     211           1 :                         {Value: float64(m.Ingest.BytesWeightedByLevel), Unit: "bytes"},
     212           1 :                 }},
     213           1 :                 {label: "ReadAmp/mean", values: []benchfmt.Value{
     214           1 :                         {Value: m.ReadAmp.Mean(), Unit: "files"},
     215           1 :                 }},
     216           1 :                 {label: "ReadAmp/max", values: []benchfmt.Value{
     217           1 :                         {Value: float64(m.ReadAmp.Max()), Unit: "files"},
     218           1 :                 }},
     219           1 :                 {label: "TombstoneCount/mean", values: []benchfmt.Value{
     220           1 :                         {Value: m.TombstoneCount.Mean(), Unit: "tombstones"},
     221           1 :                 }},
     222           1 :                 {label: "TombstoneCount/max", values: []benchfmt.Value{
     223           1 :                         {Value: float64(m.TombstoneCount.Max()), Unit: "tombstones"},
     224           1 :                 }},
     225           1 :                 {label: "Throughput", values: []benchfmt.Value{
     226           1 :                         {Value: float64(m.WriteBytes) / (m.WorkloadDuration + m.QuiesceDuration).Seconds(), Unit: "B/s"},
     227           1 :                 }},
     228           1 :                 {label: "WriteAmp", values: []benchfmt.Value{
     229           1 :                         {Value: float64(m.TotalWriteAmp), Unit: "wamp"},
     230           1 :                 }},
     231           1 :         }
     232           1 : 
     233           1 :         for _, reason := range []string{"L0", "memtable"} {
     234           1 :                 groups = append(groups, benchmarkSection{
     235           1 :                         label: fmt.Sprintf("WriteStall/%s", reason),
     236           1 :                         values: []benchfmt.Value{
     237           1 :                                 {Value: float64(m.WriteStalls[reason]), Unit: "stalls"},
     238           1 :                                 {Value: float64(m.WriteStallsDuration[reason].Seconds()), Unit: "stallsec/op"},
     239           1 :                         },
     240           1 :                 })
     241           1 :         }
     242             : 
     243           1 :         bw := benchfmt.NewWriter(w)
     244           1 :         for _, grp := range groups {
     245           1 :                 err := bw.Write(&benchfmt.Result{
     246           1 :                         Name:   benchfmt.Name(fmt.Sprintf("BenchmarkReplay/%s/%s", name, grp.label)),
     247           1 :                         Iters:  1,
     248           1 :                         Values: grp.values,
     249           1 :                 })
     250           1 :                 if err != nil {
     251           0 :                         return err
     252           0 :                 }
     253             :         }
     254           1 :         return nil
     255             : }
     256             : 
     257             : // Runner runs a captured workload against a test database, collecting
     258             : // metrics on performance.
     259             : type Runner struct {
     260             :         RunDir        string
     261             :         WorkloadFS    vfs.FS
     262             :         WorkloadPath  string
     263             :         Pacer         Pacer
     264             :         Opts          *pebble.Options
     265             :         MaxWriteBytes uint64
     266             : 
     267             :         // Internal state.
     268             : 
     269             :         d *pebble.DB
     270             :         // dbMetrics and dbMetricsCond work in unison to update the metrics and
     271             :         // notify (broadcast) to any waiting clients that metrics have been updated.
     272             :         dbMetrics     *pebble.Metrics
     273             :         dbMetricsCond sync.Cond
     274             :         cancel        func()
     275             :         err           atomic.Value
     276             :         errgroup      *errgroup.Group
     277             :         readerOpts    sstable.ReaderOptions
     278             :         stagingDir    string
     279             :         steps         chan workloadStep
     280             :         stepsApplied  chan workloadStep
     281             : 
     282             :         metrics struct {
     283             :                 estimatedDebt    SampledMetric
     284             :                 quiesceDuration  time.Duration
     285             :                 readAmp          SampledMetric
     286             :                 tombstoneCount   SampledMetric
     287             :                 totalSize        SampledMetric
     288             :                 paceDurationNano atomic.Uint64
     289             :                 workloadDuration time.Duration
     290             :                 writeBytes       atomic.Uint64
     291             :                 writeThroughput  SampledMetric
     292             :         }
     293             :         writeStallMetrics struct {
     294             :                 sync.Mutex
     295             :                 countByReason    map[string]int
     296             :                 durationByReason map[string]time.Duration
     297             :         }
     298             :         // compactionMu holds state for tracking the number of compactions
     299             :         // started and completed and waking waiting goroutines when a new compaction
     300             :         // completes. See nextCompactionCompletes.
     301             :         compactionMu struct {
     302             :                 sync.Mutex
     303             :                 ch        chan struct{}
     304             :                 started   int64
     305             :                 completed int64
     306             :         }
     307             :         workload struct {
     308             :                 manifests []string
     309             :                 // manifest{Idx,Off} record the starting position of the workload
     310             :                 // relative to the initial database state.
     311             :                 manifestIdx int
     312             :                 manifestOff int64
     313             :                 // sstables records the set of captured workload sstables by file num.
     314             :                 sstables map[base.FileNum]struct{}
     315             :         }
     316             : }
     317             : 
     318             : // Run begins executing the workload and returns.
     319             : //
     320             : // The workload application will respect the provided context's cancellation.
     321           1 : func (r *Runner) Run(ctx context.Context) error {
     322           1 :         // Find the workload start relative to the RunDir's existing database state.
     323           1 :         // A prefix of the workload's manifest edits are expected to have already
     324           1 :         // been applied to the checkpointed existing database state.
     325           1 :         var err error
     326           1 :         r.workload.manifests, r.workload.sstables, err = findWorkloadFiles(r.WorkloadPath, r.WorkloadFS)
     327           1 :         if err != nil {
     328           0 :                 return err
     329           0 :         }
     330           1 :         r.workload.manifestIdx, r.workload.manifestOff, err = findManifestStart(r.RunDir, r.Opts.FS, r.workload.manifests)
     331           1 :         if err != nil {
     332           0 :                 return err
     333           0 :         }
     334             : 
     335             :         // Set up a staging dir for files that will be ingested.
     336           1 :         r.stagingDir = r.Opts.FS.PathJoin(r.RunDir, "staging")
     337           1 :         if err := r.Opts.FS.MkdirAll(r.stagingDir, os.ModePerm); err != nil {
     338           0 :                 return err
     339           0 :         }
     340             : 
     341           1 :         r.dbMetricsCond = sync.Cond{
     342           1 :                 L: &sync.Mutex{},
     343           1 :         }
     344           1 : 
     345           1 :         // Extend the user-provided Options with extensions necessary for replay
     346           1 :         // mechanics.
     347           1 :         r.compactionMu.ch = make(chan struct{})
     348           1 :         r.Opts.AddEventListener(r.eventListener())
     349           1 :         r.writeStallMetrics.countByReason = make(map[string]int)
     350           1 :         r.writeStallMetrics.durationByReason = make(map[string]time.Duration)
     351           1 :         r.Opts.EnsureDefaults()
     352           1 :         r.readerOpts = r.Opts.MakeReaderOptions()
     353           1 :         r.Opts.DisableWAL = true
     354           1 :         r.d, err = pebble.Open(r.RunDir, r.Opts)
     355           1 :         if err != nil {
     356           0 :                 return err
     357           0 :         }
     358             : 
     359           1 :         r.dbMetrics = r.d.Metrics()
     360           1 : 
     361           1 :         // Use a buffered channel to allow the prepareWorkloadSteps to read ahead,
     362           1 :         // buffering up to cap(r.steps) steps ahead of the current applied state.
     363           1 :         // Flushes need to be buffered and ingested sstables need to be copied, so
     364           1 :         // pipelining this preparation makes it more likely the step will be ready
     365           1 :         // to apply when the pacer decides to apply it.
     366           1 :         r.steps = make(chan workloadStep, 5)
     367           1 :         r.stepsApplied = make(chan workloadStep, 5)
     368           1 : 
     369           1 :         ctx, r.cancel = context.WithCancel(ctx)
     370           1 :         r.errgroup, ctx = errgroup.WithContext(ctx)
     371           1 :         r.errgroup.Go(func() error { return r.prepareWorkloadSteps(ctx) })
     372           1 :         r.errgroup.Go(func() error { return r.applyWorkloadSteps(ctx) })
     373           1 :         r.errgroup.Go(func() error { return r.refreshMetrics(ctx) })
     374           1 :         return nil
     375             : }
     376             : 
     377             : // refreshMetrics runs in its own goroutine, collecting metrics from the Pebble
     378             : // instance whenever a) a workload step completes, or b) a compaction completes.
     379             : // The Pacer implementations that pace based on read-amplification rely on these
     380             : // refreshed metrics to decide when to allow the workload to proceed.
     381           1 : func (r *Runner) refreshMetrics(ctx context.Context) error {
     382           1 :         startAt := time.Now()
     383           1 :         var workloadExhausted bool
     384           1 :         var workloadExhaustedAt time.Time
     385           1 :         stepsApplied := r.stepsApplied
     386           1 :         compactionCount, alreadyCompleted, compactionCh := r.nextCompactionCompletes(0)
     387           1 :         for {
     388           1 :                 if !alreadyCompleted {
     389           1 :                         select {
     390           0 :                         case <-ctx.Done():
     391           0 :                                 return ctx.Err()
     392           1 :                         case <-compactionCh:
     393             :                                 // Fall through to refreshing dbMetrics.
     394           1 :                         case _, ok := <-stepsApplied:
     395           1 :                                 if !ok {
     396           1 :                                         workloadExhausted = true
     397           1 :                                         workloadExhaustedAt = time.Now()
     398           1 :                                         // Set the [stepsApplied] channel to nil so that we'll never
     399           1 :                                         // hit this case again, and we don't busy loop.
     400           1 :                                         stepsApplied = nil
     401           1 :                                         // Record the replay time.
     402           1 :                                         r.metrics.workloadDuration = workloadExhaustedAt.Sub(startAt)
     403           1 :                                 }
     404             :                                 // Fall through to refreshing dbMetrics.
     405             :                         }
     406             :                 }
     407             : 
     408           1 :                 m := r.d.Metrics()
     409           1 :                 r.dbMetricsCond.L.Lock()
     410           1 :                 r.dbMetrics = m
     411           1 :                 r.dbMetricsCond.Broadcast()
     412           1 :                 r.dbMetricsCond.L.Unlock()
     413           1 : 
     414           1 :                 // Collect sample metrics. These metrics are calculated by sampling
     415           1 :                 // every time we collect metrics.
     416           1 :                 r.metrics.readAmp.record(int64(m.ReadAmp()))
     417           1 :                 r.metrics.estimatedDebt.record(int64(m.Compact.EstimatedDebt))
     418           1 :                 r.metrics.tombstoneCount.record(int64(m.Keys.TombstoneCount))
     419           1 :                 r.metrics.totalSize.record(int64(m.DiskSpaceUsage()))
     420           1 :                 r.metrics.writeThroughput.record(int64(r.metrics.writeBytes.Load()))
     421           1 : 
     422           1 :                 compactionCount, alreadyCompleted, compactionCh = r.nextCompactionCompletes(compactionCount)
     423           1 :                 // Consider whether replaying is complete. There are two necessary
     424           1 :                 // conditions:
     425           1 :                 //
     426           1 :                 //   1. The workload must be exhausted.
     427           1 :                 //   2. Compactions must have quiesced.
     428           1 :                 //
     429           1 :                 // The first condition is simple. The replay tool is responsible for
     430           1 :                 // applying the workload. The goroutine responsible for applying the
     431           1 :                 // workload closes the `stepsApplied` channel after the last step has
     432           1 :                 // been applied, and we'll flip `workloadExhausted` to true.
     433           1 :                 //
     434           1 :                 // The second condition is tricky. The replay tool doesn't control
     435           1 :                 // compactions and doesn't have visibility into whether the compaction
     436           1 :                 // picker is about to schedule a new compaction. We can tell when
     437           1 :                 // compactions are in progress or may be immeninent (eg, flushes in
     438           1 :                 // progress). If it appears that compactions have quiesced, pause for a
     439           1 :                 // fixed duration to see if a new one is scheduled. If not, consider
     440           1 :                 // compactions quiesced.
     441           1 :                 if workloadExhausted && !alreadyCompleted && r.compactionsAppearQuiesced(m) {
     442           1 :                         select {
     443           1 :                         case <-compactionCh:
     444           1 :                                 // A new compaction just finished; compactions have not
     445           1 :                                 // quiesced.
     446           1 :                                 continue
     447           1 :                         case <-time.After(time.Second):
     448           1 :                                 // No compactions completed. If it still looks like they've
     449           1 :                                 // quiesced according to the metrics, consider them quiesced.
     450           1 :                                 if r.compactionsAppearQuiesced(r.d.Metrics()) {
     451           1 :                                         r.metrics.quiesceDuration = time.Since(workloadExhaustedAt)
     452           1 :                                         return nil
     453           1 :                                 }
     454             :                         }
     455             :                 }
     456             :         }
     457             : }
     458             : 
     459             : // compactionsAppearQuiesced returns true if the database may have quiesced, and
     460             : // there likely won't be additional compactions scheduled. Detecting quiescence
     461             : // is a bit fraught: The various signals that Pebble makes available are
     462             : // adjusted at different points in the compaction lifecycle, and database
     463             : // mutexes are dropped and acquired between them. This makes it difficult to
     464             : // reliably identify when compactions quiesce.
     465             : //
     466             : // For example, our call to DB.Metrics() may acquire the DB.mu mutex when a
     467             : // compaction has just successfully completed, but before it's managed to
     468             : // schedule the next compaction (DB.mu is dropped while it attempts to acquire
     469             : // the manifest lock).
     470           1 : func (r *Runner) compactionsAppearQuiesced(m *pebble.Metrics) bool {
     471           1 :         r.compactionMu.Lock()
     472           1 :         defer r.compactionMu.Unlock()
     473           1 :         if m.Flush.NumInProgress > 0 {
     474           1 :                 return false
     475           1 :         } else if m.Compact.NumInProgress > 0 && r.compactionMu.started != r.compactionMu.completed {
     476           1 :                 return false
     477           1 :         }
     478           1 :         return true
     479             : }
     480             : 
     481             : // nextCompactionCompletes may be used to be notified when new compactions
     482             : // complete. The caller is responsible for holding on to a monotonically
     483             : // increasing count representing the number of compactions that have been
     484             : // observed, beginning at zero.
     485             : //
     486             : // The caller passes their current count as an argument. If a new compaction has
     487             : // already completed since their provided count, nextCompactionCompletes returns
     488             : // the new count and a true boolean return value. If a new compaction has not
     489             : // yet completed, it returns a channel that will be closed when the next
     490             : // compaction completes. This scheme allows the caller to select{...},
     491             : // performing some action on every compaction completion.
     492             : func (r *Runner) nextCompactionCompletes(
     493             :         lastObserved int64,
     494           1 : ) (count int64, alreadyOccurred bool, ch chan struct{}) {
     495           1 :         r.compactionMu.Lock()
     496           1 :         defer r.compactionMu.Unlock()
     497           1 : 
     498           1 :         if lastObserved < r.compactionMu.completed {
     499           1 :                 // There has already been another compaction since the last one observed
     500           1 :                 // by this caller. Return immediately.
     501           1 :                 return r.compactionMu.completed, true, nil
     502           1 :         }
     503             : 
     504             :         // The last observed compaction is still the most recent compaction.
     505             :         // Return a channel that the caller can wait on to be notified when the
     506             :         // next compaction occurs.
     507           1 :         if r.compactionMu.ch == nil {
     508           1 :                 r.compactionMu.ch = make(chan struct{})
     509           1 :         }
     510           1 :         return lastObserved, false, r.compactionMu.ch
     511             : }
     512             : 
     513             : // Wait waits for the workload replay to complete. Wait returns once the entire
     514             : // workload has been replayed, and compactions have quiesced.
     515           1 : func (r *Runner) Wait() (Metrics, error) {
     516           1 :         err := r.errgroup.Wait()
     517           1 :         if storedErr := r.err.Load(); storedErr != nil {
     518           0 :                 err = storedErr.(error)
     519           0 :         }
     520           1 :         pm := r.d.Metrics()
     521           1 :         total := pm.Total()
     522           1 :         var ingestBytesWeighted uint64
     523           1 :         for l := 0; l < len(pm.Levels); l++ {
     524           1 :                 ingestBytesWeighted += pm.Levels[l].BytesIngested * uint64(len(pm.Levels)-l-1)
     525           1 :         }
     526             : 
     527           1 :         m := Metrics{
     528           1 :                 Final:               pm,
     529           1 :                 EstimatedDebt:       r.metrics.estimatedDebt,
     530           1 :                 PaceDuration:        time.Duration(r.metrics.paceDurationNano.Load()),
     531           1 :                 ReadAmp:             r.metrics.readAmp,
     532           1 :                 QuiesceDuration:     r.metrics.quiesceDuration,
     533           1 :                 TombstoneCount:      r.metrics.tombstoneCount,
     534           1 :                 TotalSize:           r.metrics.totalSize,
     535           1 :                 TotalWriteAmp:       total.WriteAmp(),
     536           1 :                 WorkloadDuration:    r.metrics.workloadDuration,
     537           1 :                 WriteBytes:          r.metrics.writeBytes.Load(),
     538           1 :                 WriteStalls:         make(map[string]int),
     539           1 :                 WriteStallsDuration: make(map[string]time.Duration),
     540           1 :                 WriteThroughput:     r.metrics.writeThroughput,
     541           1 :         }
     542           1 : 
     543           1 :         r.writeStallMetrics.Lock()
     544           1 :         for reason, count := range r.writeStallMetrics.countByReason {
     545           1 :                 m.WriteStalls[reason] = count
     546           1 :         }
     547           1 :         for reason, duration := range r.writeStallMetrics.durationByReason {
     548           1 :                 m.WriteStallsDuration[reason] = duration
     549           1 :         }
     550           1 :         r.writeStallMetrics.Unlock()
     551           1 :         m.CompactionCounts.Total = pm.Compact.Count
     552           1 :         m.CompactionCounts.Default = pm.Compact.DefaultCount
     553           1 :         m.CompactionCounts.DeleteOnly = pm.Compact.DeleteOnlyCount
     554           1 :         m.CompactionCounts.ElisionOnly = pm.Compact.ElisionOnlyCount
     555           1 :         m.CompactionCounts.Move = pm.Compact.MoveCount
     556           1 :         m.CompactionCounts.Read = pm.Compact.ReadCount
     557           1 :         m.CompactionCounts.Rewrite = pm.Compact.RewriteCount
     558           1 :         m.CompactionCounts.MultiLevel = pm.Compact.MultiLevelCount
     559           1 :         m.Ingest.BytesIntoL0 = pm.Levels[0].BytesIngested
     560           1 :         m.Ingest.BytesWeightedByLevel = ingestBytesWeighted
     561           1 :         return m, err
     562             : }
     563             : 
     564             : // Close closes remaining open resources, including the database. It must be
     565             : // called after Wait.
     566           1 : func (r *Runner) Close() error {
     567           1 :         return r.d.Close()
     568           1 : }
     569             : 
     570             : // A workloadStep describes a single manifest edit in the workload. It may be a
     571             : // flush or ingest that should be applied to the test database, or it may be a
     572             : // compaction that is surfaced to allow the replay logic to compare against the
     573             : // state of the database at workload collection time.
     574             : type workloadStep struct {
     575             :         kind stepKind
     576             :         ve   manifest.VersionEdit
     577             :         // a Version describing the state of the LSM *before* the workload was
     578             :         // collected.
     579             :         pv *manifest.Version
     580             :         // a Version describing the state of the LSM when the workload was
     581             :         // collected.
     582             :         v *manifest.Version
     583             :         // non-nil for flushStepKind
     584             :         flushBatch           *pebble.Batch
     585             :         tablesToIngest       []string
     586             :         cumulativeWriteBytes uint64
     587             : }
     588             : 
     589             : type stepKind uint8
     590             : 
     591             : const (
     592             :         flushStepKind stepKind = iota
     593             :         ingestStepKind
     594             :         compactionStepKind
     595             : )
     596             : 
     597             : // eventListener returns a Pebble EventListener that is installed on the replay
     598             : // database so that the replay runner has access to internal Pebble events.
     599           1 : func (r *Runner) eventListener() pebble.EventListener {
     600           1 :         var writeStallBegin time.Time
     601           1 :         var writeStallReason string
     602           1 :         l := pebble.EventListener{
     603           1 :                 BackgroundError: func(err error) {
     604           0 :                         r.err.Store(err)
     605           0 :                         r.cancel()
     606           0 :                 },
     607           1 :                 WriteStallBegin: func(info pebble.WriteStallBeginInfo) {
     608           1 :                         r.writeStallMetrics.Lock()
     609           1 :                         defer r.writeStallMetrics.Unlock()
     610           1 :                         writeStallReason = info.Reason
     611           1 :                         // Take just the first word of the reason.
     612           1 :                         if j := strings.IndexByte(writeStallReason, ' '); j != -1 {
     613           1 :                                 writeStallReason = writeStallReason[:j]
     614           1 :                         }
     615           1 :                         switch writeStallReason {
     616           1 :                         case "L0", "memtable":
     617           1 :                                 r.writeStallMetrics.countByReason[writeStallReason]++
     618           0 :                         default:
     619           0 :                                 panic(fmt.Sprintf("unrecognized write stall reason %q", info.Reason))
     620             :                         }
     621           1 :                         writeStallBegin = time.Now()
     622             :                 },
     623           1 :                 WriteStallEnd: func() {
     624           1 :                         r.writeStallMetrics.Lock()
     625           1 :                         defer r.writeStallMetrics.Unlock()
     626           1 :                         r.writeStallMetrics.durationByReason[writeStallReason] += time.Since(writeStallBegin)
     627           1 :                 },
     628           1 :                 CompactionBegin: func(_ pebble.CompactionInfo) {
     629           1 :                         r.compactionMu.Lock()
     630           1 :                         defer r.compactionMu.Unlock()
     631           1 :                         r.compactionMu.started++
     632           1 :                 },
     633           1 :                 CompactionEnd: func(_ pebble.CompactionInfo) {
     634           1 :                         // Keep track of the number of compactions that complete and notify
     635           1 :                         // anyone waiting for a compaction to complete. See the function
     636           1 :                         // nextCompactionCompletes for the corresponding receiver side.
     637           1 :                         r.compactionMu.Lock()
     638           1 :                         defer r.compactionMu.Unlock()
     639           1 :                         r.compactionMu.completed++
     640           1 :                         if r.compactionMu.ch != nil {
     641           1 :                                 // Signal that a compaction has completed.
     642           1 :                                 close(r.compactionMu.ch)
     643           1 :                                 r.compactionMu.ch = nil
     644           1 :                         }
     645             :                 },
     646             :         }
     647           1 :         l.EnsureDefaults(nil)
     648           1 :         return l
     649             : }
     650             : 
     651             : // applyWorkloadSteps runs in its own goroutine, reading workload steps off the
     652             : // r.steps channel and applying them to the test database.
     653           1 : func (r *Runner) applyWorkloadSteps(ctx context.Context) error {
     654           1 :         for {
     655           1 :                 var ok bool
     656           1 :                 var step workloadStep
     657           1 :                 select {
     658           0 :                 case <-ctx.Done():
     659           0 :                         return ctx.Err()
     660           1 :                 case step, ok = <-r.steps:
     661           1 :                         if !ok {
     662           1 :                                 // Exhausted the workload. Exit.
     663           1 :                                 close(r.stepsApplied)
     664           1 :                                 return nil
     665           1 :                         }
     666             :                 }
     667             : 
     668           1 :                 paceDur := r.Pacer.pace(r, step)
     669           1 :                 r.metrics.paceDurationNano.Add(uint64(paceDur))
     670           1 : 
     671           1 :                 switch step.kind {
     672           1 :                 case flushStepKind:
     673           1 :                         if err := step.flushBatch.Commit(&pebble.WriteOptions{Sync: false}); err != nil {
     674           0 :                                 return err
     675           0 :                         }
     676           1 :                         _, err := r.d.AsyncFlush()
     677           1 :                         if err != nil {
     678           0 :                                 return err
     679           0 :                         }
     680           1 :                         r.metrics.writeBytes.Store(step.cumulativeWriteBytes)
     681           1 :                         r.stepsApplied <- step
     682           0 :                 case ingestStepKind:
     683           0 :                         if err := r.d.Ingest(step.tablesToIngest); err != nil {
     684           0 :                                 return err
     685           0 :                         }
     686           0 :                         r.metrics.writeBytes.Store(step.cumulativeWriteBytes)
     687           0 :                         r.stepsApplied <- step
     688           1 :                 case compactionStepKind:
     689             :                         // No-op.
     690             :                         // TODO(jackson): Should we elide this earlier?
     691           0 :                 default:
     692           0 :                         panic("unreachable")
     693             :                 }
     694             :         }
     695             : }
     696             : 
     697             : // prepareWorkloadSteps runs in its own goroutine, reading the workload
     698             : // manifests in order to reconstruct the workload and prepare each step to be
     699             : // applied. It sends each workload step to the r.steps channel.
     700           1 : func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
     701           1 :         defer func() { close(r.steps) }()
     702             : 
     703           1 :         idx := r.workload.manifestIdx
     704           1 : 
     705           1 :         var cumulativeWriteBytes uint64
     706           1 :         var flushBufs flushBuffers
     707           1 :         var v *manifest.Version
     708           1 :         var previousVersion *manifest.Version
     709           1 :         var bve manifest.BulkVersionEdit
     710           1 :         bve.AddedByFileNum = make(map[base.FileNum]*manifest.FileMetadata)
     711           1 :         applyVE := func(ve *manifest.VersionEdit) error {
     712           1 :                 return bve.Accumulate(ve)
     713           1 :         }
     714           1 :         currentVersion := func() (*manifest.Version, error) {
     715           1 :                 var err error
     716           1 :                 v, err = bve.Apply(v,
     717           1 :                         r.Opts.Comparer.Compare,
     718           1 :                         r.Opts.Comparer.FormatKey,
     719           1 :                         r.Opts.FlushSplitBytes,
     720           1 :                         r.Opts.Experimental.ReadCompactionRate,
     721           1 :                         nil /* zombies */)
     722           1 :                 bve = manifest.BulkVersionEdit{AddedByFileNum: bve.AddedByFileNum}
     723           1 :                 return v, err
     724           1 :         }
     725             : 
     726           1 :         for ; idx < len(r.workload.manifests); idx++ {
     727           1 :                 if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes {
     728           0 :                         break
     729             :                 }
     730             : 
     731           1 :                 err := func() error {
     732           1 :                         manifestName := r.workload.manifests[idx]
     733           1 :                         f, err := r.WorkloadFS.Open(r.WorkloadFS.PathJoin(r.WorkloadPath, manifestName))
     734           1 :                         if err != nil {
     735           0 :                                 return err
     736           0 :                         }
     737           1 :                         defer f.Close()
     738           1 : 
     739           1 :                         rr := record.NewReader(f, 0 /* logNum */)
     740           1 :                         // A manifest's first record always holds the initial version state.
     741           1 :                         // If this is the first manifest we're examining, we load it in
     742           1 :                         // order to seed `metas` with the file metadata of the existing
     743           1 :                         // files. Otherwise, we can skip it because we already know all the
     744           1 :                         // file metadatas up to this point.
     745           1 :                         rec, err := rr.Next()
     746           1 :                         if err != nil {
     747           0 :                                 return err
     748           0 :                         }
     749           1 :                         if idx == r.workload.manifestIdx {
     750           1 :                                 var ve manifest.VersionEdit
     751           1 :                                 if err := ve.Decode(rec); err != nil {
     752           0 :                                         return err
     753           0 :                                 }
     754           1 :                                 if err := applyVE(&ve); err != nil {
     755           0 :                                         return err
     756           0 :                                 }
     757             :                         }
     758             : 
     759             :                         // Read the remaining of the manifests version edits, one-by-one.
     760           1 :                         for {
     761           1 :                                 rec, err := rr.Next()
     762           1 :                                 if err == io.EOF || record.IsInvalidRecord(err) {
     763           1 :                                         break
     764           1 :                                 } else if err != nil {
     765           0 :                                         return err
     766           0 :                                 }
     767           1 :                                 var ve manifest.VersionEdit
     768           1 :                                 if err = ve.Decode(rec); err == io.EOF || record.IsInvalidRecord(err) {
     769           0 :                                         break
     770           1 :                                 } else if err != nil {
     771           0 :                                         return err
     772           0 :                                 }
     773           1 :                                 if err := applyVE(&ve); err != nil {
     774           0 :                                         return err
     775           0 :                                 }
     776           1 :                                 if idx == r.workload.manifestIdx && rr.Offset() <= r.workload.manifestOff {
     777           1 :                                         // The record rec began at an offset strictly less than
     778           1 :                                         // rr.Offset(), which means it's strictly less than
     779           1 :                                         // r.workload.manifestOff, and we should skip it.
     780           1 :                                         continue
     781             :                                 }
     782           1 :                                 if len(ve.NewFiles) == 0 && len(ve.DeletedFiles) == 0 {
     783           1 :                                         // Skip WAL rotations and other events that don't affect the
     784           1 :                                         // files of the LSM.
     785           1 :                                         continue
     786             :                                 }
     787             : 
     788           1 :                                 s := workloadStep{ve: ve}
     789           1 :                                 if len(ve.DeletedFiles) > 0 {
     790           1 :                                         // If a version edit deletes files, we assume it's a compaction.
     791           1 :                                         s.kind = compactionStepKind
     792           1 :                                 } else {
     793           1 :                                         // Default to ingest. If any files have unequal
     794           1 :                                         // smallest,largest sequence numbers, we'll update this to a
     795           1 :                                         // flush.
     796           1 :                                         s.kind = ingestStepKind
     797           1 :                                 }
     798           1 :                                 var newFiles []base.DiskFileNum
     799           1 :                                 for _, nf := range ve.NewFiles {
     800           1 :                                         newFiles = append(newFiles, nf.Meta.FileBacking.DiskFileNum)
     801           1 :                                         if s.kind == ingestStepKind && (nf.Meta.SmallestSeqNum != nf.Meta.LargestSeqNum || nf.Level != 0) {
     802           1 :                                                 s.kind = flushStepKind
     803           1 :                                         }
     804             :                                 }
     805             :                                 // Add the current reference *Version to the step. This provides
     806             :                                 // access to, for example, the read-amplification of the
     807             :                                 // database at this point when the workload was collected. This
     808             :                                 // can be useful for pacing.
     809           1 :                                 if s.v, err = currentVersion(); err != nil {
     810           0 :                                         return err
     811           0 :                                 }
     812             :                                 // On the first time through, we set the previous version to the current
     813             :                                 // version otherwise we set it to the actual previous version.
     814           1 :                                 if previousVersion == nil {
     815           1 :                                         previousVersion = s.v
     816           1 :                                 }
     817           1 :                                 s.pv = previousVersion
     818           1 :                                 previousVersion = s.v
     819           1 : 
     820           1 :                                 // It's possible that the workload collector captured this
     821           1 :                                 // version edit, but wasn't able to collect all of the
     822           1 :                                 // corresponding sstables before being terminated.
     823           1 :                                 if s.kind == flushStepKind || s.kind == ingestStepKind {
     824           1 :                                         for _, fileNum := range newFiles {
     825           1 :                                                 if _, ok := r.workload.sstables[fileNum.FileNum()]; !ok {
     826           0 :                                                         // TODO(jackson,leon): This isn't exactly an error
     827           0 :                                                         // condition. Give this more thought; do we want to
     828           0 :                                                         // require graceful exiting of workload collection,
     829           0 :                                                         // such that the last version edit must have had its
     830           0 :                                                         // corresponding sstables collected?
     831           0 :                                                         return errors.Newf("sstable %s not found", fileNum)
     832           0 :                                                 }
     833             :                                         }
     834             :                                 }
     835             : 
     836           1 :                                 switch s.kind {
     837           1 :                                 case flushStepKind:
     838           1 :                                         // Load all of the flushed sstables' keys into a batch.
     839           1 :                                         s.flushBatch = r.d.NewBatch()
     840           1 :                                         if err := loadFlushedSSTableKeys(s.flushBatch, r.WorkloadFS, r.WorkloadPath, newFiles, r.readerOpts, &flushBufs); err != nil {
     841           0 :                                                 return errors.Wrapf(err, "flush in %q at offset %d", manifestName, rr.Offset())
     842           0 :                                         }
     843           1 :                                         cumulativeWriteBytes += uint64(s.flushBatch.Len())
     844           0 :                                 case ingestStepKind:
     845           0 :                                         // Copy the ingested sstables into a staging area within the
     846           0 :                                         // run dir. This is necessary for two reasons:
     847           0 :                                         //  a) Ingest will remove the source file, and we don't want
     848           0 :                                         //     to mutate the workload.
     849           0 :                                         //  b) If the workload stored on another volume, Ingest
     850           0 :                                         //     would need to fall back to copying the file since
     851           0 :                                         //     it's not possible to link across volumes. The true
     852           0 :                                         //     workload likely linked the file. Staging the file
     853           0 :                                         //     ahead of time ensures that we're able to Link the
     854           0 :                                         //     file like the original workload did.
     855           0 :                                         for _, fileNum := range newFiles {
     856           0 :                                                 src := base.MakeFilepath(r.WorkloadFS, r.WorkloadPath, base.FileTypeTable, fileNum)
     857           0 :                                                 dst := base.MakeFilepath(r.Opts.FS, r.stagingDir, base.FileTypeTable, fileNum)
     858           0 :                                                 if err := vfs.CopyAcrossFS(r.WorkloadFS, src, r.Opts.FS, dst); err != nil {
     859           0 :                                                         return errors.Wrapf(err, "ingest in %q at offset %d", manifestName, rr.Offset())
     860           0 :                                                 }
     861           0 :                                                 finfo, err := r.Opts.FS.Stat(dst)
     862           0 :                                                 if err != nil {
     863           0 :                                                         return errors.Wrapf(err, "stating %q", dst)
     864           0 :                                                 }
     865           0 :                                                 cumulativeWriteBytes += uint64(finfo.Size())
     866           0 :                                                 s.tablesToIngest = append(s.tablesToIngest, dst)
     867             :                                         }
     868           1 :                                 case compactionStepKind:
     869             :                                         // Nothing to do.
     870             :                                 }
     871           1 :                                 s.cumulativeWriteBytes = cumulativeWriteBytes
     872           1 : 
     873           1 :                                 select {
     874           0 :                                 case <-ctx.Done():
     875           0 :                                         return ctx.Err()
     876           1 :                                 case r.steps <- s:
     877             :                                 }
     878             : 
     879           1 :                                 if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes {
     880           0 :                                         break
     881             :                                 }
     882             :                         }
     883           1 :                         return nil
     884             :                 }()
     885           1 :                 if err != nil {
     886           0 :                         return err
     887           0 :                 }
     888             :         }
     889           1 :         return nil
     890             : }
     891             : 
     892             : // findWorkloadFiles finds all manifests and tables in the provided path on fs.
     893             : func findWorkloadFiles(
     894             :         path string, fs vfs.FS,
     895           1 : ) (manifests []string, sstables map[base.FileNum]struct{}, err error) {
     896           1 :         dirents, err := fs.List(path)
     897           1 :         if err != nil {
     898           0 :                 return nil, nil, err
     899           0 :         }
     900           1 :         sstables = make(map[base.FileNum]struct{})
     901           1 :         for _, dirent := range dirents {
     902           1 :                 typ, fileNum, ok := base.ParseFilename(fs, dirent)
     903           1 :                 if !ok {
     904           1 :                         continue
     905             :                 }
     906           1 :                 switch typ {
     907           1 :                 case base.FileTypeManifest:
     908           1 :                         manifests = append(manifests, dirent)
     909           1 :                 case base.FileTypeTable:
     910           1 :                         sstables[fileNum.FileNum()] = struct{}{}
     911             :                 }
     912             :         }
     913           1 :         if len(manifests) == 0 {
     914           1 :                 return nil, nil, errors.Newf("no manifests found")
     915           1 :         }
     916           1 :         sort.Strings(manifests)
     917           1 :         return manifests, sstables, err
     918             : }
     919             : 
     920             : // findManifestStart takes a database directory and FS containing the initial
     921             : // database state that a workload will be run against, and a list of a workloads
     922             : // manifests. It examines the database's current manifest to determine where
     923             : // workload replay should begin, so as to not duplicate already-applied version
     924             : // edits.
     925             : //
     926             : // It returns the index of the starting manifest, and the database's current
     927             : // offset within the manifest.
     928             : func findManifestStart(
     929             :         dbDir string, dbFS vfs.FS, manifests []string,
     930           1 : ) (index int, offset int64, err error) {
     931           1 :         // Identify the database's current manifest.
     932           1 :         dbDesc, err := pebble.Peek(dbDir, dbFS)
     933           1 :         if err != nil {
     934           0 :                 return 0, 0, err
     935           0 :         }
     936           1 :         dbManifest := dbFS.PathBase(dbDesc.ManifestFilename)
     937           1 :         // If there is no initial database state, begin workload replay from the
     938           1 :         // beginning of the first manifest.
     939           1 :         if !dbDesc.Exists {
     940           1 :                 return 0, 0, nil
     941           1 :         }
     942           1 :         for index = 0; index < len(manifests); index++ {
     943           1 :                 if manifests[index] == dbManifest {
     944           1 :                         break
     945             :                 }
     946             :         }
     947           1 :         if index == len(manifests) {
     948           1 :                 // The initial database state has a manifest that does not appear within
     949           1 :                 // the workload's set of manifests. This is possible if we began
     950           1 :                 // recording the workload at the same time as a manifest rotation, but
     951           1 :                 // more likely we're applying a workload to a different initial database
     952           1 :                 // state than the one from which the workload was collected. Either way,
     953           1 :                 // start from the beginning of the first manifest.
     954           1 :                 return 0, 0, nil
     955           1 :         }
     956             :         // Find the initial database's offset within the manifest.
     957           1 :         info, err := dbFS.Stat(dbFS.PathJoin(dbDir, dbManifest))
     958           1 :         if err != nil {
     959           0 :                 return 0, 0, err
     960           0 :         }
     961           1 :         return index, info.Size(), nil
     962             : }
     963             : 
     964             : // loadFlushedSSTableKeys copies keys from the sstables specified by `fileNums`
     965             : // in the directory specified by `path` into the provided the batch. Keys are
     966             : // applied to the batch in the order dictated by their sequence numbers within
     967             : // the sstables, ensuring the relative relationship between sequence numbers is
     968             : // maintained.
     969             : //
     970             : // Preserving the relative relationship between sequence numbers is not strictly
     971             : // necessary, but it ensures we accurately exercise some microoptimizations (eg,
     972             : // detecting user key changes by descending trailer). There may be additional
     973             : // dependencies on sequence numbers in the future.
     974             : func loadFlushedSSTableKeys(
     975             :         b *pebble.Batch,
     976             :         fs vfs.FS,
     977             :         path string,
     978             :         fileNums []base.DiskFileNum,
     979             :         readOpts sstable.ReaderOptions,
     980             :         bufs *flushBuffers,
     981           1 : ) error {
     982           1 :         // Load all the keys across all the sstables.
     983           1 :         for _, fileNum := range fileNums {
     984           1 :                 if err := func() error {
     985           1 :                         filePath := base.MakeFilepath(fs, path, base.FileTypeTable, fileNum)
     986           1 :                         f, err := fs.Open(filePath)
     987           1 :                         if err != nil {
     988           0 :                                 return err
     989           0 :                         }
     990           1 :                         readable, err := sstable.NewSimpleReadable(f)
     991           1 :                         if err != nil {
     992           0 :                                 f.Close()
     993           0 :                                 return err
     994           0 :                         }
     995           1 :                         r, err := sstable.NewReader(readable, readOpts)
     996           1 :                         if err != nil {
     997           0 :                                 return err
     998           0 :                         }
     999           1 :                         defer r.Close()
    1000           1 : 
    1001           1 :                         // Load all the point keys.
    1002           1 :                         iter, err := r.NewIter(nil, nil)
    1003           1 :                         if err != nil {
    1004           0 :                                 return err
    1005           0 :                         }
    1006           1 :                         defer iter.Close()
    1007           1 :                         for k, lv := iter.First(); k != nil; k, lv = iter.Next() {
    1008           1 :                                 var key flushedKey
    1009           1 :                                 key.Trailer = k.Trailer
    1010           1 :                                 bufs.alloc, key.UserKey = bufs.alloc.Copy(k.UserKey)
    1011           1 :                                 if v, callerOwned, err := lv.Value(nil); err != nil {
    1012           0 :                                         return err
    1013           1 :                                 } else if callerOwned {
    1014           0 :                                         key.value = v
    1015           1 :                                 } else {
    1016           1 :                                         bufs.alloc, key.value = bufs.alloc.Copy(v)
    1017           1 :                                 }
    1018           1 :                                 bufs.keys = append(bufs.keys, key)
    1019             :                         }
    1020             : 
    1021             :                         // Load all the range tombstones.
    1022           1 :                         if iter, err := r.NewRawRangeDelIter(); err != nil {
    1023           0 :                                 return err
    1024           1 :                         } else if iter != nil {
    1025           1 :                                 defer iter.Close()
    1026           1 :                                 for s := iter.First(); s != nil; s = iter.Next() {
    1027           1 :                                         if err := rangedel.Encode(s, func(k base.InternalKey, v []byte) error {
    1028           1 :                                                 var key flushedKey
    1029           1 :                                                 key.Trailer = k.Trailer
    1030           1 :                                                 bufs.alloc, key.UserKey = bufs.alloc.Copy(k.UserKey)
    1031           1 :                                                 bufs.alloc, key.value = bufs.alloc.Copy(v)
    1032           1 :                                                 bufs.keys = append(bufs.keys, key)
    1033           1 :                                                 return nil
    1034           1 :                                         }); err != nil {
    1035           0 :                                                 return err
    1036           0 :                                         }
    1037             :                                 }
    1038             :                         }
    1039             : 
    1040             :                         // Load all the range keys.
    1041           1 :                         if iter, err := r.NewRawRangeKeyIter(); err != nil {
    1042           0 :                                 return err
    1043           1 :                         } else if iter != nil {
    1044           1 :                                 defer iter.Close()
    1045           1 :                                 for s := iter.First(); s != nil; s = iter.Next() {
    1046           1 :                                         if err := rangekey.Encode(s, func(k base.InternalKey, v []byte) error {
    1047           1 :                                                 var key flushedKey
    1048           1 :                                                 key.Trailer = k.Trailer
    1049           1 :                                                 bufs.alloc, key.UserKey = bufs.alloc.Copy(k.UserKey)
    1050           1 :                                                 bufs.alloc, key.value = bufs.alloc.Copy(v)
    1051           1 :                                                 bufs.keys = append(bufs.keys, key)
    1052           1 :                                                 return nil
    1053           1 :                                         }); err != nil {
    1054           0 :                                                 return err
    1055           0 :                                         }
    1056             :                                 }
    1057             :                         }
    1058           1 :                         return nil
    1059           0 :                 }(); err != nil {
    1060           0 :                         return err
    1061           0 :                 }
    1062             :         }
    1063             : 
    1064             :         // Sort the flushed keys by their sequence numbers so that we can apply them
    1065             :         // to the batch in the same order, maintaining the relative relationship
    1066             :         // between keys.
    1067             :         // NB: We use a stable sort so that keys corresponding to span fragments
    1068             :         // (eg, range tombstones and range keys) have a deterministic ordering for
    1069             :         // testing.
    1070           1 :         sort.Stable(bufs.keys)
    1071           1 : 
    1072           1 :         // Add the keys to the batch in the order they were committed when the
    1073           1 :         // workload was captured.
    1074           1 :         for i := 0; i < len(bufs.keys); i++ {
    1075           1 :                 var err error
    1076           1 :                 switch bufs.keys[i].Kind() {
    1077           1 :                 case base.InternalKeyKindDelete:
    1078           1 :                         err = b.Delete(bufs.keys[i].UserKey, nil)
    1079           0 :                 case base.InternalKeyKindDeleteSized:
    1080           0 :                         v, _ := binary.Uvarint(bufs.keys[i].value)
    1081           0 :                         // Batch.DeleteSized takes just the length of the value being
    1082           0 :                         // deleted and adds the key's length to derive the overall entry
    1083           0 :                         // size of the value being deleted. This has already been done to
    1084           0 :                         // the key we're reading from the sstable, so we must subtract the
    1085           0 :                         // key length from the encoded value before calling b.DeleteSized,
    1086           0 :                         // which will again add the key length before encoding.
    1087           0 :                         err = b.DeleteSized(bufs.keys[i].UserKey, uint32(v-uint64(len(bufs.keys[i].UserKey))), nil)
    1088           1 :                 case base.InternalKeyKindSet, base.InternalKeyKindSetWithDelete:
    1089           1 :                         err = b.Set(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
    1090           0 :                 case base.InternalKeyKindMerge:
    1091           0 :                         err = b.Merge(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
    1092           1 :                 case base.InternalKeyKindSingleDelete:
    1093           1 :                         err = b.SingleDelete(bufs.keys[i].UserKey, nil)
    1094           1 :                 case base.InternalKeyKindRangeDelete:
    1095           1 :                         err = b.DeleteRange(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
    1096           1 :                 case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
    1097           1 :                         s, err := rangekey.Decode(bufs.keys[i].InternalKey, bufs.keys[i].value, nil)
    1098           1 :                         if err != nil {
    1099           0 :                                 return err
    1100           0 :                         }
    1101           1 :                         if len(s.Keys) != 1 {
    1102           0 :                                 return errors.Newf("range key span unexpectedly contains %d keys", len(s.Keys))
    1103           0 :                         }
    1104           1 :                         switch bufs.keys[i].Kind() {
    1105           1 :                         case base.InternalKeyKindRangeKeySet:
    1106           1 :                                 err = b.RangeKeySet(s.Start, s.End, s.Keys[0].Suffix, s.Keys[0].Value, nil)
    1107           1 :                         case base.InternalKeyKindRangeKeyUnset:
    1108           1 :                                 err = b.RangeKeyUnset(s.Start, s.End, s.Keys[0].Suffix, nil)
    1109           1 :                         case base.InternalKeyKindRangeKeyDelete:
    1110           1 :                                 err = b.RangeKeyDelete(s.Start, s.End, nil)
    1111           0 :                         default:
    1112           0 :                                 err = errors.Newf("unexpected key kind %q", bufs.keys[i].Kind())
    1113             :                         }
    1114           1 :                         if err != nil {
    1115           0 :                                 return err
    1116           0 :                         }
    1117           0 :                 default:
    1118           0 :                         err = errors.Newf("unexpected key kind %q", bufs.keys[i].Kind())
    1119             :                 }
    1120           1 :                 if err != nil {
    1121           0 :                         return err
    1122           0 :                 }
    1123             :         }
    1124             : 
    1125             :         // Done with the flushBuffers. Reset.
    1126           1 :         bufs.keys = bufs.keys[:0]
    1127           1 :         return nil
    1128             : }
    1129             : 
    1130             : type flushBuffers struct {
    1131             :         keys  flushedKeysByTrailer
    1132             :         alloc bytealloc.A
    1133             : }
    1134             : 
    1135             : type flushedKeysByTrailer []flushedKey
    1136             : 
    1137           1 : func (s flushedKeysByTrailer) Len() int           { return len(s) }
    1138           1 : func (s flushedKeysByTrailer) Less(i, j int) bool { return s[i].Trailer < s[j].Trailer }
    1139           1 : func (s flushedKeysByTrailer) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
    1140             : 
    1141             : type flushedKey struct {
    1142             :         base.InternalKey
    1143             :         value []byte
    1144             : }

Generated by: LCOV version 1.14