LCOV - code coverage report
Current view: top level - pebble/replay - replay.go (source / functions) Hit Total Coverage
Test: 2024-05-17 08:16Z 2ac449bb - tests + meta.lcov Lines: 586 749 78.2 %
Date: 2024-05-17 08:17:28 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : // Package replay implements collection and replaying of compaction benchmarking
       6             : // workloads. A workload is a collection of flushed and ingested sstables, along
       7             : // with the corresponding manifests describing the order and grouping with which
       8             : // they were applied. Replaying a workload flushes and ingests the same keys and
       9             : // sstables to reproduce the write workload for the purpose of evaluating
      10             : // compaction heuristics.
      11             : package replay
      12             : 
      13             : import (
      14             :         "context"
      15             :         "encoding/binary"
      16             :         "fmt"
      17             :         "io"
      18             :         "os"
      19             :         "sort"
      20             :         "strings"
      21             :         "sync"
      22             :         "sync/atomic"
      23             :         "time"
      24             : 
      25             :         "github.com/cockroachdb/errors"
      26             :         "github.com/cockroachdb/pebble"
      27             :         "github.com/cockroachdb/pebble/internal/base"
      28             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      29             :         "github.com/cockroachdb/pebble/internal/manifest"
      30             :         "github.com/cockroachdb/pebble/internal/rangedel"
      31             :         "github.com/cockroachdb/pebble/internal/rangekey"
      32             :         "github.com/cockroachdb/pebble/record"
      33             :         "github.com/cockroachdb/pebble/sstable"
      34             :         "github.com/cockroachdb/pebble/vfs"
      35             :         "golang.org/x/perf/benchfmt"
      36             :         "golang.org/x/sync/errgroup"
      37             : )
      38             : 
      39             : // A Pacer paces replay of a workload, determining when to apply the next
      40             : // incoming write.
      41             : type Pacer interface {
      42             :         pace(r *Runner, step workloadStep) time.Duration
      43             : }
      44             : 
      45             : // computeReadAmp calculates the read amplification from a manifest.Version
      46           0 : func computeReadAmp(v *manifest.Version) int {
      47           0 :         refRAmp := v.L0Sublevels.ReadAmplification()
      48           0 :         for _, lvl := range v.Levels[1:] {
      49           0 :                 if !lvl.Empty() {
      50           0 :                         refRAmp++
      51           0 :                 }
      52             :         }
      53           0 :         return refRAmp
      54             : }
      55             : 
      56             : // waitForReadAmpLE is a common function used by PaceByReferenceReadAmp and
      57             : // PaceByFixedReadAmp to wait on the dbMetricsNotifier condition variable if the
      58             : // read amplification observed is greater than the specified target (refRAmp).
      59           1 : func waitForReadAmpLE(r *Runner, rAmp int) {
      60           1 :         r.dbMetricsCond.L.Lock()
      61           1 :         m := r.dbMetrics
      62           1 :         ra := m.ReadAmp()
      63           1 :         for ra > rAmp {
      64           1 :                 r.dbMetricsCond.Wait()
      65           1 :                 ra = r.dbMetrics.ReadAmp()
      66           1 :         }
      67           1 :         r.dbMetricsCond.L.Unlock()
      68             : }
      69             : 
      70             : // Unpaced implements Pacer by applying each new write as soon as possible. It
      71             : // may be useful for examining performance under high read amplification.
      72             : type Unpaced struct{}
      73             : 
      74           1 : func (Unpaced) pace(*Runner, workloadStep) (d time.Duration) { return }
      75             : 
      76             : // PaceByReferenceReadAmp implements Pacer by applying each new write following
      77             : // the collected workloads read amplification.
      78             : type PaceByReferenceReadAmp struct{}
      79             : 
      80           0 : func (PaceByReferenceReadAmp) pace(r *Runner, w workloadStep) time.Duration {
      81           0 :         startTime := time.Now()
      82           0 :         refRAmp := computeReadAmp(w.pv)
      83           0 :         waitForReadAmpLE(r, refRAmp)
      84           0 :         return time.Since(startTime)
      85           0 : }
      86             : 
      87             : // PaceByFixedReadAmp implements Pacer by applying each new write following a
      88             : // fixed read amplification.
      89             : type PaceByFixedReadAmp int
      90             : 
      91           1 : func (pra PaceByFixedReadAmp) pace(r *Runner, _ workloadStep) time.Duration {
      92           1 :         startTime := time.Now()
      93           1 :         waitForReadAmpLE(r, int(pra))
      94           1 :         return time.Since(startTime)
      95           1 : }
      96             : 
      97             : // Metrics holds the various statistics on a replay run and its performance.
      98             : type Metrics struct {
      99             :         CompactionCounts struct {
     100             :                 Total       int64
     101             :                 Default     int64
     102             :                 DeleteOnly  int64
     103             :                 ElisionOnly int64
     104             :                 Move        int64
     105             :                 Read        int64
     106             :                 Rewrite     int64
     107             :                 Copy        int64
     108             :                 MultiLevel  int64
     109             :         }
     110             :         EstimatedDebt SampledMetric
     111             :         Final         *pebble.Metrics
     112             :         Ingest        struct {
     113             :                 BytesIntoL0 uint64
     114             :                 // BytesWeightedByLevel is calculated as the number of bytes ingested
     115             :                 // into a level multiplied by the level's distance from the bottommost
     116             :                 // level (L6), summed across all levels. It can be used to guage how
     117             :                 // effective heuristics are at ingesting files into lower levels, saving
     118             :                 // write amplification.
     119             :                 BytesWeightedByLevel uint64
     120             :         }
     121             :         // PaceDuration is the time waiting for the pacer to allow the workload to
     122             :         // continue.
     123             :         PaceDuration time.Duration
     124             :         ReadAmp      SampledMetric
     125             :         // QuiesceDuration is the time between completing application of the workload and
     126             :         // compactions quiescing.
     127             :         QuiesceDuration time.Duration
     128             :         TombstoneCount  SampledMetric
     129             :         // TotalSize holds the total size of the database, sampled after each
     130             :         // workload step.
     131             :         TotalSize           SampledMetric
     132             :         TotalWriteAmp       float64
     133             :         WorkloadDuration    time.Duration
     134             :         WriteBytes          uint64
     135             :         WriteStalls         map[string]int
     136             :         WriteStallsDuration map[string]time.Duration
     137             :         WriteThroughput     SampledMetric
     138             : }
     139             : 
     140             : // Plot holds an ascii plot and its name.
     141             : type Plot struct {
     142             :         Name string
     143             :         Plot string
     144             : }
     145             : 
     146             : // Plots returns a slice of ascii plots describing metrics change over time.
     147           0 : func (m *Metrics) Plots(width, height int) []Plot {
     148           0 :         const scaleMB = 1.0 / float64(1<<20)
     149           0 :         return []Plot{
     150           0 :                 {Name: "Write throughput (MB/s)", Plot: m.WriteThroughput.PlotIncreasingPerSec(width, height, scaleMB)},
     151           0 :                 {Name: "Estimated compaction debt (MB)", Plot: m.EstimatedDebt.Plot(width, height, scaleMB)},
     152           0 :                 {Name: "Total database size (MB)", Plot: m.TotalSize.Plot(width, height, scaleMB)},
     153           0 :                 {Name: "ReadAmp", Plot: m.ReadAmp.Plot(width, height, 1.0)},
     154           0 :         }
     155           0 : }
     156             : 
     157             : // WriteBenchmarkString writes the metrics in the form of a series of
     158             : // 'Benchmark' lines understandable by benchstat.
     159           1 : func (m *Metrics) WriteBenchmarkString(name string, w io.Writer) error {
     160           1 :         type benchmarkSection struct {
     161           1 :                 label  string
     162           1 :                 values []benchfmt.Value
     163           1 :         }
     164           1 :         groups := []benchmarkSection{
     165           1 :                 {label: "CompactionCounts", values: []benchfmt.Value{
     166           1 :                         {Value: float64(m.CompactionCounts.Total), Unit: "compactions"},
     167           1 :                         {Value: float64(m.CompactionCounts.Default), Unit: "default"},
     168           1 :                         {Value: float64(m.CompactionCounts.DeleteOnly), Unit: "delete"},
     169           1 :                         {Value: float64(m.CompactionCounts.ElisionOnly), Unit: "elision"},
     170           1 :                         {Value: float64(m.CompactionCounts.Move), Unit: "move"},
     171           1 :                         {Value: float64(m.CompactionCounts.Read), Unit: "read"},
     172           1 :                         {Value: float64(m.CompactionCounts.Rewrite), Unit: "rewrite"},
     173           1 :                         {Value: float64(m.CompactionCounts.Copy), Unit: "copy"},
     174           1 :                         {Value: float64(m.CompactionCounts.MultiLevel), Unit: "multilevel"},
     175           1 :                 }},
     176           1 :                 // Total database sizes sampled after every workload step and
     177           1 :                 // compaction. This can be used to evaluate the relative LSM space
     178           1 :                 // amplification between runs of the same workload. Calculating the true
     179           1 :                 // space amplification continuously is prohibitvely expensive (it
     180           1 :                 // requires totally compacting a copy of the LSM).
     181           1 :                 {label: "DatabaseSize/mean", values: []benchfmt.Value{
     182           1 :                         {Value: m.TotalSize.Mean(), Unit: "bytes"},
     183           1 :                 }},
     184           1 :                 {label: "DatabaseSize/max", values: []benchfmt.Value{
     185           1 :                         {Value: float64(m.TotalSize.Max()), Unit: "bytes"},
     186           1 :                 }},
     187           1 :                 // Time applying the workload and time waiting for compactions to
     188           1 :                 // quiesce after the workload has completed.
     189           1 :                 {label: "DurationWorkload", values: []benchfmt.Value{
     190           1 :                         {Value: m.WorkloadDuration.Seconds(), Unit: "sec/op"},
     191           1 :                 }},
     192           1 :                 {label: "DurationQuiescing", values: []benchfmt.Value{
     193           1 :                         {Value: m.QuiesceDuration.Seconds(), Unit: "sec/op"},
     194           1 :                 }},
     195           1 :                 {label: "DurationPaceDelay", values: []benchfmt.Value{
     196           1 :                         {Value: m.PaceDuration.Seconds(), Unit: "sec/op"},
     197           1 :                 }},
     198           1 :                 // Estimated compaction debt, sampled after every workload step and
     199           1 :                 // compaction.
     200           1 :                 {label: "EstimatedDebt/mean", values: []benchfmt.Value{
     201           1 :                         {Value: m.EstimatedDebt.Mean(), Unit: "bytes"},
     202           1 :                 }},
     203           1 :                 {label: "EstimatedDebt/max", values: []benchfmt.Value{
     204           1 :                         {Value: float64(m.EstimatedDebt.Max()), Unit: "bytes"},
     205           1 :                 }},
     206           1 :                 {label: "FlushUtilization", values: []benchfmt.Value{
     207           1 :                         {Value: m.Final.Flush.WriteThroughput.Utilization(), Unit: "util"},
     208           1 :                 }},
     209           1 :                 {label: "IngestedIntoL0", values: []benchfmt.Value{
     210           1 :                         {Value: float64(m.Ingest.BytesIntoL0), Unit: "bytes"},
     211           1 :                 }},
     212           1 :                 {label: "IngestWeightedByLevel", values: []benchfmt.Value{
     213           1 :                         {Value: float64(m.Ingest.BytesWeightedByLevel), Unit: "bytes"},
     214           1 :                 }},
     215           1 :                 {label: "ReadAmp/mean", values: []benchfmt.Value{
     216           1 :                         {Value: m.ReadAmp.Mean(), Unit: "files"},
     217           1 :                 }},
     218           1 :                 {label: "ReadAmp/max", values: []benchfmt.Value{
     219           1 :                         {Value: float64(m.ReadAmp.Max()), Unit: "files"},
     220           1 :                 }},
     221           1 :                 {label: "TombstoneCount/mean", values: []benchfmt.Value{
     222           1 :                         {Value: m.TombstoneCount.Mean(), Unit: "tombstones"},
     223           1 :                 }},
     224           1 :                 {label: "TombstoneCount/max", values: []benchfmt.Value{
     225           1 :                         {Value: float64(m.TombstoneCount.Max()), Unit: "tombstones"},
     226           1 :                 }},
     227           1 :                 {label: "Throughput", values: []benchfmt.Value{
     228           1 :                         {Value: float64(m.WriteBytes) / (m.WorkloadDuration + m.QuiesceDuration).Seconds(), Unit: "B/s"},
     229           1 :                 }},
     230           1 :                 {label: "WriteAmp", values: []benchfmt.Value{
     231           1 :                         {Value: float64(m.TotalWriteAmp), Unit: "wamp"},
     232           1 :                 }},
     233           1 :         }
     234           1 : 
     235           1 :         for _, reason := range []string{"L0", "memtable"} {
     236           1 :                 groups = append(groups, benchmarkSection{
     237           1 :                         label: fmt.Sprintf("WriteStall/%s", reason),
     238           1 :                         values: []benchfmt.Value{
     239           1 :                                 {Value: float64(m.WriteStalls[reason]), Unit: "stalls"},
     240           1 :                                 {Value: float64(m.WriteStallsDuration[reason].Seconds()), Unit: "stallsec/op"},
     241           1 :                         },
     242           1 :                 })
     243           1 :         }
     244             : 
     245           1 :         bw := benchfmt.NewWriter(w)
     246           1 :         for _, grp := range groups {
     247           1 :                 err := bw.Write(&benchfmt.Result{
     248           1 :                         Name:   benchfmt.Name(fmt.Sprintf("BenchmarkReplay/%s/%s", name, grp.label)),
     249           1 :                         Iters:  1,
     250           1 :                         Values: grp.values,
     251           1 :                 })
     252           1 :                 if err != nil {
     253           0 :                         return err
     254           0 :                 }
     255             :         }
     256           1 :         return nil
     257             : }
     258             : 
     259             : // Runner runs a captured workload against a test database, collecting
     260             : // metrics on performance.
     261             : type Runner struct {
     262             :         RunDir        string
     263             :         WorkloadFS    vfs.FS
     264             :         WorkloadPath  string
     265             :         Pacer         Pacer
     266             :         Opts          *pebble.Options
     267             :         MaxWriteBytes uint64
     268             : 
     269             :         // Internal state.
     270             : 
     271             :         d *pebble.DB
     272             :         // dbMetrics and dbMetricsCond work in unison to update the metrics and
     273             :         // notify (broadcast) to any waiting clients that metrics have been updated.
     274             :         dbMetrics     *pebble.Metrics
     275             :         dbMetricsCond sync.Cond
     276             :         cancel        func()
     277             :         err           atomic.Value
     278             :         errgroup      *errgroup.Group
     279             :         readerOpts    sstable.ReaderOptions
     280             :         stagingDir    string
     281             :         steps         chan workloadStep
     282             :         stepsApplied  chan workloadStep
     283             : 
     284             :         metrics struct {
     285             :                 estimatedDebt    SampledMetric
     286             :                 quiesceDuration  time.Duration
     287             :                 readAmp          SampledMetric
     288             :                 tombstoneCount   SampledMetric
     289             :                 totalSize        SampledMetric
     290             :                 paceDurationNano atomic.Uint64
     291             :                 workloadDuration time.Duration
     292             :                 writeBytes       atomic.Uint64
     293             :                 writeThroughput  SampledMetric
     294             :         }
     295             :         writeStallMetrics struct {
     296             :                 sync.Mutex
     297             :                 countByReason    map[string]int
     298             :                 durationByReason map[string]time.Duration
     299             :         }
     300             :         // compactionMu holds state for tracking the number of compactions
     301             :         // started and completed and waking waiting goroutines when a new compaction
     302             :         // completes. See nextCompactionCompletes.
     303             :         compactionMu struct {
     304             :                 sync.Mutex
     305             :                 ch        chan struct{}
     306             :                 started   int64
     307             :                 completed int64
     308             :         }
     309             :         workload struct {
     310             :                 manifests []string
     311             :                 // manifest{Idx,Off} record the starting position of the workload
     312             :                 // relative to the initial database state.
     313             :                 manifestIdx int
     314             :                 manifestOff int64
     315             :                 // sstables records the set of captured workload sstables by file num.
     316             :                 sstables map[base.FileNum]struct{}
     317             :         }
     318             : }
     319             : 
     320             : // Run begins executing the workload and returns.
     321             : //
     322             : // The workload application will respect the provided context's cancellation.
     323           1 : func (r *Runner) Run(ctx context.Context) error {
     324           1 :         // Find the workload start relative to the RunDir's existing database state.
     325           1 :         // A prefix of the workload's manifest edits are expected to have already
     326           1 :         // been applied to the checkpointed existing database state.
     327           1 :         var err error
     328           1 :         r.workload.manifests, r.workload.sstables, err = findWorkloadFiles(r.WorkloadPath, r.WorkloadFS)
     329           1 :         if err != nil {
     330           0 :                 return err
     331           0 :         }
     332           1 :         r.workload.manifestIdx, r.workload.manifestOff, err = findManifestStart(r.RunDir, r.Opts.FS, r.workload.manifests)
     333           1 :         if err != nil {
     334           0 :                 return err
     335           0 :         }
     336             : 
     337             :         // Set up a staging dir for files that will be ingested.
     338           1 :         r.stagingDir = r.Opts.FS.PathJoin(r.RunDir, "staging")
     339           1 :         if err := r.Opts.FS.MkdirAll(r.stagingDir, os.ModePerm); err != nil {
     340           0 :                 return err
     341           0 :         }
     342             : 
     343           1 :         r.dbMetricsCond = sync.Cond{
     344           1 :                 L: &sync.Mutex{},
     345           1 :         }
     346           1 : 
     347           1 :         // Extend the user-provided Options with extensions necessary for replay
     348           1 :         // mechanics.
     349           1 :         r.compactionMu.ch = make(chan struct{})
     350           1 :         r.Opts.AddEventListener(r.eventListener())
     351           1 :         r.writeStallMetrics.countByReason = make(map[string]int)
     352           1 :         r.writeStallMetrics.durationByReason = make(map[string]time.Duration)
     353           1 :         r.Opts.EnsureDefaults()
     354           1 :         r.readerOpts = r.Opts.MakeReaderOptions()
     355           1 :         r.Opts.DisableWAL = true
     356           1 :         r.d, err = pebble.Open(r.RunDir, r.Opts)
     357           1 :         if err != nil {
     358           0 :                 return err
     359           0 :         }
     360             : 
     361           1 :         r.dbMetrics = r.d.Metrics()
     362           1 : 
     363           1 :         // Use a buffered channel to allow the prepareWorkloadSteps to read ahead,
     364           1 :         // buffering up to cap(r.steps) steps ahead of the current applied state.
     365           1 :         // Flushes need to be buffered and ingested sstables need to be copied, so
     366           1 :         // pipelining this preparation makes it more likely the step will be ready
     367           1 :         // to apply when the pacer decides to apply it.
     368           1 :         r.steps = make(chan workloadStep, 5)
     369           1 :         r.stepsApplied = make(chan workloadStep, 5)
     370           1 : 
     371           1 :         ctx, r.cancel = context.WithCancel(ctx)
     372           1 :         r.errgroup, ctx = errgroup.WithContext(ctx)
     373           1 :         r.errgroup.Go(func() error { return r.prepareWorkloadSteps(ctx) })
     374           1 :         r.errgroup.Go(func() error { return r.applyWorkloadSteps(ctx) })
     375           1 :         r.errgroup.Go(func() error { return r.refreshMetrics(ctx) })
     376           1 :         return nil
     377             : }
     378             : 
     379             : // refreshMetrics runs in its own goroutine, collecting metrics from the Pebble
     380             : // instance whenever a) a workload step completes, or b) a compaction completes.
     381             : // The Pacer implementations that pace based on read-amplification rely on these
     382             : // refreshed metrics to decide when to allow the workload to proceed.
     383           1 : func (r *Runner) refreshMetrics(ctx context.Context) error {
     384           1 :         startAt := time.Now()
     385           1 :         var workloadExhausted bool
     386           1 :         var workloadExhaustedAt time.Time
     387           1 :         stepsApplied := r.stepsApplied
     388           1 :         compactionCount, alreadyCompleted, compactionCh := r.nextCompactionCompletes(0)
     389           1 :         for {
     390           1 :                 if !alreadyCompleted {
     391           1 :                         select {
     392           0 :                         case <-ctx.Done():
     393           0 :                                 return ctx.Err()
     394           1 :                         case <-compactionCh:
     395             :                                 // Fall through to refreshing dbMetrics.
     396           1 :                         case _, ok := <-stepsApplied:
     397           1 :                                 if !ok {
     398           1 :                                         workloadExhausted = true
     399           1 :                                         workloadExhaustedAt = time.Now()
     400           1 :                                         // Set the [stepsApplied] channel to nil so that we'll never
     401           1 :                                         // hit this case again, and we don't busy loop.
     402           1 :                                         stepsApplied = nil
     403           1 :                                         // Record the replay time.
     404           1 :                                         r.metrics.workloadDuration = workloadExhaustedAt.Sub(startAt)
     405           1 :                                 }
     406             :                                 // Fall through to refreshing dbMetrics.
     407             :                         }
     408             :                 }
     409             : 
     410           1 :                 m := r.d.Metrics()
     411           1 :                 r.dbMetricsCond.L.Lock()
     412           1 :                 r.dbMetrics = m
     413           1 :                 r.dbMetricsCond.Broadcast()
     414           1 :                 r.dbMetricsCond.L.Unlock()
     415           1 : 
     416           1 :                 // Collect sample metrics. These metrics are calculated by sampling
     417           1 :                 // every time we collect metrics.
     418           1 :                 r.metrics.readAmp.record(int64(m.ReadAmp()))
     419           1 :                 r.metrics.estimatedDebt.record(int64(m.Compact.EstimatedDebt))
     420           1 :                 r.metrics.tombstoneCount.record(int64(m.Keys.TombstoneCount))
     421           1 :                 r.metrics.totalSize.record(int64(m.DiskSpaceUsage()))
     422           1 :                 r.metrics.writeThroughput.record(int64(r.metrics.writeBytes.Load()))
     423           1 : 
     424           1 :                 compactionCount, alreadyCompleted, compactionCh = r.nextCompactionCompletes(compactionCount)
     425           1 :                 // Consider whether replaying is complete. There are two necessary
     426           1 :                 // conditions:
     427           1 :                 //
     428           1 :                 //   1. The workload must be exhausted.
     429           1 :                 //   2. Compactions must have quiesced.
     430           1 :                 //
     431           1 :                 // The first condition is simple. The replay tool is responsible for
     432           1 :                 // applying the workload. The goroutine responsible for applying the
     433           1 :                 // workload closes the `stepsApplied` channel after the last step has
     434           1 :                 // been applied, and we'll flip `workloadExhausted` to true.
     435           1 :                 //
     436           1 :                 // The second condition is tricky. The replay tool doesn't control
     437           1 :                 // compactions and doesn't have visibility into whether the compaction
     438           1 :                 // picker is about to schedule a new compaction. We can tell when
     439           1 :                 // compactions are in progress or may be immeninent (eg, flushes in
     440           1 :                 // progress). If it appears that compactions have quiesced, pause for a
     441           1 :                 // fixed duration to see if a new one is scheduled. If not, consider
     442           1 :                 // compactions quiesced.
     443           1 :                 if workloadExhausted && !alreadyCompleted && r.compactionsAppearQuiesced(m) {
     444           1 :                         select {
     445           1 :                         case <-compactionCh:
     446           1 :                                 // A new compaction just finished; compactions have not
     447           1 :                                 // quiesced.
     448           1 :                                 continue
     449           1 :                         case <-time.After(time.Second):
     450           1 :                                 // No compactions completed. If it still looks like they've
     451           1 :                                 // quiesced according to the metrics, consider them quiesced.
     452           1 :                                 if r.compactionsAppearQuiesced(r.d.Metrics()) {
     453           1 :                                         r.metrics.quiesceDuration = time.Since(workloadExhaustedAt)
     454           1 :                                         return nil
     455           1 :                                 }
     456             :                         }
     457             :                 }
     458             :         }
     459             : }
     460             : 
     461             : // compactionsAppearQuiesced returns true if the database may have quiesced, and
     462             : // there likely won't be additional compactions scheduled. Detecting quiescence
     463             : // is a bit fraught: The various signals that Pebble makes available are
     464             : // adjusted at different points in the compaction lifecycle, and database
     465             : // mutexes are dropped and acquired between them. This makes it difficult to
     466             : // reliably identify when compactions quiesce.
     467             : //
     468             : // For example, our call to DB.Metrics() may acquire the DB.mu mutex when a
     469             : // compaction has just successfully completed, but before it's managed to
     470             : // schedule the next compaction (DB.mu is dropped while it attempts to acquire
     471             : // the manifest lock).
     472           1 : func (r *Runner) compactionsAppearQuiesced(m *pebble.Metrics) bool {
     473           1 :         r.compactionMu.Lock()
     474           1 :         defer r.compactionMu.Unlock()
     475           1 :         if m.Flush.NumInProgress > 0 {
     476           1 :                 return false
     477           1 :         } else if m.Compact.NumInProgress > 0 && r.compactionMu.started != r.compactionMu.completed {
     478           1 :                 return false
     479           1 :         }
     480           1 :         return true
     481             : }
     482             : 
     483             : // nextCompactionCompletes may be used to be notified when new compactions
     484             : // complete. The caller is responsible for holding on to a monotonically
     485             : // increasing count representing the number of compactions that have been
     486             : // observed, beginning at zero.
     487             : //
     488             : // The caller passes their current count as an argument. If a new compaction has
     489             : // already completed since their provided count, nextCompactionCompletes returns
     490             : // the new count and a true boolean return value. If a new compaction has not
     491             : // yet completed, it returns a channel that will be closed when the next
     492             : // compaction completes. This scheme allows the caller to select{...},
     493             : // performing some action on every compaction completion.
     494             : func (r *Runner) nextCompactionCompletes(
     495             :         lastObserved int64,
     496           1 : ) (count int64, alreadyOccurred bool, ch chan struct{}) {
     497           1 :         r.compactionMu.Lock()
     498           1 :         defer r.compactionMu.Unlock()
     499           1 : 
     500           1 :         if lastObserved < r.compactionMu.completed {
     501           1 :                 // There has already been another compaction since the last one observed
     502           1 :                 // by this caller. Return immediately.
     503           1 :                 return r.compactionMu.completed, true, nil
     504           1 :         }
     505             : 
     506             :         // The last observed compaction is still the most recent compaction.
     507             :         // Return a channel that the caller can wait on to be notified when the
     508             :         // next compaction occurs.
     509           1 :         if r.compactionMu.ch == nil {
     510           1 :                 r.compactionMu.ch = make(chan struct{})
     511           1 :         }
     512           1 :         return lastObserved, false, r.compactionMu.ch
     513             : }
     514             : 
     515             : // Wait waits for the workload replay to complete. Wait returns once the entire
     516             : // workload has been replayed, and compactions have quiesced.
     517           1 : func (r *Runner) Wait() (Metrics, error) {
     518           1 :         err := r.errgroup.Wait()
     519           1 :         if storedErr := r.err.Load(); storedErr != nil {
     520           0 :                 err = storedErr.(error)
     521           0 :         }
     522           1 :         pm := r.d.Metrics()
     523           1 :         total := pm.Total()
     524           1 :         var ingestBytesWeighted uint64
     525           1 :         for l := 0; l < len(pm.Levels); l++ {
     526           1 :                 ingestBytesWeighted += pm.Levels[l].BytesIngested * uint64(len(pm.Levels)-l-1)
     527           1 :         }
     528             : 
     529           1 :         m := Metrics{
     530           1 :                 Final:               pm,
     531           1 :                 EstimatedDebt:       r.metrics.estimatedDebt,
     532           1 :                 PaceDuration:        time.Duration(r.metrics.paceDurationNano.Load()),
     533           1 :                 ReadAmp:             r.metrics.readAmp,
     534           1 :                 QuiesceDuration:     r.metrics.quiesceDuration,
     535           1 :                 TombstoneCount:      r.metrics.tombstoneCount,
     536           1 :                 TotalSize:           r.metrics.totalSize,
     537           1 :                 TotalWriteAmp:       total.WriteAmp(),
     538           1 :                 WorkloadDuration:    r.metrics.workloadDuration,
     539           1 :                 WriteBytes:          r.metrics.writeBytes.Load(),
     540           1 :                 WriteStalls:         make(map[string]int),
     541           1 :                 WriteStallsDuration: make(map[string]time.Duration),
     542           1 :                 WriteThroughput:     r.metrics.writeThroughput,
     543           1 :         }
     544           1 : 
     545           1 :         r.writeStallMetrics.Lock()
     546           1 :         for reason, count := range r.writeStallMetrics.countByReason {
     547           1 :                 m.WriteStalls[reason] = count
     548           1 :         }
     549           1 :         for reason, duration := range r.writeStallMetrics.durationByReason {
     550           1 :                 m.WriteStallsDuration[reason] = duration
     551           1 :         }
     552           1 :         r.writeStallMetrics.Unlock()
     553           1 :         m.CompactionCounts.Total = pm.Compact.Count
     554           1 :         m.CompactionCounts.Default = pm.Compact.DefaultCount
     555           1 :         m.CompactionCounts.DeleteOnly = pm.Compact.DeleteOnlyCount
     556           1 :         m.CompactionCounts.ElisionOnly = pm.Compact.ElisionOnlyCount
     557           1 :         m.CompactionCounts.Move = pm.Compact.MoveCount
     558           1 :         m.CompactionCounts.Read = pm.Compact.ReadCount
     559           1 :         m.CompactionCounts.Rewrite = pm.Compact.RewriteCount
     560           1 :         m.CompactionCounts.Copy = pm.Compact.CopyCount
     561           1 :         m.CompactionCounts.MultiLevel = pm.Compact.MultiLevelCount
     562           1 :         m.Ingest.BytesIntoL0 = pm.Levels[0].BytesIngested
     563           1 :         m.Ingest.BytesWeightedByLevel = ingestBytesWeighted
     564           1 :         return m, err
     565             : }
     566             : 
     567             : // Close closes remaining open resources, including the database. It must be
     568             : // called after Wait.
     569           1 : func (r *Runner) Close() error {
     570           1 :         return r.d.Close()
     571           1 : }
     572             : 
     573             : // A workloadStep describes a single manifest edit in the workload. It may be a
     574             : // flush or ingest that should be applied to the test database, or it may be a
     575             : // compaction that is surfaced to allow the replay logic to compare against the
     576             : // state of the database at workload collection time.
     577             : type workloadStep struct {
     578             :         kind stepKind
     579             :         ve   manifest.VersionEdit
     580             :         // a Version describing the state of the LSM *before* the workload was
     581             :         // collected.
     582             :         pv *manifest.Version
     583             :         // a Version describing the state of the LSM when the workload was
     584             :         // collected.
     585             :         v *manifest.Version
     586             :         // non-nil for flushStepKind
     587             :         flushBatch           *pebble.Batch
     588             :         tablesToIngest       []string
     589             :         cumulativeWriteBytes uint64
     590             : }
     591             : 
     592             : type stepKind uint8
     593             : 
     594             : const (
     595             :         flushStepKind stepKind = iota
     596             :         ingestStepKind
     597             :         compactionStepKind
     598             : )
     599             : 
     600             : // eventListener returns a Pebble EventListener that is installed on the replay
     601             : // database so that the replay runner has access to internal Pebble events.
     602           1 : func (r *Runner) eventListener() pebble.EventListener {
     603           1 :         var writeStallBegin time.Time
     604           1 :         var writeStallReason string
     605           1 :         l := pebble.EventListener{
     606           1 :                 BackgroundError: func(err error) {
     607           0 :                         r.err.Store(err)
     608           0 :                         r.cancel()
     609           0 :                 },
     610           1 :                 WriteStallBegin: func(info pebble.WriteStallBeginInfo) {
     611           1 :                         r.writeStallMetrics.Lock()
     612           1 :                         defer r.writeStallMetrics.Unlock()
     613           1 :                         writeStallReason = info.Reason
     614           1 :                         // Take just the first word of the reason.
     615           1 :                         if j := strings.IndexByte(writeStallReason, ' '); j != -1 {
     616           1 :                                 writeStallReason = writeStallReason[:j]
     617           1 :                         }
     618           1 :                         switch writeStallReason {
     619           1 :                         case "L0", "memtable":
     620           1 :                                 r.writeStallMetrics.countByReason[writeStallReason]++
     621           0 :                         default:
     622           0 :                                 panic(fmt.Sprintf("unrecognized write stall reason %q", info.Reason))
     623             :                         }
     624           1 :                         writeStallBegin = time.Now()
     625             :                 },
     626           1 :                 WriteStallEnd: func() {
     627           1 :                         r.writeStallMetrics.Lock()
     628           1 :                         defer r.writeStallMetrics.Unlock()
     629           1 :                         r.writeStallMetrics.durationByReason[writeStallReason] += time.Since(writeStallBegin)
     630           1 :                 },
     631           1 :                 CompactionBegin: func(_ pebble.CompactionInfo) {
     632           1 :                         r.compactionMu.Lock()
     633           1 :                         defer r.compactionMu.Unlock()
     634           1 :                         r.compactionMu.started++
     635           1 :                 },
     636           1 :                 CompactionEnd: func(_ pebble.CompactionInfo) {
     637           1 :                         // Keep track of the number of compactions that complete and notify
     638           1 :                         // anyone waiting for a compaction to complete. See the function
     639           1 :                         // nextCompactionCompletes for the corresponding receiver side.
     640           1 :                         r.compactionMu.Lock()
     641           1 :                         defer r.compactionMu.Unlock()
     642           1 :                         r.compactionMu.completed++
     643           1 :                         if r.compactionMu.ch != nil {
     644           1 :                                 // Signal that a compaction has completed.
     645           1 :                                 close(r.compactionMu.ch)
     646           1 :                                 r.compactionMu.ch = nil
     647           1 :                         }
     648             :                 },
     649             :         }
     650           1 :         l.EnsureDefaults(nil)
     651           1 :         return l
     652             : }
     653             : 
     654             : // applyWorkloadSteps runs in its own goroutine, reading workload steps off the
     655             : // r.steps channel and applying them to the test database.
     656           1 : func (r *Runner) applyWorkloadSteps(ctx context.Context) error {
     657           1 :         for {
     658           1 :                 var ok bool
     659           1 :                 var step workloadStep
     660           1 :                 select {
     661           0 :                 case <-ctx.Done():
     662           0 :                         return ctx.Err()
     663           1 :                 case step, ok = <-r.steps:
     664           1 :                         if !ok {
     665           1 :                                 // Exhausted the workload. Exit.
     666           1 :                                 close(r.stepsApplied)
     667           1 :                                 return nil
     668           1 :                         }
     669             :                 }
     670             : 
     671           1 :                 paceDur := r.Pacer.pace(r, step)
     672           1 :                 r.metrics.paceDurationNano.Add(uint64(paceDur))
     673           1 : 
     674           1 :                 switch step.kind {
     675           1 :                 case flushStepKind:
     676           1 :                         if err := step.flushBatch.Commit(&pebble.WriteOptions{Sync: false}); err != nil {
     677           0 :                                 return err
     678           0 :                         }
     679           1 :                         _, err := r.d.AsyncFlush()
     680           1 :                         if err != nil {
     681           0 :                                 return err
     682           0 :                         }
     683           1 :                         r.metrics.writeBytes.Store(step.cumulativeWriteBytes)
     684           1 :                         r.stepsApplied <- step
     685           0 :                 case ingestStepKind:
     686           0 :                         if err := r.d.Ingest(step.tablesToIngest); err != nil {
     687           0 :                                 return err
     688           0 :                         }
     689           0 :                         r.metrics.writeBytes.Store(step.cumulativeWriteBytes)
     690           0 :                         r.stepsApplied <- step
     691           1 :                 case compactionStepKind:
     692             :                         // No-op.
     693             :                         // TODO(jackson): Should we elide this earlier?
     694           0 :                 default:
     695           0 :                         panic("unreachable")
     696             :                 }
     697             :         }
     698             : }
     699             : 
     700             : // prepareWorkloadSteps runs in its own goroutine, reading the workload
     701             : // manifests in order to reconstruct the workload and prepare each step to be
     702             : // applied. It sends each workload step to the r.steps channel.
     703           1 : func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
     704           1 :         defer func() { close(r.steps) }()
     705             : 
     706           1 :         idx := r.workload.manifestIdx
     707           1 : 
     708           1 :         var cumulativeWriteBytes uint64
     709           1 :         var flushBufs flushBuffers
     710           1 :         var v *manifest.Version
     711           1 :         var previousVersion *manifest.Version
     712           1 :         var bve manifest.BulkVersionEdit
     713           1 :         bve.AddedByFileNum = make(map[base.FileNum]*manifest.FileMetadata)
     714           1 :         applyVE := func(ve *manifest.VersionEdit) error {
     715           1 :                 return bve.Accumulate(ve)
     716           1 :         }
     717           1 :         currentVersion := func() (*manifest.Version, error) {
     718           1 :                 var err error
     719           1 :                 v, err = bve.Apply(v,
     720           1 :                         r.Opts.Comparer,
     721           1 :                         r.Opts.FlushSplitBytes,
     722           1 :                         r.Opts.Experimental.ReadCompactionRate)
     723           1 :                 bve = manifest.BulkVersionEdit{AddedByFileNum: bve.AddedByFileNum}
     724           1 :                 return v, err
     725           1 :         }
     726             : 
     727           1 :         for ; idx < len(r.workload.manifests); idx++ {
     728           1 :                 if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes {
     729           0 :                         break
     730             :                 }
     731             : 
     732           1 :                 err := func() error {
     733           1 :                         manifestName := r.workload.manifests[idx]
     734           1 :                         f, err := r.WorkloadFS.Open(r.WorkloadFS.PathJoin(r.WorkloadPath, manifestName))
     735           1 :                         if err != nil {
     736           0 :                                 return err
     737           0 :                         }
     738           1 :                         defer f.Close()
     739           1 : 
     740           1 :                         rr := record.NewReader(f, 0 /* logNum */)
     741           1 :                         // A manifest's first record always holds the initial version state.
     742           1 :                         // If this is the first manifest we're examining, we load it in
     743           1 :                         // order to seed `metas` with the file metadata of the existing
     744           1 :                         // files. Otherwise, we can skip it because we already know all the
     745           1 :                         // file metadatas up to this point.
     746           1 :                         rec, err := rr.Next()
     747           1 :                         if err != nil {
     748           0 :                                 return err
     749           0 :                         }
     750           1 :                         if idx == r.workload.manifestIdx {
     751           1 :                                 var ve manifest.VersionEdit
     752           1 :                                 if err := ve.Decode(rec); err != nil {
     753           0 :                                         return err
     754           0 :                                 }
     755           1 :                                 if err := applyVE(&ve); err != nil {
     756           0 :                                         return err
     757           0 :                                 }
     758             :                         }
     759             : 
     760             :                         // Read the remaining of the manifests version edits, one-by-one.
     761           1 :                         for {
     762           1 :                                 rec, err := rr.Next()
     763           1 :                                 if err == io.EOF || record.IsInvalidRecord(err) {
     764           1 :                                         break
     765           1 :                                 } else if err != nil {
     766           0 :                                         return err
     767           0 :                                 }
     768           1 :                                 var ve manifest.VersionEdit
     769           1 :                                 if err = ve.Decode(rec); err == io.EOF || record.IsInvalidRecord(err) {
     770           0 :                                         break
     771           1 :                                 } else if err != nil {
     772           0 :                                         return err
     773           0 :                                 }
     774           1 :                                 if err := applyVE(&ve); err != nil {
     775           0 :                                         return err
     776           0 :                                 }
     777           1 :                                 if idx == r.workload.manifestIdx && rr.Offset() <= r.workload.manifestOff {
     778           1 :                                         // The record rec began at an offset strictly less than
     779           1 :                                         // rr.Offset(), which means it's strictly less than
     780           1 :                                         // r.workload.manifestOff, and we should skip it.
     781           1 :                                         continue
     782             :                                 }
     783           1 :                                 if len(ve.NewFiles) == 0 && len(ve.DeletedFiles) == 0 {
     784           1 :                                         // Skip WAL rotations and other events that don't affect the
     785           1 :                                         // files of the LSM.
     786           1 :                                         continue
     787             :                                 }
     788             : 
     789           1 :                                 s := workloadStep{ve: ve}
     790           1 :                                 if len(ve.DeletedFiles) > 0 {
     791           1 :                                         // If a version edit deletes files, we assume it's a compaction.
     792           1 :                                         s.kind = compactionStepKind
     793           1 :                                 } else {
     794           1 :                                         // Default to ingest. If any files have unequal
     795           1 :                                         // smallest,largest sequence numbers, we'll update this to a
     796           1 :                                         // flush.
     797           1 :                                         s.kind = ingestStepKind
     798           1 :                                 }
     799           1 :                                 var newFiles []base.DiskFileNum
     800           1 :                                 for _, nf := range ve.NewFiles {
     801           1 :                                         newFiles = append(newFiles, nf.Meta.FileBacking.DiskFileNum)
     802           1 :                                         if s.kind == ingestStepKind && (nf.Meta.SmallestSeqNum != nf.Meta.LargestSeqNum || nf.Level != 0) {
     803           1 :                                                 s.kind = flushStepKind
     804           1 :                                         }
     805             :                                 }
     806             :                                 // Add the current reference *Version to the step. This provides
     807             :                                 // access to, for example, the read-amplification of the
     808             :                                 // database at this point when the workload was collected. This
     809             :                                 // can be useful for pacing.
     810           1 :                                 if s.v, err = currentVersion(); err != nil {
     811           0 :                                         return err
     812           0 :                                 }
     813             :                                 // On the first time through, we set the previous version to the current
     814             :                                 // version otherwise we set it to the actual previous version.
     815           1 :                                 if previousVersion == nil {
     816           1 :                                         previousVersion = s.v
     817           1 :                                 }
     818           1 :                                 s.pv = previousVersion
     819           1 :                                 previousVersion = s.v
     820           1 : 
     821           1 :                                 // It's possible that the workload collector captured this
     822           1 :                                 // version edit, but wasn't able to collect all of the
     823           1 :                                 // corresponding sstables before being terminated.
     824           1 :                                 if s.kind == flushStepKind || s.kind == ingestStepKind {
     825           1 :                                         for _, fileNum := range newFiles {
     826           1 :                                                 if _, ok := r.workload.sstables[base.PhysicalTableFileNum(fileNum)]; !ok {
     827           0 :                                                         // TODO(jackson,leon): This isn't exactly an error
     828           0 :                                                         // condition. Give this more thought; do we want to
     829           0 :                                                         // require graceful exiting of workload collection,
     830           0 :                                                         // such that the last version edit must have had its
     831           0 :                                                         // corresponding sstables collected?
     832           0 :                                                         return errors.Newf("sstable %s not found", fileNum)
     833           0 :                                                 }
     834             :                                         }
     835             :                                 }
     836             : 
     837           1 :                                 switch s.kind {
     838           1 :                                 case flushStepKind:
     839           1 :                                         // Load all of the flushed sstables' keys into a batch.
     840           1 :                                         s.flushBatch = r.d.NewBatch()
     841           1 :                                         if err := loadFlushedSSTableKeys(s.flushBatch, r.WorkloadFS, r.WorkloadPath, newFiles, r.readerOpts, &flushBufs); err != nil {
     842           0 :                                                 return errors.Wrapf(err, "flush in %q at offset %d", manifestName, rr.Offset())
     843           0 :                                         }
     844           1 :                                         cumulativeWriteBytes += uint64(s.flushBatch.Len())
     845           0 :                                 case ingestStepKind:
     846           0 :                                         // Copy the ingested sstables into a staging area within the
     847           0 :                                         // run dir. This is necessary for two reasons:
     848           0 :                                         //  a) Ingest will remove the source file, and we don't want
     849           0 :                                         //     to mutate the workload.
     850           0 :                                         //  b) If the workload stored on another volume, Ingest
     851           0 :                                         //     would need to fall back to copying the file since
     852           0 :                                         //     it's not possible to link across volumes. The true
     853           0 :                                         //     workload likely linked the file. Staging the file
     854           0 :                                         //     ahead of time ensures that we're able to Link the
     855           0 :                                         //     file like the original workload did.
     856           0 :                                         for _, fileNum := range newFiles {
     857           0 :                                                 src := base.MakeFilepath(r.WorkloadFS, r.WorkloadPath, base.FileTypeTable, fileNum)
     858           0 :                                                 dst := base.MakeFilepath(r.Opts.FS, r.stagingDir, base.FileTypeTable, fileNum)
     859           0 :                                                 if err := vfs.CopyAcrossFS(r.WorkloadFS, src, r.Opts.FS, dst); err != nil {
     860           0 :                                                         return errors.Wrapf(err, "ingest in %q at offset %d", manifestName, rr.Offset())
     861           0 :                                                 }
     862           0 :                                                 finfo, err := r.Opts.FS.Stat(dst)
     863           0 :                                                 if err != nil {
     864           0 :                                                         return errors.Wrapf(err, "stating %q", dst)
     865           0 :                                                 }
     866           0 :                                                 cumulativeWriteBytes += uint64(finfo.Size())
     867           0 :                                                 s.tablesToIngest = append(s.tablesToIngest, dst)
     868             :                                         }
     869           1 :                                 case compactionStepKind:
     870             :                                         // Nothing to do.
     871             :                                 }
     872           1 :                                 s.cumulativeWriteBytes = cumulativeWriteBytes
     873           1 : 
     874           1 :                                 select {
     875           0 :                                 case <-ctx.Done():
     876           0 :                                         return ctx.Err()
     877           1 :                                 case r.steps <- s:
     878             :                                 }
     879             : 
     880           1 :                                 if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes {
     881           0 :                                         break
     882             :                                 }
     883             :                         }
     884           1 :                         return nil
     885             :                 }()
     886           1 :                 if err != nil {
     887           0 :                         return err
     888           0 :                 }
     889             :         }
     890           1 :         return nil
     891             : }
     892             : 
     893             : // findWorkloadFiles finds all manifests and tables in the provided path on fs.
     894             : func findWorkloadFiles(
     895             :         path string, fs vfs.FS,
     896           1 : ) (manifests []string, sstables map[base.FileNum]struct{}, err error) {
     897           1 :         dirents, err := fs.List(path)
     898           1 :         if err != nil {
     899           0 :                 return nil, nil, err
     900           0 :         }
     901           1 :         sstables = make(map[base.FileNum]struct{})
     902           1 :         for _, dirent := range dirents {
     903           1 :                 typ, fileNum, ok := base.ParseFilename(fs, dirent)
     904           1 :                 if !ok {
     905           1 :                         continue
     906             :                 }
     907           1 :                 switch typ {
     908           1 :                 case base.FileTypeManifest:
     909           1 :                         manifests = append(manifests, dirent)
     910           1 :                 case base.FileTypeTable:
     911           1 :                         sstables[base.PhysicalTableFileNum(fileNum)] = struct{}{}
     912             :                 }
     913             :         }
     914           1 :         if len(manifests) == 0 {
     915           1 :                 return nil, nil, errors.Newf("no manifests found")
     916           1 :         }
     917           1 :         sort.Strings(manifests)
     918           1 :         return manifests, sstables, err
     919             : }
     920             : 
     921             : // findManifestStart takes a database directory and FS containing the initial
     922             : // database state that a workload will be run against, and a list of a workloads
     923             : // manifests. It examines the database's current manifest to determine where
     924             : // workload replay should begin, so as to not duplicate already-applied version
     925             : // edits.
     926             : //
     927             : // It returns the index of the starting manifest, and the database's current
     928             : // offset within the manifest.
     929             : func findManifestStart(
     930             :         dbDir string, dbFS vfs.FS, manifests []string,
     931           1 : ) (index int, offset int64, err error) {
     932           1 :         // Identify the database's current manifest.
     933           1 :         dbDesc, err := pebble.Peek(dbDir, dbFS)
     934           1 :         if err != nil {
     935           0 :                 return 0, 0, err
     936           0 :         }
     937           1 :         dbManifest := dbFS.PathBase(dbDesc.ManifestFilename)
     938           1 :         // If there is no initial database state, begin workload replay from the
     939           1 :         // beginning of the first manifest.
     940           1 :         if !dbDesc.Exists {
     941           1 :                 return 0, 0, nil
     942           1 :         }
     943           1 :         for index = 0; index < len(manifests); index++ {
     944           1 :                 if manifests[index] == dbManifest {
     945           1 :                         break
     946             :                 }
     947             :         }
     948           1 :         if index == len(manifests) {
     949           1 :                 // The initial database state has a manifest that does not appear within
     950           1 :                 // the workload's set of manifests. This is possible if we began
     951           1 :                 // recording the workload at the same time as a manifest rotation, but
     952           1 :                 // more likely we're applying a workload to a different initial database
     953           1 :                 // state than the one from which the workload was collected. Either way,
     954           1 :                 // start from the beginning of the first manifest.
     955           1 :                 return 0, 0, nil
     956           1 :         }
     957             :         // Find the initial database's offset within the manifest.
     958           1 :         info, err := dbFS.Stat(dbFS.PathJoin(dbDir, dbManifest))
     959           1 :         if err != nil {
     960           0 :                 return 0, 0, err
     961           0 :         }
     962           1 :         return index, info.Size(), nil
     963             : }
     964             : 
     965             : // loadFlushedSSTableKeys copies keys from the sstables specified by `fileNums`
     966             : // in the directory specified by `path` into the provided the batch. Keys are
     967             : // applied to the batch in the order dictated by their sequence numbers within
     968             : // the sstables, ensuring the relative relationship between sequence numbers is
     969             : // maintained.
     970             : //
     971             : // Preserving the relative relationship between sequence numbers is not strictly
     972             : // necessary, but it ensures we accurately exercise some microoptimizations (eg,
     973             : // detecting user key changes by descending trailer). There may be additional
     974             : // dependencies on sequence numbers in the future.
     975             : func loadFlushedSSTableKeys(
     976             :         b *pebble.Batch,
     977             :         fs vfs.FS,
     978             :         path string,
     979             :         fileNums []base.DiskFileNum,
     980             :         readOpts sstable.ReaderOptions,
     981             :         bufs *flushBuffers,
     982           1 : ) error {
     983           1 :         // Load all the keys across all the sstables.
     984           1 :         for _, fileNum := range fileNums {
     985           1 :                 if err := func() error {
     986           1 :                         filePath := base.MakeFilepath(fs, path, base.FileTypeTable, fileNum)
     987           1 :                         f, err := fs.Open(filePath)
     988           1 :                         if err != nil {
     989           0 :                                 return err
     990           0 :                         }
     991           1 :                         readable, err := sstable.NewSimpleReadable(f)
     992           1 :                         if err != nil {
     993           0 :                                 f.Close()
     994           0 :                                 return err
     995           0 :                         }
     996           1 :                         r, err := sstable.NewReader(readable, readOpts)
     997           1 :                         if err != nil {
     998           0 :                                 return err
     999           0 :                         }
    1000           1 :                         defer r.Close()
    1001           1 : 
    1002           1 :                         // Load all the point keys.
    1003           1 :                         iter, err := r.NewIter(sstable.NoTransforms, nil, nil)
    1004           1 :                         if err != nil {
    1005           0 :                                 return err
    1006           0 :                         }
    1007           1 :                         defer iter.Close()
    1008           1 :                         for kv := iter.First(); kv != nil; kv = iter.Next() {
    1009           1 :                                 var key flushedKey
    1010           1 :                                 key.Trailer = kv.K.Trailer
    1011           1 :                                 bufs.alloc, key.UserKey = bufs.alloc.Copy(kv.K.UserKey)
    1012           1 :                                 if v, callerOwned, err := kv.Value(nil); err != nil {
    1013           0 :                                         return err
    1014           1 :                                 } else if callerOwned {
    1015           0 :                                         key.value = v
    1016           1 :                                 } else {
    1017           1 :                                         bufs.alloc, key.value = bufs.alloc.Copy(v)
    1018           1 :                                 }
    1019           1 :                                 bufs.keys = append(bufs.keys, key)
    1020             :                         }
    1021             : 
    1022             :                         // Load all the range tombstones.
    1023           1 :                         if iter, err := r.NewRawRangeDelIter(sstable.NoTransforms); err != nil {
    1024           0 :                                 return err
    1025           1 :                         } else if iter != nil {
    1026           1 :                                 defer iter.Close()
    1027           1 :                                 s, err := iter.First()
    1028           1 :                                 for ; s != nil; s, err = iter.Next() {
    1029           1 :                                         if err := rangedel.Encode(s, func(k base.InternalKey, v []byte) error {
    1030           1 :                                                 var key flushedKey
    1031           1 :                                                 key.Trailer = k.Trailer
    1032           1 :                                                 bufs.alloc, key.UserKey = bufs.alloc.Copy(k.UserKey)
    1033           1 :                                                 bufs.alloc, key.value = bufs.alloc.Copy(v)
    1034           1 :                                                 bufs.keys = append(bufs.keys, key)
    1035           1 :                                                 return nil
    1036           1 :                                         }); err != nil {
    1037           0 :                                                 return err
    1038           0 :                                         }
    1039             :                                 }
    1040           1 :                                 if err != nil {
    1041           0 :                                         return err
    1042           0 :                                 }
    1043             :                         }
    1044             : 
    1045             :                         // Load all the range keys.
    1046           1 :                         if iter, err := r.NewRawRangeKeyIter(sstable.NoTransforms); err != nil {
    1047           0 :                                 return err
    1048           1 :                         } else if iter != nil {
    1049           1 :                                 defer iter.Close()
    1050           1 :                                 s, err := iter.First()
    1051           1 :                                 for ; s != nil; s, err = iter.Next() {
    1052           1 :                                         if err := rangekey.Encode(s, func(k base.InternalKey, v []byte) error {
    1053           1 :                                                 var key flushedKey
    1054           1 :                                                 key.Trailer = k.Trailer
    1055           1 :                                                 bufs.alloc, key.UserKey = bufs.alloc.Copy(k.UserKey)
    1056           1 :                                                 bufs.alloc, key.value = bufs.alloc.Copy(v)
    1057           1 :                                                 bufs.keys = append(bufs.keys, key)
    1058           1 :                                                 return nil
    1059           1 :                                         }); err != nil {
    1060           0 :                                                 return err
    1061           0 :                                         }
    1062             :                                 }
    1063           1 :                                 if err != nil {
    1064           0 :                                         return err
    1065           0 :                                 }
    1066             :                         }
    1067           1 :                         return nil
    1068           0 :                 }(); err != nil {
    1069           0 :                         return err
    1070           0 :                 }
    1071             :         }
    1072             : 
    1073             :         // Sort the flushed keys by their sequence numbers so that we can apply them
    1074             :         // to the batch in the same order, maintaining the relative relationship
    1075             :         // between keys.
    1076             :         // NB: We use a stable sort so that keys corresponding to span fragments
    1077             :         // (eg, range tombstones and range keys) have a deterministic ordering for
    1078             :         // testing.
    1079           1 :         sort.Stable(bufs.keys)
    1080           1 : 
    1081           1 :         // Add the keys to the batch in the order they were committed when the
    1082           1 :         // workload was captured.
    1083           1 :         for i := 0; i < len(bufs.keys); i++ {
    1084           1 :                 var err error
    1085           1 :                 switch bufs.keys[i].Kind() {
    1086           1 :                 case base.InternalKeyKindDelete:
    1087           1 :                         err = b.Delete(bufs.keys[i].UserKey, nil)
    1088           0 :                 case base.InternalKeyKindDeleteSized:
    1089           0 :                         v, _ := binary.Uvarint(bufs.keys[i].value)
    1090           0 :                         // Batch.DeleteSized takes just the length of the value being
    1091           0 :                         // deleted and adds the key's length to derive the overall entry
    1092           0 :                         // size of the value being deleted. This has already been done to
    1093           0 :                         // the key we're reading from the sstable, so we must subtract the
    1094           0 :                         // key length from the encoded value before calling b.DeleteSized,
    1095           0 :                         // which will again add the key length before encoding.
    1096           0 :                         err = b.DeleteSized(bufs.keys[i].UserKey, uint32(v-uint64(len(bufs.keys[i].UserKey))), nil)
    1097           1 :                 case base.InternalKeyKindSet, base.InternalKeyKindSetWithDelete:
    1098           1 :                         err = b.Set(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
    1099           0 :                 case base.InternalKeyKindMerge:
    1100           0 :                         err = b.Merge(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
    1101           1 :                 case base.InternalKeyKindSingleDelete:
    1102           1 :                         err = b.SingleDelete(bufs.keys[i].UserKey, nil)
    1103           1 :                 case base.InternalKeyKindRangeDelete:
    1104           1 :                         err = b.DeleteRange(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
    1105           1 :                 case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
    1106           1 :                         s, err := rangekey.Decode(bufs.keys[i].InternalKey, bufs.keys[i].value, nil)
    1107           1 :                         if err != nil {
    1108           0 :                                 return err
    1109           0 :                         }
    1110           1 :                         if len(s.Keys) != 1 {
    1111           0 :                                 return errors.Newf("range key span unexpectedly contains %d keys", len(s.Keys))
    1112           0 :                         }
    1113           1 :                         switch bufs.keys[i].Kind() {
    1114           1 :                         case base.InternalKeyKindRangeKeySet:
    1115           1 :                                 err = b.RangeKeySet(s.Start, s.End, s.Keys[0].Suffix, s.Keys[0].Value, nil)
    1116           1 :                         case base.InternalKeyKindRangeKeyUnset:
    1117           1 :                                 err = b.RangeKeyUnset(s.Start, s.End, s.Keys[0].Suffix, nil)
    1118           1 :                         case base.InternalKeyKindRangeKeyDelete:
    1119           1 :                                 err = b.RangeKeyDelete(s.Start, s.End, nil)
    1120           0 :                         default:
    1121           0 :                                 err = errors.Newf("unexpected key kind %q", bufs.keys[i].Kind())
    1122             :                         }
    1123           1 :                         if err != nil {
    1124           0 :                                 return err
    1125           0 :                         }
    1126           0 :                 default:
    1127           0 :                         err = errors.Newf("unexpected key kind %q", bufs.keys[i].Kind())
    1128             :                 }
    1129           1 :                 if err != nil {
    1130           0 :                         return err
    1131           0 :                 }
    1132             :         }
    1133             : 
    1134             :         // Done with the flushBuffers. Reset.
    1135           1 :         bufs.keys = bufs.keys[:0]
    1136           1 :         return nil
    1137             : }
    1138             : 
    1139             : type flushBuffers struct {
    1140             :         keys  flushedKeysByTrailer
    1141             :         alloc bytealloc.A
    1142             : }
    1143             : 
    1144             : type flushedKeysByTrailer []flushedKey
    1145             : 
    1146           1 : func (s flushedKeysByTrailer) Len() int           { return len(s) }
    1147           1 : func (s flushedKeysByTrailer) Less(i, j int) bool { return s[i].Trailer < s[j].Trailer }
    1148           1 : func (s flushedKeysByTrailer) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
    1149             : 
    1150             : type flushedKey struct {
    1151             :         base.InternalKey
    1152             :         value []byte
    1153             : }

Generated by: LCOV version 1.14