Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package replay implements collection and replaying of compaction benchmarking
6 : // workloads. A workload is a collection of flushed and ingested sstables, along
7 : // with the corresponding manifests describing the order and grouping with which
8 : // they were applied. Replaying a workload flushes and ingests the same keys and
9 : // sstables to reproduce the write workload for the purpose of evaluating
10 : // compaction heuristics.
11 : package replay
12 :
13 : import (
14 : "context"
15 : "encoding/binary"
16 : "fmt"
17 : "io"
18 : "os"
19 : "sort"
20 : "strings"
21 : "sync"
22 : "sync/atomic"
23 : "time"
24 :
25 : "github.com/cockroachdb/errors"
26 : "github.com/cockroachdb/pebble"
27 : "github.com/cockroachdb/pebble/internal/base"
28 : "github.com/cockroachdb/pebble/internal/bytealloc"
29 : "github.com/cockroachdb/pebble/internal/manifest"
30 : "github.com/cockroachdb/pebble/internal/rangedel"
31 : "github.com/cockroachdb/pebble/internal/rangekey"
32 : "github.com/cockroachdb/pebble/record"
33 : "github.com/cockroachdb/pebble/sstable"
34 : "github.com/cockroachdb/pebble/vfs"
35 : "golang.org/x/perf/benchfmt"
36 : "golang.org/x/sync/errgroup"
37 : )
38 :
39 : // A Pacer paces replay of a workload, determining when to apply the next
40 : // incoming write.
41 : type Pacer interface {
42 : pace(r *Runner, step workloadStep) time.Duration
43 : }
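// paceByFixedDelay below is a hypothetical, unused illustration of the Pacer
// contract (a sketch for exposition, not part of the replay tool): pace blocks
// until the step may be applied and returns the duration spent waiting.
type paceByFixedDelay time.Duration

func (d paceByFixedDelay) pace(*Runner, workloadStep) time.Duration {
	time.Sleep(time.Duration(d))
	return time.Duration(d)
}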
44 :
45 : // computeReadAmp calculates the read amplification from a manifest.Version
46 0 : func computeReadAmp(v *manifest.Version) int {
47 0 : refRAmp := v.L0Sublevels.ReadAmplification()
48 0 : for _, lvl := range v.Levels[1:] {
49 0 : if !lvl.Empty() {
50 0 : refRAmp++
51 0 : }
52 : }
53 0 : return refRAmp
54 : }
55 :
56 : // waitForReadAmpLE is a helper used by PaceByReferenceReadAmp and
57 : // PaceByFixedReadAmp to wait on the dbMetricsCond condition variable while the
58 : // observed read amplification is greater than the specified target (rAmp).
59 1 : func waitForReadAmpLE(r *Runner, rAmp int) {
60 1 : r.dbMetricsCond.L.Lock()
61 1 : m := r.dbMetrics
62 1 : ra := m.ReadAmp()
63 1 : for ra > rAmp {
64 1 : r.dbMetricsCond.Wait()
65 1 : ra = r.dbMetrics.ReadAmp()
66 1 : }
67 1 : r.dbMetricsCond.L.Unlock()
68 : }
69 :
70 : // Unpaced implements Pacer by applying each new write as soon as possible. It
71 : // may be useful for examining performance under high read amplification.
72 : type Unpaced struct{}
73 :
74 1 : func (Unpaced) pace(*Runner, workloadStep) (d time.Duration) { return }
75 :
76 : // PaceByReferenceReadAmp implements Pacer by applying each new write following
77 : // the collected workload's read amplification.
78 : type PaceByReferenceReadAmp struct{}
79 :
80 0 : func (PaceByReferenceReadAmp) pace(r *Runner, w workloadStep) time.Duration {
81 0 : startTime := time.Now()
82 0 : refRAmp := computeReadAmp(w.pv)
83 0 : waitForReadAmpLE(r, refRAmp)
84 0 : return time.Since(startTime)
85 0 : }
86 :
87 : // PaceByFixedReadAmp implements Pacer by applying each new write only once the
88 : // database's read amplification is at or below a fixed target.
89 : type PaceByFixedReadAmp int
90 :
91 1 : func (pra PaceByFixedReadAmp) pace(r *Runner, _ workloadStep) time.Duration {
92 1 : startTime := time.Now()
93 1 : waitForReadAmpLE(r, int(pra))
94 1 : return time.Since(startTime)
95 1 : }
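// For example, a caller that wants writes admitted only while the LSM's read
// amplification is at or below 10 would configure the Runner (illustrative):
//
//	Pacer: PaceByFixedReadAmp(10)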
96 :
97 : // Metrics holds the various statistics on a replay run and its performance.
98 : type Metrics struct {
99 : CompactionCounts struct {
100 : Total int64
101 : Default int64
102 : DeleteOnly int64
103 : ElisionOnly int64
104 : Move int64
105 : Read int64
106 : TombstoneDensity int64
107 : Rewrite int64
108 : Copy int64
109 : MultiLevel int64
110 : }
111 : EstimatedDebt SampledMetric
112 : Final *pebble.Metrics
113 : Ingest struct {
114 : BytesIntoL0 uint64
115 : // BytesWeightedByLevel is calculated as the number of bytes ingested
116 : // into a level multiplied by the level's distance from the bottommost
117 : // level (L6), summed across all levels. It can be used to gauge how
118 : // effective heuristics are at ingesting files into lower levels, saving
119 : // write amplification.
120 : BytesWeightedByLevel uint64
121 : }
122 : // PaceDuration is the cumulative time spent waiting for the pacer to allow
123 : // the workload to continue.
124 : PaceDuration time.Duration
125 : ReadAmp SampledMetric
126 : // QuiesceDuration is the time between completing application of the workload and
127 : // compactions quiescing.
128 : QuiesceDuration time.Duration
129 : TombstoneCount SampledMetric
130 : // TotalSize holds the total size of the database, sampled after each
131 : // workload step.
132 : TotalSize SampledMetric
133 : TotalWriteAmp float64
134 : WorkloadDuration time.Duration
135 : WriteBytes uint64
136 : WriteStalls map[string]int
137 : WriteStallsDuration map[string]time.Duration
138 : WriteThroughput SampledMetric
139 : }
140 :
141 : // Plot holds an ASCII plot and its name.
142 : type Plot struct {
143 : Name string
144 : Plot string
145 : }
146 :
147 : // Plots returns a slice of ASCII plots describing how metrics change over time.
148 0 : func (m *Metrics) Plots(width, height int) []Plot {
149 0 : const scaleMB = 1.0 / float64(1<<20)
150 0 : return []Plot{
151 0 : {Name: "Write throughput (MB/s)", Plot: m.WriteThroughput.PlotIncreasingPerSec(width, height, scaleMB)},
152 0 : {Name: "Estimated compaction debt (MB)", Plot: m.EstimatedDebt.Plot(width, height, scaleMB)},
153 0 : {Name: "Total database size (MB)", Plot: m.TotalSize.Plot(width, height, scaleMB)},
154 0 : {Name: "ReadAmp", Plot: m.ReadAmp.Plot(width, height, 1.0)},
155 0 : }
156 0 : }
157 :
158 : // WriteBenchmarkString writes the metrics in the form of a series of
159 : // 'Benchmark' lines understandable by benchstat.
160 1 : func (m *Metrics) WriteBenchmarkString(name string, w io.Writer) error {
161 1 : type benchmarkSection struct {
162 1 : label string
163 1 : values []benchfmt.Value
164 1 : }
165 1 : groups := []benchmarkSection{
166 1 : {label: "CompactionCounts", values: []benchfmt.Value{
167 1 : {Value: float64(m.CompactionCounts.Total), Unit: "compactions"},
168 1 : {Value: float64(m.CompactionCounts.Default), Unit: "default"},
169 1 : {Value: float64(m.CompactionCounts.DeleteOnly), Unit: "delete"},
170 1 : {Value: float64(m.CompactionCounts.ElisionOnly), Unit: "elision"},
171 1 : {Value: float64(m.CompactionCounts.Move), Unit: "move"},
172 1 : {Value: float64(m.CompactionCounts.Read), Unit: "read"},
173 1 : {Value: float64(m.CompactionCounts.Rewrite), Unit: "rewrite"},
174 1 : {Value: float64(m.CompactionCounts.Copy), Unit: "copy"},
175 1 : {Value: float64(m.CompactionCounts.MultiLevel), Unit: "multilevel"},
176 1 : }},
177 1 : // Total database sizes sampled after every workload step and
178 1 : // compaction. This can be used to evaluate the relative LSM space
179 1 : // amplification between runs of the same workload. Calculating the true
180 1 : // space amplification continuously is prohibitively expensive (it
181 1 : // requires totally compacting a copy of the LSM).
182 1 : {label: "DatabaseSize/mean", values: []benchfmt.Value{
183 1 : {Value: m.TotalSize.Mean(), Unit: "bytes"},
184 1 : }},
185 1 : {label: "DatabaseSize/max", values: []benchfmt.Value{
186 1 : {Value: float64(m.TotalSize.Max()), Unit: "bytes"},
187 1 : }},
188 1 : // Time applying the workload and time waiting for compactions to
189 1 : // quiesce after the workload has completed.
190 1 : {label: "DurationWorkload", values: []benchfmt.Value{
191 1 : {Value: m.WorkloadDuration.Seconds(), Unit: "sec/op"},
192 1 : }},
193 1 : {label: "DurationQuiescing", values: []benchfmt.Value{
194 1 : {Value: m.QuiesceDuration.Seconds(), Unit: "sec/op"},
195 1 : }},
196 1 : {label: "DurationPaceDelay", values: []benchfmt.Value{
197 1 : {Value: m.PaceDuration.Seconds(), Unit: "sec/op"},
198 1 : }},
199 1 : // Estimated compaction debt, sampled after every workload step and
200 1 : // compaction.
201 1 : {label: "EstimatedDebt/mean", values: []benchfmt.Value{
202 1 : {Value: m.EstimatedDebt.Mean(), Unit: "bytes"},
203 1 : }},
204 1 : {label: "EstimatedDebt/max", values: []benchfmt.Value{
205 1 : {Value: float64(m.EstimatedDebt.Max()), Unit: "bytes"},
206 1 : }},
207 1 : {label: "FlushUtilization", values: []benchfmt.Value{
208 1 : {Value: m.Final.Flush.WriteThroughput.Utilization(), Unit: "util"},
209 1 : }},
210 1 : {label: "IngestedIntoL0", values: []benchfmt.Value{
211 1 : {Value: float64(m.Ingest.BytesIntoL0), Unit: "bytes"},
212 1 : }},
213 1 : {label: "IngestWeightedByLevel", values: []benchfmt.Value{
214 1 : {Value: float64(m.Ingest.BytesWeightedByLevel), Unit: "bytes"},
215 1 : }},
216 1 : {label: "ReadAmp/mean", values: []benchfmt.Value{
217 1 : {Value: m.ReadAmp.Mean(), Unit: "files"},
218 1 : }},
219 1 : {label: "ReadAmp/max", values: []benchfmt.Value{
220 1 : {Value: float64(m.ReadAmp.Max()), Unit: "files"},
221 1 : }},
222 1 : {label: "TombstoneCount/mean", values: []benchfmt.Value{
223 1 : {Value: m.TombstoneCount.Mean(), Unit: "tombstones"},
224 1 : }},
225 1 : {label: "TombstoneCount/max", values: []benchfmt.Value{
226 1 : {Value: float64(m.TombstoneCount.Max()), Unit: "tombstones"},
227 1 : }},
228 1 : {label: "Throughput", values: []benchfmt.Value{
229 1 : {Value: float64(m.WriteBytes) / (m.WorkloadDuration + m.QuiesceDuration).Seconds(), Unit: "B/s"},
230 1 : }},
231 1 : {label: "WriteAmp", values: []benchfmt.Value{
232 1 : {Value: float64(m.TotalWriteAmp), Unit: "wamp"},
233 1 : }},
234 1 : }
235 1 :
236 1 : for _, reason := range []string{"L0", "memtable"} {
237 1 : groups = append(groups, benchmarkSection{
238 1 : label: fmt.Sprintf("WriteStall/%s", reason),
239 1 : values: []benchfmt.Value{
240 1 : {Value: float64(m.WriteStalls[reason]), Unit: "stalls"},
241 1 : {Value: float64(m.WriteStallsDuration[reason].Seconds()), Unit: "stallsec/op"},
242 1 : },
243 1 : })
244 1 : }
245 :
246 1 : bw := benchfmt.NewWriter(w)
247 1 : for _, grp := range groups {
248 1 : err := bw.Write(&benchfmt.Result{
249 1 : Name: benchfmt.Name(fmt.Sprintf("BenchmarkReplay/%s/%s", name, grp.label)),
250 1 : Iters: 1,
251 1 : Values: grp.values,
252 1 : })
253 1 : if err != nil {
254 0 : return err
255 0 : }
256 : }
257 1 : return nil
258 : }
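// For reference, each group above becomes one Go-benchmark-format line that
// benchstat can aggregate. For a run named "example", the WriteAmp line looks
// approximately like (value illustrative):
//
//	BenchmarkReplay/example/WriteAmp 1 4.2 wamp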
259 :
260 : // Runner runs a captured workload against a test database, collecting
261 : // metrics on performance.
262 : type Runner struct {
263 : RunDir string
264 : WorkloadFS vfs.FS
265 : WorkloadPath string
266 : Pacer Pacer
267 : Opts *pebble.Options
268 : MaxWriteBytes uint64
269 :
270 : // Internal state.
271 :
272 : d *pebble.DB
273 : // dbMetrics and dbMetricsCond work in unison to update the metrics and
274 : // notify (broadcast to) any waiting clients that metrics have been updated.
275 : dbMetrics *pebble.Metrics
276 : dbMetricsCond sync.Cond
277 : cancel func()
278 : err atomic.Value
279 : errgroup *errgroup.Group
280 : readerOpts sstable.ReaderOptions
281 : stagingDir string
282 : steps chan workloadStep
283 : stepsApplied chan workloadStep
284 :
285 : metrics struct {
286 : estimatedDebt SampledMetric
287 : quiesceDuration time.Duration
288 : readAmp SampledMetric
289 : tombstoneCount SampledMetric
290 : totalSize SampledMetric
291 : paceDurationNano atomic.Uint64
292 : workloadDuration time.Duration
293 : writeBytes atomic.Uint64
294 : writeThroughput SampledMetric
295 : }
296 : writeStallMetrics struct {
297 : sync.Mutex
298 : countByReason map[string]int
299 : durationByReason map[string]time.Duration
300 : }
301 : // compactionMu holds state for tracking the number of compactions
302 : // started and completed and waking waiting goroutines when a new compaction
303 : // completes. See nextCompactionCompletes.
304 : compactionMu struct {
305 : sync.Mutex
306 : ch chan struct{}
307 : started int64
308 : completed int64
309 : }
310 : workload struct {
311 : manifests []string
312 : // manifest{Idx,Off} record the starting position of the workload
313 : // relative to the initial database state.
314 : manifestIdx int
315 : manifestOff int64
316 : // sstables records the set of captured workload sstables by file num.
317 : sstables map[base.FileNum]struct{}
318 : }
319 : }
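// runReplayExample is a hypothetical, illustrative sketch (not used elsewhere
// in this package): it shows how a caller might wire up a Runner, replay a
// workload paced at a fixed read amplification, and emit benchstat-compatible
// results. The paths, run name, and read-amplification target are assumptions
// for illustration only.
func runReplayExample(ctx context.Context, opts *pebble.Options) (Metrics, error) {
	r := &Runner{
		RunDir:       "/tmp/replay-run", // hypothetical run directory
		WorkloadFS:   vfs.Default,
		WorkloadPath: "/tmp/workload", // hypothetical captured workload
		Pacer:        PaceByFixedReadAmp(10),
		Opts:         opts,
	}
	// Run launches the replay goroutines and returns immediately.
	if err := r.Run(ctx); err != nil {
		return Metrics{}, err
	}
	defer r.Close()
	// Wait blocks until the workload is exhausted and compactions quiesce.
	m, err := r.Wait()
	if err != nil {
		return Metrics{}, err
	}
	// Write 'Benchmark...' lines that benchstat can consume.
	return m, m.WriteBenchmarkString("example", os.Stdout)
}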
320 :
321 : // Run begins executing the workload in the background and returns.
322 : //
323 : // The workload application will respect the provided context's cancellation.
324 1 : func (r *Runner) Run(ctx context.Context) error {
325 1 : // Find the workload start relative to the RunDir's existing database state.
326 1 : // A prefix of the workload's manifest edits are expected to have already
327 1 : // been applied to the checkpointed existing database state.
328 1 : var err error
329 1 : r.workload.manifests, r.workload.sstables, err = findWorkloadFiles(r.WorkloadPath, r.WorkloadFS)
330 1 : if err != nil {
331 0 : return err
332 0 : }
333 1 : r.workload.manifestIdx, r.workload.manifestOff, err = findManifestStart(r.RunDir, r.Opts.FS, r.workload.manifests)
334 1 : if err != nil {
335 0 : return err
336 0 : }
337 :
338 : // Set up a staging dir for files that will be ingested.
339 1 : r.stagingDir = r.Opts.FS.PathJoin(r.RunDir, "staging")
340 1 : if err := r.Opts.FS.MkdirAll(r.stagingDir, os.ModePerm); err != nil {
341 0 : return err
342 0 : }
343 :
344 1 : r.dbMetricsCond = sync.Cond{
345 1 : L: &sync.Mutex{},
346 1 : }
347 1 :
348 1 : // Extend the user-provided Options with extensions necessary for replay
349 1 : // mechanics.
350 1 : r.compactionMu.ch = make(chan struct{})
351 1 : r.Opts.AddEventListener(r.eventListener())
352 1 : r.writeStallMetrics.countByReason = make(map[string]int)
353 1 : r.writeStallMetrics.durationByReason = make(map[string]time.Duration)
354 1 : r.Opts.EnsureDefaults()
355 1 : r.readerOpts = r.Opts.MakeReaderOptions()
356 1 : r.Opts.DisableWAL = true
357 1 : r.d, err = pebble.Open(r.RunDir, r.Opts)
358 1 : if err != nil {
359 0 : return err
360 0 : }
361 :
362 1 : r.dbMetrics = r.d.Metrics()
363 1 :
364 1 : // Use a buffered channel to allow prepareWorkloadSteps to read ahead,
365 1 : // buffering up to cap(r.steps) steps ahead of the current applied state.
366 1 : // Flushes need to be buffered and ingested sstables need to be copied, so
367 1 : // pipelining this preparation makes it more likely the step will be ready
368 1 : // to apply when the pacer decides to apply it.
369 1 : r.steps = make(chan workloadStep, 5)
370 1 : r.stepsApplied = make(chan workloadStep, 5)
371 1 :
372 1 : ctx, r.cancel = context.WithCancel(ctx)
373 1 : r.errgroup, ctx = errgroup.WithContext(ctx)
374 1 : r.errgroup.Go(func() error { return r.prepareWorkloadSteps(ctx) })
375 1 : r.errgroup.Go(func() error { return r.applyWorkloadSteps(ctx) })
376 1 : r.errgroup.Go(func() error { return r.refreshMetrics(ctx) })
377 1 : return nil
378 : }
379 :
380 : // refreshMetrics runs in its own goroutine, collecting metrics from the Pebble
381 : // instance whenever a) a workload step completes, or b) a compaction completes.
382 : // The Pacer implementations that pace based on read-amplification rely on these
383 : // refreshed metrics to decide when to allow the workload to proceed.
384 1 : func (r *Runner) refreshMetrics(ctx context.Context) error {
385 1 : startAt := time.Now()
386 1 : var workloadExhausted bool
387 1 : var workloadExhaustedAt time.Time
388 1 : stepsApplied := r.stepsApplied
389 1 : compactionCount, alreadyCompleted, compactionCh := r.nextCompactionCompletes(0)
390 1 : for {
391 1 : if !alreadyCompleted {
392 1 : select {
393 0 : case <-ctx.Done():
394 0 : return ctx.Err()
395 1 : case <-compactionCh:
396 : // Fall through to refreshing dbMetrics.
397 1 : case _, ok := <-stepsApplied:
398 1 : if !ok {
399 1 : workloadExhausted = true
400 1 : workloadExhaustedAt = time.Now()
401 1 : // Set the [stepsApplied] channel to nil so that we'll never
402 1 : // hit this case again, and we don't busy loop.
403 1 : stepsApplied = nil
404 1 : // Record the replay time.
405 1 : r.metrics.workloadDuration = workloadExhaustedAt.Sub(startAt)
406 1 : }
407 : // Fall through to refreshing dbMetrics.
408 : }
409 : }
410 :
411 1 : m := r.d.Metrics()
412 1 : r.dbMetricsCond.L.Lock()
413 1 : r.dbMetrics = m
414 1 : r.dbMetricsCond.Broadcast()
415 1 : r.dbMetricsCond.L.Unlock()
416 1 :
417 1 : // Collect sample metrics. These metrics are calculated by sampling
418 1 : // every time we collect metrics.
419 1 : r.metrics.readAmp.record(int64(m.ReadAmp()))
420 1 : r.metrics.estimatedDebt.record(int64(m.Compact.EstimatedDebt))
421 1 : r.metrics.tombstoneCount.record(int64(m.Keys.TombstoneCount))
422 1 : r.metrics.totalSize.record(int64(m.DiskSpaceUsage()))
423 1 : r.metrics.writeThroughput.record(int64(r.metrics.writeBytes.Load()))
424 1 :
425 1 : compactionCount, alreadyCompleted, compactionCh = r.nextCompactionCompletes(compactionCount)
426 1 : // Consider whether replaying is complete. There are two necessary
427 1 : // conditions:
428 1 : //
429 1 : // 1. The workload must be exhausted.
430 1 : // 2. Compactions must have quiesced.
431 1 : //
432 1 : // The first condition is simple. The replay tool is responsible for
433 1 : // applying the workload. The goroutine responsible for applying the
434 1 : // workload closes the `stepsApplied` channel after the last step has
435 1 : // been applied, and we'll flip `workloadExhausted` to true.
436 1 : //
437 1 : // The second condition is tricky. The replay tool doesn't control
438 1 : // compactions and doesn't have visibility into whether the compaction
439 1 : // picker is about to schedule a new compaction. We can tell when
440 1 : // compactions are in progress or may be imminent (eg, flushes in
441 1 : // progress). If it appears that compactions have quiesced, pause for a
442 1 : // fixed duration to see if a new one is scheduled. If not, consider
443 1 : // compactions quiesced.
444 1 : if workloadExhausted && !alreadyCompleted && r.compactionsAppearQuiesced(m) {
445 1 : select {
446 1 : case <-compactionCh:
447 1 : // A new compaction just finished; compactions have not
448 1 : // quiesced.
449 1 : continue
450 1 : case <-time.After(time.Second):
451 1 : // No compactions completed. If it still looks like they've
452 1 : // quiesced according to the metrics, consider them quiesced.
453 1 : if r.compactionsAppearQuiesced(r.d.Metrics()) {
454 1 : r.metrics.quiesceDuration = time.Since(workloadExhaustedAt)
455 1 : return nil
456 1 : }
457 : }
458 : }
459 : }
460 : }
461 :
462 : // compactionsAppearQuiesced returns true if the database may have quiesced, and
463 : // there likely won't be additional compactions scheduled. Detecting quiescence
464 : // is a bit fraught: The various signals that Pebble makes available are
465 : // adjusted at different points in the compaction lifecycle, and database
466 : // mutexes are dropped and acquired between them. This makes it difficult to
467 : // reliably identify when compactions quiesce.
468 : //
469 : // For example, our call to DB.Metrics() may acquire the DB.mu mutex when a
470 : // compaction has just successfully completed, but before it's managed to
471 : // schedule the next compaction (DB.mu is dropped while it attempts to acquire
472 : // the manifest lock).
473 1 : func (r *Runner) compactionsAppearQuiesced(m *pebble.Metrics) bool {
474 1 : r.compactionMu.Lock()
475 1 : defer r.compactionMu.Unlock()
476 1 : if m.Flush.NumInProgress > 0 {
477 1 : return false
478 1 : } else if m.Compact.NumInProgress > 0 && r.compactionMu.started != r.compactionMu.completed {
479 1 : return false
480 1 : }
481 1 : return true
482 : }
483 :
484 : // nextCompactionCompletes allows a caller to be notified when new compactions
485 : // complete. The caller is responsible for holding on to a monotonically
486 : // increasing count representing the number of compactions that have been
487 : // observed, beginning at zero.
488 : //
489 : // The caller passes their current count as an argument. If a new compaction has
490 : // already completed since their provided count, nextCompactionCompletes returns
491 : // the new count and a true boolean return value. If a new compaction has not
492 : // yet completed, it returns a channel that will be closed when the next
493 : // compaction completes. This scheme allows the caller to select{...},
494 : // performing some action on every compaction completion.
495 : func (r *Runner) nextCompactionCompletes(
496 : lastObserved int64,
497 1 : ) (count int64, alreadyOccurred bool, ch chan struct{}) {
498 1 : r.compactionMu.Lock()
499 1 : defer r.compactionMu.Unlock()
500 1 :
501 1 : if lastObserved < r.compactionMu.completed {
502 1 : // There has already been another compaction since the last one observed
503 1 : // by this caller. Return immediately.
504 1 : return r.compactionMu.completed, true, nil
505 1 : }
506 :
507 : // The last observed compaction is still the most recent compaction.
508 : // Return a channel that the caller can wait on to be notified when the
509 : // next compaction occurs.
510 1 : if r.compactionMu.ch == nil {
511 1 : r.compactionMu.ch = make(chan struct{})
512 1 : }
513 1 : return lastObserved, false, r.compactionMu.ch
514 : }
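// waitForNCompactions is a hypothetical sketch (not part of the replay logic)
// illustrating the caller pattern nextCompactionCompletes is designed for: the
// caller holds a monotonically increasing observed count and either proceeds
// immediately, or selects on the returned channel until the next compaction
// completes.
func (r *Runner) waitForNCompactions(ctx context.Context, n int64) error {
	count, alreadyCompleted, ch := r.nextCompactionCompletes(0)
	for count < n {
		if !alreadyCompleted {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-ch:
				// The next compaction completed; refresh our observed count.
			}
		}
		count, alreadyCompleted, ch = r.nextCompactionCompletes(count)
	}
	return nil
}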
515 :
516 : // Wait waits for the workload replay to complete. Wait returns once the entire
517 : // workload has been replayed, and compactions have quiesced.
518 1 : func (r *Runner) Wait() (Metrics, error) {
519 1 : err := r.errgroup.Wait()
520 1 : if storedErr := r.err.Load(); storedErr != nil {
521 0 : err = storedErr.(error)
522 0 : }
523 1 : pm := r.d.Metrics()
524 1 : total := pm.Total()
525 1 : var ingestBytesWeighted uint64
526 1 : for l := 0; l < len(pm.Levels); l++ {
527 1 : ingestBytesWeighted += pm.Levels[l].BytesIngested * uint64(len(pm.Levels)-l-1)
528 1 : }
529 :
530 1 : m := Metrics{
531 1 : Final: pm,
532 1 : EstimatedDebt: r.metrics.estimatedDebt,
533 1 : PaceDuration: time.Duration(r.metrics.paceDurationNano.Load()),
534 1 : ReadAmp: r.metrics.readAmp,
535 1 : QuiesceDuration: r.metrics.quiesceDuration,
536 1 : TombstoneCount: r.metrics.tombstoneCount,
537 1 : TotalSize: r.metrics.totalSize,
538 1 : TotalWriteAmp: total.WriteAmp(),
539 1 : WorkloadDuration: r.metrics.workloadDuration,
540 1 : WriteBytes: r.metrics.writeBytes.Load(),
541 1 : WriteStalls: make(map[string]int),
542 1 : WriteStallsDuration: make(map[string]time.Duration),
543 1 : WriteThroughput: r.metrics.writeThroughput,
544 1 : }
545 1 :
546 1 : r.writeStallMetrics.Lock()
547 1 : for reason, count := range r.writeStallMetrics.countByReason {
548 1 : m.WriteStalls[reason] = count
549 1 : }
550 1 : for reason, duration := range r.writeStallMetrics.durationByReason {
551 1 : m.WriteStallsDuration[reason] = duration
552 1 : }
553 1 : r.writeStallMetrics.Unlock()
554 1 : m.CompactionCounts.Total = pm.Compact.Count
555 1 : m.CompactionCounts.Default = pm.Compact.DefaultCount
556 1 : m.CompactionCounts.DeleteOnly = pm.Compact.DeleteOnlyCount
557 1 : m.CompactionCounts.ElisionOnly = pm.Compact.ElisionOnlyCount
558 1 : m.CompactionCounts.Move = pm.Compact.MoveCount
559 1 : m.CompactionCounts.Read = pm.Compact.ReadCount
560 1 : m.CompactionCounts.TombstoneDensity = pm.Compact.TombstoneDensityCount
561 1 : m.CompactionCounts.Rewrite = pm.Compact.RewriteCount
562 1 : m.CompactionCounts.Copy = pm.Compact.CopyCount
563 1 : m.CompactionCounts.MultiLevel = pm.Compact.MultiLevelCount
564 1 : m.Ingest.BytesIntoL0 = pm.Levels[0].BytesIngested
565 1 : m.Ingest.BytesWeightedByLevel = ingestBytesWeighted
566 1 : return m, err
567 : }
568 :
569 : // Close closes remaining open resources, including the database. It must be
570 : // called after Wait.
571 1 : func (r *Runner) Close() error {
572 1 : return r.d.Close()
573 1 : }
574 :
575 : // A workloadStep describes a single manifest edit in the workload. It may be a
576 : // flush or ingest that should be applied to the test database, or it may be a
577 : // compaction that is surfaced to allow the replay logic to compare against the
578 : // state of the database at workload collection time.
579 : type workloadStep struct {
580 : kind stepKind
581 : ve manifest.VersionEdit
582 : // pv describes the state of the LSM at workload-collection time, before
583 : // this step was applied.
584 : pv *manifest.Version
585 : // v describes the state of the LSM at workload-collection time, after
586 : // this step was applied.
587 : v *manifest.Version
588 : // non-nil for flushStepKind
589 : flushBatch *pebble.Batch
590 : tablesToIngest []string
591 : cumulativeWriteBytes uint64
592 : }
593 :
594 : type stepKind uint8
595 :
596 : const (
597 : flushStepKind stepKind = iota
598 : ingestStepKind
599 : compactionStepKind
600 : )
601 :
602 : // eventListener returns a Pebble EventListener that is installed on the replay
603 : // database so that the replay runner has access to internal Pebble events.
604 1 : func (r *Runner) eventListener() pebble.EventListener {
605 1 : var writeStallBegin time.Time
606 1 : var writeStallReason string
607 1 : l := pebble.EventListener{
608 1 : BackgroundError: func(err error) {
609 0 : r.err.Store(err)
610 0 : r.cancel()
611 0 : },
612 1 : WriteStallBegin: func(info pebble.WriteStallBeginInfo) {
613 1 : r.writeStallMetrics.Lock()
614 1 : defer r.writeStallMetrics.Unlock()
615 1 : writeStallReason = info.Reason
616 1 : // Take just the first word of the reason.
617 1 : if j := strings.IndexByte(writeStallReason, ' '); j != -1 {
618 1 : writeStallReason = writeStallReason[:j]
619 1 : }
620 1 : switch writeStallReason {
621 1 : case "L0", "memtable":
622 1 : r.writeStallMetrics.countByReason[writeStallReason]++
623 0 : default:
624 0 : panic(fmt.Sprintf("unrecognized write stall reason %q", info.Reason))
625 : }
626 1 : writeStallBegin = time.Now()
627 : },
628 1 : WriteStallEnd: func() {
629 1 : r.writeStallMetrics.Lock()
630 1 : defer r.writeStallMetrics.Unlock()
631 1 : r.writeStallMetrics.durationByReason[writeStallReason] += time.Since(writeStallBegin)
632 1 : },
633 1 : CompactionBegin: func(_ pebble.CompactionInfo) {
634 1 : r.compactionMu.Lock()
635 1 : defer r.compactionMu.Unlock()
636 1 : r.compactionMu.started++
637 1 : },
638 1 : CompactionEnd: func(_ pebble.CompactionInfo) {
639 1 : // Keep track of the number of compactions that complete and notify
640 1 : // anyone waiting for a compaction to complete. See the function
641 1 : // nextCompactionCompletes for the corresponding receiver side.
642 1 : r.compactionMu.Lock()
643 1 : defer r.compactionMu.Unlock()
644 1 : r.compactionMu.completed++
645 1 : if r.compactionMu.ch != nil {
646 1 : // Signal that a compaction has completed.
647 1 : close(r.compactionMu.ch)
648 1 : r.compactionMu.ch = nil
649 1 : }
650 : },
651 : }
652 1 : l.EnsureDefaults(nil)
653 1 : return l
654 : }
655 :
656 : // applyWorkloadSteps runs in its own goroutine, reading workload steps off the
657 : // r.steps channel and applying them to the test database.
658 1 : func (r *Runner) applyWorkloadSteps(ctx context.Context) error {
659 1 : for {
660 1 : var ok bool
661 1 : var step workloadStep
662 1 : select {
663 0 : case <-ctx.Done():
664 0 : return ctx.Err()
665 1 : case step, ok = <-r.steps:
666 1 : if !ok {
667 1 : // Exhausted the workload. Exit.
668 1 : close(r.stepsApplied)
669 1 : return nil
670 1 : }
671 : }
672 :
673 1 : paceDur := r.Pacer.pace(r, step)
674 1 : r.metrics.paceDurationNano.Add(uint64(paceDur))
675 1 :
676 1 : switch step.kind {
677 1 : case flushStepKind:
678 1 : if err := step.flushBatch.Commit(&pebble.WriteOptions{Sync: false}); err != nil {
679 0 : return err
680 0 : }
681 1 : _, err := r.d.AsyncFlush()
682 1 : if err != nil {
683 0 : return err
684 0 : }
685 1 : r.metrics.writeBytes.Store(step.cumulativeWriteBytes)
686 1 : r.stepsApplied <- step
687 0 : case ingestStepKind:
688 0 : if err := r.d.Ingest(context.Background(), step.tablesToIngest); err != nil {
689 0 : return err
690 0 : }
691 0 : r.metrics.writeBytes.Store(step.cumulativeWriteBytes)
692 0 : r.stepsApplied <- step
693 1 : case compactionStepKind:
694 : // No-op.
695 : // TODO(jackson): Should we elide this earlier?
696 0 : default:
697 0 : panic("unreachable")
698 : }
699 : }
700 : }
701 :
702 : // prepareWorkloadSteps runs in its own goroutine, reading the workload
703 : // manifests in order to reconstruct the workload and prepare each step to be
704 : // applied. It sends each workload step to the r.steps channel.
705 1 : func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
706 1 : defer func() { close(r.steps) }()
707 :
708 1 : idx := r.workload.manifestIdx
709 1 :
710 1 : var cumulativeWriteBytes uint64
711 1 : var flushBufs flushBuffers
712 1 : var v *manifest.Version
713 1 : var previousVersion *manifest.Version
714 1 : var bve manifest.BulkVersionEdit
715 1 : bve.AddedByFileNum = make(map[base.FileNum]*manifest.FileMetadata)
716 1 : applyVE := func(ve *manifest.VersionEdit) error {
717 1 : return bve.Accumulate(ve)
718 1 : }
719 1 : currentVersion := func() (*manifest.Version, error) {
720 1 : var err error
721 1 : v, err = bve.Apply(v,
722 1 : r.Opts.Comparer,
723 1 : r.Opts.FlushSplitBytes,
724 1 : r.Opts.Experimental.ReadCompactionRate)
725 1 : bve = manifest.BulkVersionEdit{AddedByFileNum: bve.AddedByFileNum}
726 1 : return v, err
727 1 : }
728 :
729 1 : for ; idx < len(r.workload.manifests); idx++ {
730 1 : if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes {
731 0 : break
732 : }
733 :
734 1 : err := func() error {
735 1 : manifestName := r.workload.manifests[idx]
736 1 : f, err := r.WorkloadFS.Open(r.WorkloadFS.PathJoin(r.WorkloadPath, manifestName))
737 1 : if err != nil {
738 0 : return err
739 0 : }
740 1 : defer f.Close()
741 1 :
742 1 : rr := record.NewReader(f, 0 /* logNum */)
743 1 : // A manifest's first record always holds the initial version state.
744 1 : // If this is the first manifest we're examining, we apply it in
745 1 : // order to seed the accumulated version state with the metadata of
746 1 : // the existing files. Otherwise, we can skip it because we already
747 1 : // know all the file metadata up to this point.
748 1 : rec, err := rr.Next()
749 1 : if err != nil {
750 0 : return err
751 0 : }
752 1 : if idx == r.workload.manifestIdx {
753 1 : var ve manifest.VersionEdit
754 1 : if err := ve.Decode(rec); err != nil {
755 0 : return err
756 0 : }
757 1 : if err := applyVE(&ve); err != nil {
758 0 : return err
759 0 : }
760 : }
761 :
762 : // Read the remainder of the manifest's version edits, one by one.
763 1 : for {
764 1 : rec, err := rr.Next()
765 1 : if err == io.EOF || record.IsInvalidRecord(err) {
766 1 : break
767 1 : } else if err != nil {
768 0 : return err
769 0 : }
770 1 : var ve manifest.VersionEdit
771 1 : if err = ve.Decode(rec); err == io.EOF || record.IsInvalidRecord(err) {
772 0 : break
773 1 : } else if err != nil {
774 0 : return err
775 0 : }
776 1 : if err := applyVE(&ve); err != nil {
777 0 : return err
778 0 : }
779 1 : if idx == r.workload.manifestIdx && rr.Offset() <= r.workload.manifestOff {
780 1 : // The record rec began at an offset strictly less than rr.Offset(),
781 1 : // and rr.Offset() <= r.workload.manifestOff, so the record precedes
782 1 : // the workload's starting offset and was already applied; skip it.
783 1 : continue
784 : }
785 1 : if len(ve.NewFiles) == 0 && len(ve.DeletedFiles) == 0 {
786 1 : // Skip WAL rotations and other events that don't affect the
787 1 : // files of the LSM.
788 1 : continue
789 : }
790 :
791 1 : s := workloadStep{ve: ve}
792 1 : if len(ve.DeletedFiles) > 0 {
793 1 : // If a version edit deletes files, we assume it's a compaction.
794 1 : s.kind = compactionStepKind
795 1 : } else {
796 1 : // Default to ingest. If any file has unequal smallest and
797 1 : // largest sequence numbers, or targets a level other than L0,
798 1 : // we'll update this to a flush.
799 1 : s.kind = ingestStepKind
800 1 : }
801 1 : var newFiles []base.DiskFileNum
802 1 : for _, nf := range ve.NewFiles {
803 1 : newFiles = append(newFiles, nf.Meta.FileBacking.DiskFileNum)
804 1 : if s.kind == ingestStepKind && (nf.Meta.SmallestSeqNum != nf.Meta.LargestSeqNum || nf.Level != 0) {
805 1 : s.kind = flushStepKind
806 1 : }
807 : }
808 : // Add the current reference *Version to the step. This provides
809 : // access to, for example, the read-amplification of the
810 : // database at this point in the collected workload. This
811 : // can be useful for pacing.
812 1 : if s.v, err = currentVersion(); err != nil {
813 0 : return err
814 0 : }
815 : // The first time through, we set the previous version to the current
816 : // version; otherwise we set it to the actual previous version.
817 1 : if previousVersion == nil {
818 1 : previousVersion = s.v
819 1 : }
820 1 : s.pv = previousVersion
821 1 : previousVersion = s.v
822 1 :
823 1 : // It's possible that the workload collector captured this
824 1 : // version edit, but wasn't able to collect all of the
825 1 : // corresponding sstables before being terminated.
826 1 : if s.kind == flushStepKind || s.kind == ingestStepKind {
827 1 : for _, fileNum := range newFiles {
828 1 : if _, ok := r.workload.sstables[base.PhysicalTableFileNum(fileNum)]; !ok {
829 0 : // TODO(jackson,leon): This isn't exactly an error
830 0 : // condition. Give this more thought; do we want to
831 0 : // require graceful exiting of workload collection,
832 0 : // such that the last version edit must have had its
833 0 : // corresponding sstables collected?
834 0 : return errors.Newf("sstable %s not found", fileNum)
835 0 : }
836 : }
837 : }
838 :
839 1 : switch s.kind {
840 1 : case flushStepKind:
841 1 : // Load all of the flushed sstables' keys into a batch.
842 1 : s.flushBatch = r.d.NewBatch()
843 1 : if err := loadFlushedSSTableKeys(s.flushBatch, r.WorkloadFS, r.WorkloadPath, newFiles, r.readerOpts, &flushBufs); err != nil {
844 0 : return errors.Wrapf(err, "flush in %q at offset %d", manifestName, rr.Offset())
845 0 : }
846 1 : cumulativeWriteBytes += uint64(s.flushBatch.Len())
847 0 : case ingestStepKind:
848 0 : // Copy the ingested sstables into a staging area within the
849 0 : // run dir. This is necessary for two reasons:
850 0 : // a) Ingest will remove the source file, and we don't want
851 0 : // to mutate the workload.
852 0 : // b) If the workload is stored on another volume, Ingest
853 0 : // would need to fall back to copying the file since
854 0 : // it's not possible to link across volumes. The true
855 0 : // workload likely linked the file. Staging the file
856 0 : // ahead of time ensures that we're able to Link the
857 0 : // file like the original workload did.
858 0 : for _, fileNum := range newFiles {
859 0 : src := base.MakeFilepath(r.WorkloadFS, r.WorkloadPath, base.FileTypeTable, fileNum)
860 0 : dst := base.MakeFilepath(r.Opts.FS, r.stagingDir, base.FileTypeTable, fileNum)
861 0 : if err := vfs.CopyAcrossFS(r.WorkloadFS, src, r.Opts.FS, dst); err != nil {
862 0 : return errors.Wrapf(err, "ingest in %q at offset %d", manifestName, rr.Offset())
863 0 : }
864 0 : finfo, err := r.Opts.FS.Stat(dst)
865 0 : if err != nil {
866 0 : return errors.Wrapf(err, "stating %q", dst)
867 0 : }
868 0 : cumulativeWriteBytes += uint64(finfo.Size())
869 0 : s.tablesToIngest = append(s.tablesToIngest, dst)
870 : }
871 1 : case compactionStepKind:
872 : // Nothing to do.
873 : }
874 1 : s.cumulativeWriteBytes = cumulativeWriteBytes
875 1 :
876 1 : select {
877 0 : case <-ctx.Done():
878 0 : return ctx.Err()
879 1 : case r.steps <- s:
880 : }
881 :
882 1 : if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes {
883 0 : break
884 : }
885 : }
886 1 : return nil
887 : }()
888 1 : if err != nil {
889 0 : return err
890 0 : }
891 : }
892 1 : return nil
893 : }
894 :
895 : // findWorkloadFiles finds all manifests and tables in the provided path on fs.
896 : func findWorkloadFiles(
897 : path string, fs vfs.FS,
898 1 : ) (manifests []string, sstables map[base.FileNum]struct{}, err error) {
899 1 : dirents, err := fs.List(path)
900 1 : if err != nil {
901 0 : return nil, nil, err
902 0 : }
903 1 : sstables = make(map[base.FileNum]struct{})
904 1 : for _, dirent := range dirents {
905 1 : typ, fileNum, ok := base.ParseFilename(fs, dirent)
906 1 : if !ok {
907 1 : continue
908 : }
909 1 : switch typ {
910 1 : case base.FileTypeManifest:
911 1 : manifests = append(manifests, dirent)
912 1 : case base.FileTypeTable:
913 1 : sstables[base.PhysicalTableFileNum(fileNum)] = struct{}{}
914 : }
915 : }
916 1 : if len(manifests) == 0 {
917 1 : return nil, nil, errors.Newf("no manifests found")
918 1 : }
919 1 : sort.Strings(manifests)
920 1 : return manifests, sstables, err
921 : }
922 :
923 : // findManifestStart takes a database directory and FS containing the initial
924 : // database state that a workload will be run against, and a list of a workloads
925 : // manifests. It examines the database's current manifest to determine where
926 : // workload replay should begin, so as to not duplicate already-applied version
927 : // edits.
928 : //
929 : // It returns the index of the starting manifest, and the database's current
930 : // offset within the manifest.
931 : func findManifestStart(
932 : dbDir string, dbFS vfs.FS, manifests []string,
933 1 : ) (index int, offset int64, err error) {
934 1 : // Identify the database's current manifest.
935 1 : dbDesc, err := pebble.Peek(dbDir, dbFS)
936 1 : if err != nil {
937 0 : return 0, 0, err
938 0 : }
939 1 : dbManifest := dbFS.PathBase(dbDesc.ManifestFilename)
940 1 : // If there is no initial database state, begin workload replay from the
941 1 : // beginning of the first manifest.
942 1 : if !dbDesc.Exists {
943 1 : return 0, 0, nil
944 1 : }
945 1 : for index = 0; index < len(manifests); index++ {
946 1 : if manifests[index] == dbManifest {
947 1 : break
948 : }
949 : }
950 1 : if index == len(manifests) {
951 1 : // The initial database state has a manifest that does not appear within
952 1 : // the workload's set of manifests. This is possible if we began
953 1 : // recording the workload at the same time as a manifest rotation, but
954 1 : // more likely we're applying a workload to a different initial database
955 1 : // state than the one from which the workload was collected. Either way,
956 1 : // start from the beginning of the first manifest.
957 1 : return 0, 0, nil
958 1 : }
959 : // Find the initial database's offset within the manifest.
960 1 : info, err := dbFS.Stat(dbFS.PathJoin(dbDir, dbManifest))
961 1 : if err != nil {
962 0 : return 0, 0, err
963 0 : }
964 1 : return index, info.Size(), nil
965 : }
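// For example (hypothetical filenames and sizes): if the run directory's
// current manifest is MANIFEST-000082, the workload's manifests are
// [MANIFEST-000081, MANIFEST-000082, MANIFEST-000083], and the run directory's
// copy of MANIFEST-000082 is 512 bytes long, replay begins at index 1 with
// offset 512; edits before that offset are assumed to already be reflected in
// the checkpointed initial state.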
966 :
967 : // loadFlushedSSTableKeys copies keys from the sstables specified by `fileNums`
968 : // in the directory specified by `path` into the provided batch. Keys are
969 : // applied to the batch in the order dictated by their sequence numbers within
970 : // the sstables, ensuring the relative relationship between sequence numbers is
971 : // maintained.
972 : //
973 : // Preserving the relative relationship between sequence numbers is not strictly
974 : // necessary, but it ensures we accurately exercise some micro-optimizations (eg,
975 : // detecting user key changes by descending trailer). There may be additional
976 : // dependencies on sequence numbers in the future.
977 : func loadFlushedSSTableKeys(
978 : b *pebble.Batch,
979 : fs vfs.FS,
980 : path string,
981 : fileNums []base.DiskFileNum,
982 : readOpts sstable.ReaderOptions,
983 : bufs *flushBuffers,
984 1 : ) error {
985 1 : // Load all the keys across all the sstables.
986 1 : for _, fileNum := range fileNums {
987 1 : if err := func() error {
988 1 : filePath := base.MakeFilepath(fs, path, base.FileTypeTable, fileNum)
989 1 : f, err := fs.Open(filePath)
990 1 : if err != nil {
991 0 : return err
992 0 : }
993 1 : readable, err := sstable.NewSimpleReadable(f)
994 1 : if err != nil {
995 0 : f.Close()
996 0 : return err
997 0 : }
998 1 : r, err := sstable.NewReader(context.Background(), readable, readOpts)
999 1 : if err != nil {
1000 0 : return err
1001 0 : }
1002 1 : defer r.Close()
1003 1 :
1004 1 : // Load all the point keys.
1005 1 : iter, err := r.NewIter(sstable.NoTransforms, nil, nil)
1006 1 : if err != nil {
1007 0 : return err
1008 0 : }
1009 1 : defer iter.Close()
1010 1 : for kv := iter.First(); kv != nil; kv = iter.Next() {
1011 1 : var key flushedKey
1012 1 : key.Trailer = kv.K.Trailer
1013 1 : bufs.alloc, key.UserKey = bufs.alloc.Copy(kv.K.UserKey)
1014 1 : if v, callerOwned, err := kv.Value(nil); err != nil {
1015 0 : return err
1016 1 : } else if callerOwned {
1017 0 : key.value = v
1018 1 : } else {
1019 1 : bufs.alloc, key.value = bufs.alloc.Copy(v)
1020 1 : }
1021 1 : bufs.keys = append(bufs.keys, key)
1022 : }
1023 :
1024 : // Load all the range tombstones.
1025 1 : if iter, err := r.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms); err != nil {
1026 0 : return err
1027 1 : } else if iter != nil {
1028 1 : defer iter.Close()
1029 1 : s, err := iter.First()
1030 1 : for ; s != nil; s, err = iter.Next() {
1031 1 : if err := rangedel.Encode(*s, func(k base.InternalKey, v []byte) error {
1032 1 : var key flushedKey
1033 1 : key.Trailer = k.Trailer
1034 1 : bufs.alloc, key.UserKey = bufs.alloc.Copy(k.UserKey)
1035 1 : bufs.alloc, key.value = bufs.alloc.Copy(v)
1036 1 : bufs.keys = append(bufs.keys, key)
1037 1 : return nil
1038 1 : }); err != nil {
1039 0 : return err
1040 0 : }
1041 : }
1042 1 : if err != nil {
1043 0 : return err
1044 0 : }
1045 : }
1046 :
1047 : // Load all the range keys.
1048 1 : if iter, err := r.NewRawRangeKeyIter(context.Background(), sstable.NoFragmentTransforms); err != nil {
1049 0 : return err
1050 1 : } else if iter != nil {
1051 1 : defer iter.Close()
1052 1 : s, err := iter.First()
1053 1 : for ; s != nil; s, err = iter.Next() {
1054 1 : if err := rangekey.Encode(*s, func(k base.InternalKey, v []byte) error {
1055 1 : var key flushedKey
1056 1 : key.Trailer = k.Trailer
1057 1 : bufs.alloc, key.UserKey = bufs.alloc.Copy(k.UserKey)
1058 1 : bufs.alloc, key.value = bufs.alloc.Copy(v)
1059 1 : bufs.keys = append(bufs.keys, key)
1060 1 : return nil
1061 1 : }); err != nil {
1062 0 : return err
1063 0 : }
1064 : }
1065 1 : if err != nil {
1066 0 : return err
1067 0 : }
1068 : }
1069 1 : return nil
1070 0 : }(); err != nil {
1071 0 : return err
1072 0 : }
1073 : }
1074 :
1075 : // Sort the flushed keys by their sequence numbers so that we can apply them
1076 : // to the batch in the same order, maintaining the relative relationship
1077 : // between keys.
1078 : // NB: We use a stable sort so that keys corresponding to span fragments
1079 : // (eg, range tombstones and range keys) have a deterministic ordering for
1080 : // testing.
1081 1 : sort.Stable(bufs.keys)
1082 1 :
1083 1 : // Add the keys to the batch in the order they were committed when the
1084 1 : // workload was captured.
1085 1 : for i := 0; i < len(bufs.keys); i++ {
1086 1 : var err error
1087 1 : switch bufs.keys[i].Kind() {
1088 1 : case base.InternalKeyKindDelete:
1089 1 : err = b.Delete(bufs.keys[i].UserKey, nil)
1090 0 : case base.InternalKeyKindDeleteSized:
1091 0 : v, _ := binary.Uvarint(bufs.keys[i].value)
1092 0 : // Batch.DeleteSized takes just the length of the value being
1093 0 : // deleted and adds the key's length to derive the overall entry
1094 0 : // size of the value being deleted. This has already been done to
1095 0 : // the key we're reading from the sstable, so we must subtract the
1096 0 : // key length from the encoded value before calling b.DeleteSized,
1097 0 : // which will again add the key length before encoding.
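// For example (illustrative numbers): if len(UserKey) is 12 and the
// encoded value decodes to 30, we pass 30-12 = 18 to DeleteSized, which
// re-adds the 12 key bytes when it encodes the batch entry.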
1098 0 : err = b.DeleteSized(bufs.keys[i].UserKey, uint32(v-uint64(len(bufs.keys[i].UserKey))), nil)
1099 1 : case base.InternalKeyKindSet, base.InternalKeyKindSetWithDelete:
1100 1 : err = b.Set(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
1101 0 : case base.InternalKeyKindMerge:
1102 0 : err = b.Merge(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
1103 1 : case base.InternalKeyKindSingleDelete:
1104 1 : err = b.SingleDelete(bufs.keys[i].UserKey, nil)
1105 1 : case base.InternalKeyKindRangeDelete:
1106 1 : err = b.DeleteRange(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
1107 1 : case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
1108 1 : s, err := rangekey.Decode(bufs.keys[i].InternalKey, bufs.keys[i].value, nil)
1109 1 : if err != nil {
1110 0 : return err
1111 0 : }
1112 1 : if len(s.Keys) != 1 {
1113 0 : return errors.Newf("range key span unexpectedly contains %d keys", len(s.Keys))
1114 0 : }
1115 1 : switch bufs.keys[i].Kind() {
1116 1 : case base.InternalKeyKindRangeKeySet:
1117 1 : err = b.RangeKeySet(s.Start, s.End, s.Keys[0].Suffix, s.Keys[0].Value, nil)
1118 1 : case base.InternalKeyKindRangeKeyUnset:
1119 1 : err = b.RangeKeyUnset(s.Start, s.End, s.Keys[0].Suffix, nil)
1120 1 : case base.InternalKeyKindRangeKeyDelete:
1121 1 : err = b.RangeKeyDelete(s.Start, s.End, nil)
1122 0 : default:
1123 0 : err = errors.Newf("unexpected key kind %q", bufs.keys[i].Kind())
1124 : }
1125 1 : if err != nil {
1126 0 : return err
1127 0 : }
1128 0 : default:
1129 0 : err = errors.Newf("unexpected key kind %q", bufs.keys[i].Kind())
1130 : }
1131 1 : if err != nil {
1132 0 : return err
1133 0 : }
1134 : }
1135 :
1136 : // Done with the flushBuffers. Reset.
1137 1 : bufs.keys = bufs.keys[:0]
1138 1 : return nil
1139 : }
1140 :
1141 : type flushBuffers struct {
1142 : keys flushedKeysByTrailer
1143 : alloc bytealloc.A
1144 : }
1145 :
1146 : type flushedKeysByTrailer []flushedKey
1147 :
1148 1 : func (s flushedKeysByTrailer) Len() int { return len(s) }
1149 1 : func (s flushedKeysByTrailer) Less(i, j int) bool { return s[i].Trailer < s[j].Trailer }
1150 1 : func (s flushedKeysByTrailer) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
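// Note: sorting ascending by trailer orders keys primarily by sequence number
// (the trailer's high bits encode the seqnum), so older entries are added to
// the batch before newer ones, preserving their order from the captured
// workload.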
1151 :
1152 : type flushedKey struct {
1153 : base.InternalKey
1154 : value []byte
1155 : }