Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package replay implements collection and replaying of compaction benchmarking
6 : // workloads. A workload is a collection of flushed and ingested sstables, along
7 : // with the corresponding manifests describing the order and grouping with which
8 : // they were applied. Replaying a workload flushes and ingests the same keys and
9 : // sstables to reproduce the write workload for the purpose of evaluating
10 : // compaction heuristics.
11 : package replay
12 :
13 : import (
14 : "context"
15 : "encoding/binary"
16 : "fmt"
17 : "io"
18 : "os"
19 : "sort"
20 : "strings"
21 : "sync"
22 : "sync/atomic"
23 : "time"
24 :
25 : "github.com/cockroachdb/errors"
26 : "github.com/cockroachdb/pebble"
27 : "github.com/cockroachdb/pebble/internal/base"
28 : "github.com/cockroachdb/pebble/internal/bytealloc"
29 : "github.com/cockroachdb/pebble/internal/manifest"
30 : "github.com/cockroachdb/pebble/internal/rangedel"
31 : "github.com/cockroachdb/pebble/internal/rangekey"
32 : "github.com/cockroachdb/pebble/record"
33 : "github.com/cockroachdb/pebble/sstable"
34 : "github.com/cockroachdb/pebble/vfs"
35 : "golang.org/x/perf/benchfmt"
36 : "golang.org/x/sync/errgroup"
37 : )
38 :
39 : // A Pacer paces replay of a workload, determining when to apply the next
40 : // incoming write.
41 : type Pacer interface {
42 : pace(r *Runner, step workloadStep) time.Duration
43 : }
44 :
45 : // computeReadAmp calculates the read amplification from a manifest.Version.
46 0 : func computeReadAmp(v *manifest.Version) int {
47 0 : refRAmp := v.L0Sublevels.ReadAmplification()
48 0 : for _, lvl := range v.Levels[1:] {
49 0 : if !lvl.Empty() {
50 0 : refRAmp++
51 0 : }
52 : }
53 0 : return refRAmp
54 : }
55 :
56 : // waitForReadAmpLE is a common function used by PaceByReferenceReadAmp and
57 : // PaceByFixedReadAmp to wait on the dbMetricsCond condition variable if the
58 : // read amplification observed is greater than the specified target (rAmp).
59 1 : func waitForReadAmpLE(r *Runner, rAmp int) {
60 1 : r.dbMetricsCond.L.Lock()
61 1 : m := r.dbMetrics
62 1 : ra := m.ReadAmp()
63 1 : for ra > rAmp {
64 1 : r.dbMetricsCond.Wait()
65 1 : ra = r.dbMetrics.ReadAmp()
66 1 : }
67 1 : r.dbMetricsCond.L.Unlock()
68 : }
69 :
70 : // Unpaced implements Pacer by applying each new write as soon as possible. It
71 : // may be useful for examining performance under high read amplification.
72 : type Unpaced struct{}
73 :
74 1 : func (Unpaced) pace(*Runner, workloadStep) (d time.Duration) { return }
75 :
76 : // PaceByReferenceReadAmp implements Pacer by applying each new write following
77 : // the collected workload's read amplification.
78 : type PaceByReferenceReadAmp struct{}
79 :
80 0 : func (PaceByReferenceReadAmp) pace(r *Runner, w workloadStep) time.Duration {
81 0 : startTime := time.Now()
82 0 : refRAmp := computeReadAmp(w.pv)
83 0 : waitForReadAmpLE(r, refRAmp)
84 0 : return time.Since(startTime)
85 0 : }
86 :
87 : // PaceByFixedReadAmp implements Pacer by applying each new write following a
88 : // fixed read amplification.
89 : type PaceByFixedReadAmp int
90 :
91 1 : func (pra PaceByFixedReadAmp) pace(r *Runner, _ workloadStep) time.Duration {
92 1 : startTime := time.Now()
93 1 : waitForReadAmpLE(r, int(pra))
94 1 : return time.Since(startTime)
95 1 : }
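// A Pacer is selected by the caller and installed on Runner.Pacer before Run
// is called. As an illustrative example, Pacer: PaceByFixedReadAmp(10) delays
// each write until the test database's read amplification is at most 10,
// whereas Pacer: Unpaced{} applies writes as fast as possible.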
96 :
97 : // Metrics holds the various statistics on a replay run and its performance.
98 : type Metrics struct {
99 : CompactionCounts struct {
100 : Total int64
101 : Default int64
102 : DeleteOnly int64
103 : ElisionOnly int64
104 : Move int64
105 : Read int64
106 : Rewrite int64
107 : Copy int64
108 : MultiLevel int64
109 : }
110 : EstimatedDebt SampledMetric
111 : Final *pebble.Metrics
112 : Ingest struct {
113 : BytesIntoL0 uint64
114 : // BytesWeightedByLevel is calculated as the number of bytes ingested
115 : // into a level multiplied by the level's distance from the bottommost
116 : // level (L6), summed across all levels. It can be used to gauge how
117 : // effective heuristics are at ingesting files into lower levels, saving
118 : // write amplification.
119 : BytesWeightedByLevel uint64
120 : }
121 : // PaceDuration is the time waiting for the pacer to allow the workload to
122 : // continue.
123 : PaceDuration time.Duration
124 : ReadAmp SampledMetric
125 : // QuiesceDuration is the time between completing application of the workload and
126 : // compactions quiescing.
127 : QuiesceDuration time.Duration
128 : TombstoneCount SampledMetric
129 : // TotalSize holds the total size of the database, sampled after each
130 : // workload step.
131 : TotalSize SampledMetric
132 : TotalWriteAmp float64
133 : WorkloadDuration time.Duration
134 : WriteBytes uint64
135 : WriteStalls map[string]int
136 : WriteStallsDuration map[string]time.Duration
137 : WriteThroughput SampledMetric
138 : }
139 :
140 : // Plot holds an ascii plot and its name.
141 : type Plot struct {
142 : Name string
143 : Plot string
144 : }
145 :
146 : // Plots returns a slice of ascii plots describing how metrics change over time.
147 0 : func (m *Metrics) Plots(width, height int) []Plot {
148 0 : const scaleMB = 1.0 / float64(1<<20)
149 0 : return []Plot{
150 0 : {Name: "Write throughput (MB/s)", Plot: m.WriteThroughput.PlotIncreasingPerSec(width, height, scaleMB)},
151 0 : {Name: "Estimated compaction debt (MB)", Plot: m.EstimatedDebt.Plot(width, height, scaleMB)},
152 0 : {Name: "Total database size (MB)", Plot: m.TotalSize.Plot(width, height, scaleMB)},
153 0 : {Name: "ReadAmp", Plot: m.ReadAmp.Plot(width, height, 1.0)},
154 0 : }
155 0 : }
156 :
157 : // WriteBenchmarkString writes the metrics in the form of a series of
158 : // 'Benchmark' lines understandable by benchstat.
159 1 : func (m *Metrics) WriteBenchmarkString(name string, w io.Writer) error {
160 1 : type benchmarkSection struct {
161 1 : label string
162 1 : values []benchfmt.Value
163 1 : }
164 1 : groups := []benchmarkSection{
165 1 : {label: "CompactionCounts", values: []benchfmt.Value{
166 1 : {Value: float64(m.CompactionCounts.Total), Unit: "compactions"},
167 1 : {Value: float64(m.CompactionCounts.Default), Unit: "default"},
168 1 : {Value: float64(m.CompactionCounts.DeleteOnly), Unit: "delete"},
169 1 : {Value: float64(m.CompactionCounts.ElisionOnly), Unit: "elision"},
170 1 : {Value: float64(m.CompactionCounts.Move), Unit: "move"},
171 1 : {Value: float64(m.CompactionCounts.Read), Unit: "read"},
172 1 : {Value: float64(m.CompactionCounts.Rewrite), Unit: "rewrite"},
173 1 : {Value: float64(m.CompactionCounts.Copy), Unit: "copy"},
174 1 : {Value: float64(m.CompactionCounts.MultiLevel), Unit: "multilevel"},
175 1 : }},
176 1 : // Total database sizes sampled after every workload step and
177 1 : // compaction. This can be used to evaluate the relative LSM space
178 1 : // amplification between runs of the same workload. Calculating the true
179 1 : // space amplification continuously is prohibitvely expensive (it
180 1 : // requires totally compacting a copy of the LSM).
181 1 : {label: "DatabaseSize/mean", values: []benchfmt.Value{
182 1 : {Value: m.TotalSize.Mean(), Unit: "bytes"},
183 1 : }},
184 1 : {label: "DatabaseSize/max", values: []benchfmt.Value{
185 1 : {Value: float64(m.TotalSize.Max()), Unit: "bytes"},
186 1 : }},
187 1 : // Time applying the workload and time waiting for compactions to
188 1 : // quiesce after the workload has completed.
189 1 : {label: "DurationWorkload", values: []benchfmt.Value{
190 1 : {Value: m.WorkloadDuration.Seconds(), Unit: "sec/op"},
191 1 : }},
192 1 : {label: "DurationQuiescing", values: []benchfmt.Value{
193 1 : {Value: m.QuiesceDuration.Seconds(), Unit: "sec/op"},
194 1 : }},
195 1 : {label: "DurationPaceDelay", values: []benchfmt.Value{
196 1 : {Value: m.PaceDuration.Seconds(), Unit: "sec/op"},
197 1 : }},
198 1 : // Estimated compaction debt, sampled after every workload step and
199 1 : // compaction.
200 1 : {label: "EstimatedDebt/mean", values: []benchfmt.Value{
201 1 : {Value: m.EstimatedDebt.Mean(), Unit: "bytes"},
202 1 : }},
203 1 : {label: "EstimatedDebt/max", values: []benchfmt.Value{
204 1 : {Value: float64(m.EstimatedDebt.Max()), Unit: "bytes"},
205 1 : }},
206 1 : {label: "FlushUtilization", values: []benchfmt.Value{
207 1 : {Value: m.Final.Flush.WriteThroughput.Utilization(), Unit: "util"},
208 1 : }},
209 1 : {label: "IngestedIntoL0", values: []benchfmt.Value{
210 1 : {Value: float64(m.Ingest.BytesIntoL0), Unit: "bytes"},
211 1 : }},
212 1 : {label: "IngestWeightedByLevel", values: []benchfmt.Value{
213 1 : {Value: float64(m.Ingest.BytesWeightedByLevel), Unit: "bytes"},
214 1 : }},
215 1 : {label: "ReadAmp/mean", values: []benchfmt.Value{
216 1 : {Value: m.ReadAmp.Mean(), Unit: "files"},
217 1 : }},
218 1 : {label: "ReadAmp/max", values: []benchfmt.Value{
219 1 : {Value: float64(m.ReadAmp.Max()), Unit: "files"},
220 1 : }},
221 1 : {label: "TombstoneCount/mean", values: []benchfmt.Value{
222 1 : {Value: m.TombstoneCount.Mean(), Unit: "tombstones"},
223 1 : }},
224 1 : {label: "TombstoneCount/max", values: []benchfmt.Value{
225 1 : {Value: float64(m.TombstoneCount.Max()), Unit: "tombstones"},
226 1 : }},
227 1 : {label: "Throughput", values: []benchfmt.Value{
228 1 : {Value: float64(m.WriteBytes) / (m.WorkloadDuration + m.QuiesceDuration).Seconds(), Unit: "B/s"},
229 1 : }},
230 1 : {label: "WriteAmp", values: []benchfmt.Value{
231 1 : {Value: float64(m.TotalWriteAmp), Unit: "wamp"},
232 1 : }},
233 1 : }
234 1 :
235 1 : for _, reason := range []string{"L0", "memtable"} {
236 1 : groups = append(groups, benchmarkSection{
237 1 : label: fmt.Sprintf("WriteStall/%s", reason),
238 1 : values: []benchfmt.Value{
239 1 : {Value: float64(m.WriteStalls[reason]), Unit: "stalls"},
240 1 : {Value: float64(m.WriteStallsDuration[reason].Seconds()), Unit: "stallsec/op"},
241 1 : },
242 1 : })
243 1 : }
244 :
245 1 : bw := benchfmt.NewWriter(w)
246 1 : for _, grp := range groups {
247 1 : err := bw.Write(&benchfmt.Result{
248 1 : Name: benchfmt.Name(fmt.Sprintf("BenchmarkReplay/%s/%s", name, grp.label)),
249 1 : Iters: 1,
250 1 : Values: grp.values,
251 1 : })
252 1 : if err != nil {
253 0 : return err
254 0 : }
255 : }
256 1 : return nil
257 : }
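// The lines emitted above follow the standard Go benchmark format consumed by
// benchstat. Roughly, a run named "mvcc" would produce lines like the
// following (values are invented for illustration):
//
//	BenchmarkReplay/mvcc/WriteAmp 1 12.5 wamp
//	BenchmarkReplay/mvcc/DurationWorkload 1 321.7 sec/op
//	BenchmarkReplay/mvcc/WriteStall/L0 1 3 stalls 2.5 stallsec/op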
258 :
259 : // Runner runs a captured workload against a test database, collecting
260 : // metrics on performance.
261 : type Runner struct {
262 : RunDir string
263 : WorkloadFS vfs.FS
264 : WorkloadPath string
265 : Pacer Pacer
266 : Opts *pebble.Options
267 : MaxWriteBytes uint64
268 :
269 : // Internal state.
270 :
271 : d *pebble.DB
272 : // dbMetrics and dbMetricsCond work in unison to update the metrics and
273 : // notify (broadcast to) any waiting clients that metrics have been updated.
274 : dbMetrics *pebble.Metrics
275 : dbMetricsCond sync.Cond
276 : cancel func()
277 : err atomic.Value
278 : errgroup *errgroup.Group
279 : readerOpts sstable.ReaderOptions
280 : stagingDir string
281 : steps chan workloadStep
282 : stepsApplied chan workloadStep
283 :
284 : metrics struct {
285 : estimatedDebt SampledMetric
286 : quiesceDuration time.Duration
287 : readAmp SampledMetric
288 : tombstoneCount SampledMetric
289 : totalSize SampledMetric
290 : paceDurationNano atomic.Uint64
291 : workloadDuration time.Duration
292 : writeBytes atomic.Uint64
293 : writeThroughput SampledMetric
294 : }
295 : writeStallMetrics struct {
296 : sync.Mutex
297 : countByReason map[string]int
298 : durationByReason map[string]time.Duration
299 : }
300 : // compactionMu holds state for tracking the number of compactions
301 : // started and completed and waking waiting goroutines when a new compaction
302 : // completes. See nextCompactionCompletes.
303 : compactionMu struct {
304 : sync.Mutex
305 : ch chan struct{}
306 : started int64
307 : completed int64
308 : }
309 : workload struct {
310 : manifests []string
311 : // manifest{Idx,Off} record the starting position of the workload
312 : // relative to the initial database state.
313 : manifestIdx int
314 : manifestOff int64
315 : // sstables records the set of captured workload sstables by file num.
316 : sstables map[base.FileNum]struct{}
317 : }
318 : }
319 :
320 : // Run begins executing the workload in the background and returns.
321 : //
322 : // The workload application will respect the provided context's cancellation.
323 1 : func (r *Runner) Run(ctx context.Context) error {
324 1 : // Find the workload start relative to the RunDir's existing database state.
325 1 : // A prefix of the workload's manifest edits are expected to have already
326 1 : // been applied to the checkpointed existing database state.
327 1 : var err error
328 1 : r.workload.manifests, r.workload.sstables, err = findWorkloadFiles(r.WorkloadPath, r.WorkloadFS)
329 1 : if err != nil {
330 0 : return err
331 0 : }
332 1 : r.workload.manifestIdx, r.workload.manifestOff, err = findManifestStart(r.RunDir, r.Opts.FS, r.workload.manifests)
333 1 : if err != nil {
334 0 : return err
335 0 : }
336 :
337 : // Set up a staging dir for files that will be ingested.
338 1 : r.stagingDir = r.Opts.FS.PathJoin(r.RunDir, "staging")
339 1 : if err := r.Opts.FS.MkdirAll(r.stagingDir, os.ModePerm); err != nil {
340 0 : return err
341 0 : }
342 :
343 1 : r.dbMetricsCond = sync.Cond{
344 1 : L: &sync.Mutex{},
345 1 : }
346 1 :
347 1 : // Extend the user-provided Options with extensions necessary for replay
348 1 : // mechanics.
349 1 : r.compactionMu.ch = make(chan struct{})
350 1 : r.Opts.AddEventListener(r.eventListener())
351 1 : r.writeStallMetrics.countByReason = make(map[string]int)
352 1 : r.writeStallMetrics.durationByReason = make(map[string]time.Duration)
353 1 : r.Opts.EnsureDefaults()
354 1 : r.readerOpts = r.Opts.MakeReaderOptions()
355 1 : r.Opts.DisableWAL = true
356 1 : r.d, err = pebble.Open(r.RunDir, r.Opts)
357 1 : if err != nil {
358 0 : return err
359 0 : }
360 :
361 1 : r.dbMetrics = r.d.Metrics()
362 1 :
363 1 : // Use a buffered channel to allow the prepareWorkloadSteps to read ahead,
364 1 : // buffering up to cap(r.steps) steps ahead of the current applied state.
365 1 : // Flushes need to be buffered and ingested sstables need to be copied, so
366 1 : // pipelining this preparation makes it more likely the step will be ready
367 1 : // to apply when the pacer decides to apply it.
368 1 : r.steps = make(chan workloadStep, 5)
369 1 : r.stepsApplied = make(chan workloadStep, 5)
370 1 :
371 1 : ctx, r.cancel = context.WithCancel(ctx)
372 1 : r.errgroup, ctx = errgroup.WithContext(ctx)
373 1 : r.errgroup.Go(func() error { return r.prepareWorkloadSteps(ctx) })
374 1 : r.errgroup.Go(func() error { return r.applyWorkloadSteps(ctx) })
375 1 : r.errgroup.Go(func() error { return r.refreshMetrics(ctx) })
376 1 : return nil
377 : }
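// A minimal sketch of driving a replay end-to-end, assuming a captured
// workload directory and a checkpointed run directory (paths and option
// values are hypothetical; error handling is largely elided):
//
//	r := &Runner{
//		RunDir:       "/tmp/replay-run",        // checkpointed initial DB state
//		WorkloadFS:   vfs.Default,
//		WorkloadPath: "/tmp/captured-workload", // workload manifests and sstables
//		Pacer:        Unpaced{},
//		Opts:         &pebble.Options{FS: vfs.Default},
//	}
//	if err := r.Run(ctx); err != nil {
//		return err
//	}
//	m, err := r.Wait() // blocks until the workload is applied and compactions quiesce
//	if err != nil {
//		return err
//	}
//	if err := r.Close(); err != nil {
//		return err
//	}
//	return m.WriteBenchmarkString("mvcc", os.Stdout)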
378 :
379 : // refreshMetrics runs in its own goroutine, collecting metrics from the Pebble
380 : // instance whenever a) a workload step completes, or b) a compaction completes.
381 : // The Pacer implementations that pace based on read-amplification rely on these
382 : // refreshed metrics to decide when to allow the workload to proceed.
383 1 : func (r *Runner) refreshMetrics(ctx context.Context) error {
384 1 : startAt := time.Now()
385 1 : var workloadExhausted bool
386 1 : var workloadExhaustedAt time.Time
387 1 : stepsApplied := r.stepsApplied
388 1 : compactionCount, alreadyCompleted, compactionCh := r.nextCompactionCompletes(0)
389 1 : for {
390 1 : if !alreadyCompleted {
391 1 : select {
392 0 : case <-ctx.Done():
393 0 : return ctx.Err()
394 1 : case <-compactionCh:
395 : // Fall through to refreshing dbMetrics.
396 1 : case _, ok := <-stepsApplied:
397 1 : if !ok {
398 1 : workloadExhausted = true
399 1 : workloadExhaustedAt = time.Now()
400 1 : // Set the [stepsApplied] channel to nil so that we'll never
401 1 : // hit this case again, and we don't busy loop.
402 1 : stepsApplied = nil
403 1 : // Record the replay time.
404 1 : r.metrics.workloadDuration = workloadExhaustedAt.Sub(startAt)
405 1 : }
406 : // Fall through to refreshing dbMetrics.
407 : }
408 : }
409 :
410 1 : m := r.d.Metrics()
411 1 : r.dbMetricsCond.L.Lock()
412 1 : r.dbMetrics = m
413 1 : r.dbMetricsCond.Broadcast()
414 1 : r.dbMetricsCond.L.Unlock()
415 1 :
416 1 : // Collect sampled metrics. These metrics are recorded every time we
417 1 : // refresh the database metrics.
418 1 : r.metrics.readAmp.record(int64(m.ReadAmp()))
419 1 : r.metrics.estimatedDebt.record(int64(m.Compact.EstimatedDebt))
420 1 : r.metrics.tombstoneCount.record(int64(m.Keys.TombstoneCount))
421 1 : r.metrics.totalSize.record(int64(m.DiskSpaceUsage()))
422 1 : r.metrics.writeThroughput.record(int64(r.metrics.writeBytes.Load()))
423 1 :
424 1 : compactionCount, alreadyCompleted, compactionCh = r.nextCompactionCompletes(compactionCount)
425 1 : // Consider whether replaying is complete. There are two necessary
426 1 : // conditions:
427 1 : //
428 1 : // 1. The workload must be exhausted.
429 1 : // 2. Compactions must have quiesced.
430 1 : //
431 1 : // The first condition is simple. The replay tool is responsible for
432 1 : // applying the workload. The goroutine responsible for applying the
433 1 : // workload closes the `stepsApplied` channel after the last step has
434 1 : // been applied, and we'll flip `workloadExhausted` to true.
435 1 : //
436 1 : // The second condition is tricky. The replay tool doesn't control
437 1 : // compactions and doesn't have visibility into whether the compaction
438 1 : // picker is about to schedule a new compaction. We can tell when
439 1 : // compactions are in progress or may be imminent (eg, flushes in
440 1 : // progress). If it appears that compactions have quiesced, pause for a
441 1 : // fixed duration to see if a new one is scheduled. If not, consider
442 1 : // compactions quiesced.
443 1 : if workloadExhausted && !alreadyCompleted && r.compactionsAppearQuiesced(m) {
444 1 : select {
445 1 : case <-compactionCh:
446 1 : // A new compaction just finished; compactions have not
447 1 : // quiesced.
448 1 : continue
449 1 : case <-time.After(time.Second):
450 1 : // No compactions completed. If it still looks like they've
451 1 : // quiesced according to the metrics, consider them quiesced.
452 1 : if r.compactionsAppearQuiesced(r.d.Metrics()) {
453 1 : r.metrics.quiesceDuration = time.Since(workloadExhaustedAt)
454 1 : return nil
455 1 : }
456 : }
457 : }
458 : }
459 : }
460 :
461 : // compactionsAppearQuiesced returns true if the database may have quiesced, and
462 : // there likely won't be additional compactions scheduled. Detecting quiescence
463 : // is a bit fraught: The various signals that Pebble makes available are
464 : // adjusted at different points in the compaction lifecycle, and database
465 : // mutexes are dropped and acquired between them. This makes it difficult to
466 : // reliably identify when compactions quiesce.
467 : //
468 : // For example, our call to DB.Metrics() may acquire the DB.mu mutex when a
469 : // compaction has just successfully completed, but before it's managed to
470 : // schedule the next compaction (DB.mu is dropped while it attempts to acquire
471 : // the manifest lock).
472 1 : func (r *Runner) compactionsAppearQuiesced(m *pebble.Metrics) bool {
473 1 : r.compactionMu.Lock()
474 1 : defer r.compactionMu.Unlock()
475 1 : if m.Flush.NumInProgress > 0 {
476 1 : return false
477 1 : } else if m.Compact.NumInProgress > 0 && r.compactionMu.started != r.compactionMu.completed {
478 1 : return false
479 1 : }
480 1 : return true
481 : }
482 :
483 : // nextCompactionCompletes may be used to be notified when new compactions
484 : // complete. The caller is responsible for holding on to a monotonically
485 : // increasing count representing the number of compactions that have been
486 : // observed, beginning at zero.
487 : //
488 : // The caller passes their current count as an argument. If a new compaction has
489 : // already completed since their provided count, nextCompactionCompletes returns
490 : // the new count and a true boolean return value. If a new compaction has not
491 : // yet completed, it returns a channel that will be closed when the next
492 : // compaction completes. This scheme allows the caller to select{...},
493 : // performing some action on every compaction completion.
494 : func (r *Runner) nextCompactionCompletes(
495 : lastObserved int64,
496 1 : ) (count int64, alreadyOccurred bool, ch chan struct{}) {
497 1 : r.compactionMu.Lock()
498 1 : defer r.compactionMu.Unlock()
499 1 :
500 1 : if lastObserved < r.compactionMu.completed {
501 1 : // There has already been another compaction since the last one observed
502 1 : // by this caller. Return immediately.
503 1 : return r.compactionMu.completed, true, nil
504 1 : }
505 :
506 : // The last observed compaction is still the most recent compaction.
507 : // Return a channel that the caller can wait on to be notified when the
508 : // next compaction occurs.
509 1 : if r.compactionMu.ch == nil {
510 1 : r.compactionMu.ch = make(chan struct{})
511 1 : }
512 1 : return lastObserved, false, r.compactionMu.ch
513 : }
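// The intended calling pattern mirrors refreshMetrics above; roughly:
//
//	count, already, ch := r.nextCompactionCompletes(0)
//	for {
//		if !already {
//			select {
//			case <-ctx.Done():
//				return ctx.Err()
//			case <-ch:
//			}
//		}
//		// ... a compaction has completed since the last observation ...
//		count, already, ch = r.nextCompactionCompletes(count)
//	}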
514 :
515 : // Wait waits for the workload replay to complete. Wait returns once the entire
516 : // workload has been replayed, and compactions have quiesced.
517 1 : func (r *Runner) Wait() (Metrics, error) {
518 1 : err := r.errgroup.Wait()
519 1 : if storedErr := r.err.Load(); storedErr != nil {
520 0 : err = storedErr.(error)
521 0 : }
522 1 : pm := r.d.Metrics()
523 1 : total := pm.Total()
524 1 : var ingestBytesWeighted uint64
525 1 : for l := 0; l < len(pm.Levels); l++ {
526 1 : ingestBytesWeighted += pm.Levels[l].BytesIngested * uint64(len(pm.Levels)-l-1)
527 1 : }
528 :
529 1 : m := Metrics{
530 1 : Final: pm,
531 1 : EstimatedDebt: r.metrics.estimatedDebt,
532 1 : PaceDuration: time.Duration(r.metrics.paceDurationNano.Load()),
533 1 : ReadAmp: r.metrics.readAmp,
534 1 : QuiesceDuration: r.metrics.quiesceDuration,
535 1 : TombstoneCount: r.metrics.tombstoneCount,
536 1 : TotalSize: r.metrics.totalSize,
537 1 : TotalWriteAmp: total.WriteAmp(),
538 1 : WorkloadDuration: r.metrics.workloadDuration,
539 1 : WriteBytes: r.metrics.writeBytes.Load(),
540 1 : WriteStalls: make(map[string]int),
541 1 : WriteStallsDuration: make(map[string]time.Duration),
542 1 : WriteThroughput: r.metrics.writeThroughput,
543 1 : }
544 1 :
545 1 : r.writeStallMetrics.Lock()
546 1 : for reason, count := range r.writeStallMetrics.countByReason {
547 1 : m.WriteStalls[reason] = count
548 1 : }
549 1 : for reason, duration := range r.writeStallMetrics.durationByReason {
550 1 : m.WriteStallsDuration[reason] = duration
551 1 : }
552 1 : r.writeStallMetrics.Unlock()
553 1 : m.CompactionCounts.Total = pm.Compact.Count
554 1 : m.CompactionCounts.Default = pm.Compact.DefaultCount
555 1 : m.CompactionCounts.DeleteOnly = pm.Compact.DeleteOnlyCount
556 1 : m.CompactionCounts.ElisionOnly = pm.Compact.ElisionOnlyCount
557 1 : m.CompactionCounts.Move = pm.Compact.MoveCount
558 1 : m.CompactionCounts.Read = pm.Compact.ReadCount
559 1 : m.CompactionCounts.Rewrite = pm.Compact.RewriteCount
560 1 : m.CompactionCounts.Copy = pm.Compact.CopyCount
561 1 : m.CompactionCounts.MultiLevel = pm.Compact.MultiLevelCount
562 1 : m.Ingest.BytesIntoL0 = pm.Levels[0].BytesIngested
563 1 : m.Ingest.BytesWeightedByLevel = ingestBytesWeighted
564 1 : return m, err
565 : }
566 :
567 : // Close closes remaining open resources, including the database. It must be
568 : // called after Wait.
569 1 : func (r *Runner) Close() error {
570 1 : return r.d.Close()
571 1 : }
572 :
573 : // A workloadStep describes a single manifest edit in the workload. It may be a
574 : // flush or ingest that should be applied to the test database, or it may be a
575 : // compaction that is surfaced to allow the replay logic to compare against the
576 : // state of the database at workload collection time.
577 : type workloadStep struct {
578 : kind stepKind
579 : ve manifest.VersionEdit
580 : // a Version describing the state of the LSM *before* the workload was
581 : // collected.
582 : pv *manifest.Version
583 : // a Version describing the state of the LSM when the workload was
584 : // collected.
585 : v *manifest.Version
586 : // non-nil for flushStepKind
587 : flushBatch *pebble.Batch
588 : tablesToIngest []string
589 : cumulativeWriteBytes uint64
590 : }
591 :
592 : type stepKind uint8
593 :
594 : const (
595 : flushStepKind stepKind = iota
596 : ingestStepKind
597 : compactionStepKind
598 : )
599 :
600 : // eventListener returns a Pebble EventListener that is installed on the replay
601 : // database so that the replay runner has access to internal Pebble events.
602 1 : func (r *Runner) eventListener() pebble.EventListener {
603 1 : var writeStallBegin time.Time
604 1 : var writeStallReason string
605 1 : l := pebble.EventListener{
606 1 : BackgroundError: func(err error) {
607 0 : r.err.Store(err)
608 0 : r.cancel()
609 0 : },
610 1 : WriteStallBegin: func(info pebble.WriteStallBeginInfo) {
611 1 : r.writeStallMetrics.Lock()
612 1 : defer r.writeStallMetrics.Unlock()
613 1 : writeStallReason = info.Reason
614 1 : // Take just the first word of the reason.
615 1 : if j := strings.IndexByte(writeStallReason, ' '); j != -1 {
616 1 : writeStallReason = writeStallReason[:j]
617 1 : }
618 1 : switch writeStallReason {
619 1 : case "L0", "memtable":
620 1 : r.writeStallMetrics.countByReason[writeStallReason]++
621 0 : default:
622 0 : panic(fmt.Sprintf("unrecognized write stall reason %q", info.Reason))
623 : }
624 1 : writeStallBegin = time.Now()
625 : },
626 1 : WriteStallEnd: func() {
627 1 : r.writeStallMetrics.Lock()
628 1 : defer r.writeStallMetrics.Unlock()
629 1 : r.writeStallMetrics.durationByReason[writeStallReason] += time.Since(writeStallBegin)
630 1 : },
631 1 : CompactionBegin: func(_ pebble.CompactionInfo) {
632 1 : r.compactionMu.Lock()
633 1 : defer r.compactionMu.Unlock()
634 1 : r.compactionMu.started++
635 1 : },
636 1 : CompactionEnd: func(_ pebble.CompactionInfo) {
637 1 : // Keep track of the number of compactions that complete and notify
638 1 : // anyone waiting for a compaction to complete. See the function
639 1 : // nextCompactionCompletes for the corresponding receiver side.
640 1 : r.compactionMu.Lock()
641 1 : defer r.compactionMu.Unlock()
642 1 : r.compactionMu.completed++
643 1 : if r.compactionMu.ch != nil {
644 1 : // Signal that a compaction has completed.
645 1 : close(r.compactionMu.ch)
646 1 : r.compactionMu.ch = nil
647 1 : }
648 : },
649 : }
650 1 : l.EnsureDefaults(nil)
651 1 : return l
652 : }
653 :
654 : // applyWorkloadSteps runs in its own goroutine, reading workload steps off the
655 : // r.steps channel and applying them to the test database.
656 1 : func (r *Runner) applyWorkloadSteps(ctx context.Context) error {
657 1 : for {
658 1 : var ok bool
659 1 : var step workloadStep
660 1 : select {
661 0 : case <-ctx.Done():
662 0 : return ctx.Err()
663 1 : case step, ok = <-r.steps:
664 1 : if !ok {
665 1 : // Exhausted the workload. Exit.
666 1 : close(r.stepsApplied)
667 1 : return nil
668 1 : }
669 : }
670 :
671 1 : paceDur := r.Pacer.pace(r, step)
672 1 : r.metrics.paceDurationNano.Add(uint64(paceDur))
673 1 :
674 1 : switch step.kind {
675 1 : case flushStepKind:
676 1 : if err := step.flushBatch.Commit(&pebble.WriteOptions{Sync: false}); err != nil {
677 0 : return err
678 0 : }
679 1 : _, err := r.d.AsyncFlush()
680 1 : if err != nil {
681 0 : return err
682 0 : }
683 1 : r.metrics.writeBytes.Store(step.cumulativeWriteBytes)
684 1 : r.stepsApplied <- step
685 0 : case ingestStepKind:
686 0 : if err := r.d.Ingest(step.tablesToIngest); err != nil {
687 0 : return err
688 0 : }
689 0 : r.metrics.writeBytes.Store(step.cumulativeWriteBytes)
690 0 : r.stepsApplied <- step
691 1 : case compactionStepKind:
692 : // No-op.
693 : // TODO(jackson): Should we elide this earlier?
694 0 : default:
695 0 : panic("unreachable")
696 : }
697 : }
698 : }
699 :
700 : // prepareWorkloadSteps runs in its own goroutine, reading the workload
701 : // manifests in order to reconstruct the workload and prepare each step to be
702 : // applied. It sends each workload step to the r.steps channel.
703 1 : func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
704 1 : defer func() { close(r.steps) }()
705 :
706 1 : idx := r.workload.manifestIdx
707 1 :
708 1 : var cumulativeWriteBytes uint64
709 1 : var flushBufs flushBuffers
710 1 : var v *manifest.Version
711 1 : var previousVersion *manifest.Version
712 1 : var bve manifest.BulkVersionEdit
713 1 : bve.AddedByFileNum = make(map[base.FileNum]*manifest.FileMetadata)
714 1 : applyVE := func(ve *manifest.VersionEdit) error {
715 1 : return bve.Accumulate(ve)
716 1 : }
717 1 : currentVersion := func() (*manifest.Version, error) {
718 1 : var err error
719 1 : v, err = bve.Apply(v,
720 1 : r.Opts.Comparer,
721 1 : r.Opts.FlushSplitBytes,
722 1 : r.Opts.Experimental.ReadCompactionRate)
723 1 : bve = manifest.BulkVersionEdit{AddedByFileNum: bve.AddedByFileNum}
724 1 : return v, err
725 1 : }
726 :
727 1 : for ; idx < len(r.workload.manifests); idx++ {
728 1 : if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes {
729 0 : break
730 : }
731 :
732 1 : err := func() error {
733 1 : manifestName := r.workload.manifests[idx]
734 1 : f, err := r.WorkloadFS.Open(r.WorkloadFS.PathJoin(r.WorkloadPath, manifestName))
735 1 : if err != nil {
736 0 : return err
737 0 : }
738 1 : defer f.Close()
739 1 :
740 1 : rr := record.NewReader(f, 0 /* logNum */)
741 1 : // A manifest's first record always holds the initial version state.
742 1 : // If this is the first manifest we're examining, we load it in
743 1 : // order to seed `metas` with the file metadata of the existing
744 1 : // files. Otherwise, we can skip it because we already know all the
745 1 : // file metadatas up to this point.
746 1 : rec, err := rr.Next()
747 1 : if err != nil {
748 0 : return err
749 0 : }
750 1 : if idx == r.workload.manifestIdx {
751 1 : var ve manifest.VersionEdit
752 1 : if err := ve.Decode(rec); err != nil {
753 0 : return err
754 0 : }
755 1 : if err := applyVE(&ve); err != nil {
756 0 : return err
757 0 : }
758 : }
759 :
760 : // Read the remainder of the manifest's version edits, one by one.
761 1 : for {
762 1 : rec, err := rr.Next()
763 1 : if err == io.EOF || record.IsInvalidRecord(err) {
764 1 : break
765 1 : } else if err != nil {
766 0 : return err
767 0 : }
768 1 : var ve manifest.VersionEdit
769 1 : if err = ve.Decode(rec); err == io.EOF || record.IsInvalidRecord(err) {
770 0 : break
771 1 : } else if err != nil {
772 0 : return err
773 0 : }
774 1 : if err := applyVE(&ve); err != nil {
775 0 : return err
776 0 : }
777 1 : if idx == r.workload.manifestIdx && rr.Offset() <= r.workload.manifestOff {
778 1 : // The record rec began at an offset strictly less than
779 1 : // rr.Offset(), which means it's strictly less than
780 1 : // r.workload.manifestOff, and we should skip it.
781 1 : continue
782 : }
783 1 : if len(ve.NewFiles) == 0 && len(ve.DeletedFiles) == 0 {
784 1 : // Skip WAL rotations and other events that don't affect the
785 1 : // files of the LSM.
786 1 : continue
787 : }
788 :
789 1 : s := workloadStep{ve: ve}
790 1 : if len(ve.DeletedFiles) > 0 {
791 1 : // If a version edit deletes files, we assume it's a compaction.
792 1 : s.kind = compactionStepKind
793 1 : } else {
794 1 : // Default to ingest. If any file has unequal smallest and
795 1 : // largest sequence numbers, or was added to a level other than
796 1 : // L0, we'll update this to a flush.
797 1 : s.kind = ingestStepKind
798 1 : }
799 1 : var newFiles []base.DiskFileNum
800 1 : for _, nf := range ve.NewFiles {
801 1 : newFiles = append(newFiles, nf.Meta.FileBacking.DiskFileNum)
802 1 : if s.kind == ingestStepKind && (nf.Meta.SmallestSeqNum != nf.Meta.LargestSeqNum || nf.Level != 0) {
803 1 : s.kind = flushStepKind
804 1 : }
805 : }
806 : // Add the current reference *Version to the step. This provides
807 : // access to, for example, the read-amplification of the
808 : // database at this point when the workload was collected. This
809 : // can be useful for pacing.
810 1 : if s.v, err = currentVersion(); err != nil {
811 0 : return err
812 0 : }
813 : // On the first time through, we set the previous version to the current
814 : // version; otherwise we set it to the actual previous version.
815 1 : if previousVersion == nil {
816 1 : previousVersion = s.v
817 1 : }
818 1 : s.pv = previousVersion
819 1 : previousVersion = s.v
820 1 :
821 1 : // It's possible that the workload collector captured this
822 1 : // version edit, but wasn't able to collect all of the
823 1 : // corresponding sstables before being terminated.
824 1 : if s.kind == flushStepKind || s.kind == ingestStepKind {
825 1 : for _, fileNum := range newFiles {
826 1 : if _, ok := r.workload.sstables[base.PhysicalTableFileNum(fileNum)]; !ok {
827 0 : // TODO(jackson,leon): This isn't exactly an error
828 0 : // condition. Give this more thought; do we want to
829 0 : // require graceful exiting of workload collection,
830 0 : // such that the last version edit must have had its
831 0 : // corresponding sstables collected?
832 0 : return errors.Newf("sstable %s not found", fileNum)
833 0 : }
834 : }
835 : }
836 :
837 1 : switch s.kind {
838 1 : case flushStepKind:
839 1 : // Load all of the flushed sstables' keys into a batch.
840 1 : s.flushBatch = r.d.NewBatch()
841 1 : if err := loadFlushedSSTableKeys(s.flushBatch, r.WorkloadFS, r.WorkloadPath, newFiles, r.readerOpts, &flushBufs); err != nil {
842 0 : return errors.Wrapf(err, "flush in %q at offset %d", manifestName, rr.Offset())
843 0 : }
844 1 : cumulativeWriteBytes += uint64(s.flushBatch.Len())
845 0 : case ingestStepKind:
846 0 : // Copy the ingested sstables into a staging area within the
847 0 : // run dir. This is necessary for two reasons:
848 0 : // a) Ingest will remove the source file, and we don't want
849 0 : // to mutate the workload.
850 0 : // b) If the workload stored on another volume, Ingest
851 0 : // would need to fall back to copying the file since
852 0 : // it's not possible to link across volumes. The true
853 0 : // workload likely linked the file. Staging the file
854 0 : // ahead of time ensures that we're able to Link the
855 0 : // file like the original workload did.
856 0 : for _, fileNum := range newFiles {
857 0 : src := base.MakeFilepath(r.WorkloadFS, r.WorkloadPath, base.FileTypeTable, fileNum)
858 0 : dst := base.MakeFilepath(r.Opts.FS, r.stagingDir, base.FileTypeTable, fileNum)
859 0 : if err := vfs.CopyAcrossFS(r.WorkloadFS, src, r.Opts.FS, dst); err != nil {
860 0 : return errors.Wrapf(err, "ingest in %q at offset %d", manifestName, rr.Offset())
861 0 : }
862 0 : finfo, err := r.Opts.FS.Stat(dst)
863 0 : if err != nil {
864 0 : return errors.Wrapf(err, "stating %q", dst)
865 0 : }
866 0 : cumulativeWriteBytes += uint64(finfo.Size())
867 0 : s.tablesToIngest = append(s.tablesToIngest, dst)
868 : }
869 1 : case compactionStepKind:
870 : // Nothing to do.
871 : }
872 1 : s.cumulativeWriteBytes = cumulativeWriteBytes
873 1 :
874 1 : select {
875 0 : case <-ctx.Done():
876 0 : return ctx.Err()
877 1 : case r.steps <- s:
878 : }
879 :
880 1 : if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes {
881 0 : break
882 : }
883 : }
884 1 : return nil
885 : }()
886 1 : if err != nil {
887 0 : return err
888 0 : }
889 : }
890 1 : return nil
891 : }
892 :
893 : // findWorkloadFiles finds all manifests and tables in the provided path on fs.
894 : func findWorkloadFiles(
895 : path string, fs vfs.FS,
896 1 : ) (manifests []string, sstables map[base.FileNum]struct{}, err error) {
897 1 : dirents, err := fs.List(path)
898 1 : if err != nil {
899 0 : return nil, nil, err
900 0 : }
901 1 : sstables = make(map[base.FileNum]struct{})
902 1 : for _, dirent := range dirents {
903 1 : typ, fileNum, ok := base.ParseFilename(fs, dirent)
904 1 : if !ok {
905 1 : continue
906 : }
907 1 : switch typ {
908 1 : case base.FileTypeManifest:
909 1 : manifests = append(manifests, dirent)
910 1 : case base.FileTypeTable:
911 1 : sstables[base.PhysicalTableFileNum(fileNum)] = struct{}{}
912 : }
913 : }
914 1 : if len(manifests) == 0 {
915 1 : return nil, nil, errors.Newf("no manifests found")
916 1 : }
917 1 : sort.Strings(manifests)
918 1 : return manifests, sstables, err
919 : }
920 :
921 : // findManifestStart takes a database directory and FS containing the initial
922 : // database state that a workload will be run against, and a list of a workload's
923 : // manifests. It examines the database's current manifest to determine where
924 : // workload replay should begin, so as to not duplicate already-applied version
925 : // edits.
926 : //
927 : // It returns the index of the starting manifest, and the database's current
928 : // offset within the manifest.
929 : func findManifestStart(
930 : dbDir string, dbFS vfs.FS, manifests []string,
931 1 : ) (index int, offset int64, err error) {
932 1 : // Identify the database's current manifest.
933 1 : dbDesc, err := pebble.Peek(dbDir, dbFS)
934 1 : if err != nil {
935 0 : return 0, 0, err
936 0 : }
937 1 : dbManifest := dbFS.PathBase(dbDesc.ManifestFilename)
938 1 : // If there is no initial database state, begin workload replay from the
939 1 : // beginning of the first manifest.
940 1 : if !dbDesc.Exists {
941 1 : return 0, 0, nil
942 1 : }
943 1 : for index = 0; index < len(manifests); index++ {
944 1 : if manifests[index] == dbManifest {
945 1 : break
946 : }
947 : }
948 1 : if index == len(manifests) {
949 1 : // The initial database state has a manifest that does not appear within
950 1 : // the workload's set of manifests. This is possible if we began
951 1 : // recording the workload at the same time as a manifest rotation, but
952 1 : // more likely we're applying a workload to a different initial database
953 1 : // state than the one from which the workload was collected. Either way,
954 1 : // start from the beginning of the first manifest.
955 1 : return 0, 0, nil
956 1 : }
957 : // Find the initial database's offset within the manifest.
958 1 : info, err := dbFS.Stat(dbFS.PathJoin(dbDir, dbManifest))
959 1 : if err != nil {
960 0 : return 0, 0, err
961 0 : }
962 1 : return index, info.Size(), nil
963 : }
964 :
965 : // loadFlushedSSTableKeys copies keys from the sstables specified by `fileNums`
966 : // in the directory specified by `path` into the provided batch. Keys are
967 : // applied to the batch in the order dictated by their sequence numbers within
968 : // the sstables, ensuring the relative relationship between sequence numbers is
969 : // maintained.
970 : //
971 : // Preserving the relative relationship between sequence numbers is not strictly
972 : // necessary, but it ensures we accurately exercise some microoptimizations (eg,
973 : // detecting user key changes by descending trailer). There may be additional
974 : // dependencies on sequence numbers in the future.
975 : func loadFlushedSSTableKeys(
976 : b *pebble.Batch,
977 : fs vfs.FS,
978 : path string,
979 : fileNums []base.DiskFileNum,
980 : readOpts sstable.ReaderOptions,
981 : bufs *flushBuffers,
982 1 : ) error {
983 1 : // Load all the keys across all the sstables.
984 1 : for _, fileNum := range fileNums {
985 1 : if err := func() error {
986 1 : filePath := base.MakeFilepath(fs, path, base.FileTypeTable, fileNum)
987 1 : f, err := fs.Open(filePath)
988 1 : if err != nil {
989 0 : return err
990 0 : }
991 1 : readable, err := sstable.NewSimpleReadable(f)
992 1 : if err != nil {
993 0 : f.Close()
994 0 : return err
995 0 : }
996 1 : r, err := sstable.NewReader(readable, readOpts)
997 1 : if err != nil {
998 0 : return err
999 0 : }
1000 1 : defer r.Close()
1001 1 :
1002 1 : // Load all the point keys.
1003 1 : iter, err := r.NewIter(sstable.NoTransforms, nil, nil)
1004 1 : if err != nil {
1005 0 : return err
1006 0 : }
1007 1 : defer iter.Close()
1008 1 : for kv := iter.First(); kv != nil; kv = iter.Next() {
1009 1 : var key flushedKey
1010 1 : key.Trailer = kv.K.Trailer
1011 1 : bufs.alloc, key.UserKey = bufs.alloc.Copy(kv.K.UserKey)
1012 1 : if v, callerOwned, err := kv.Value(nil); err != nil {
1013 0 : return err
1014 1 : } else if callerOwned {
1015 0 : key.value = v
1016 1 : } else {
1017 1 : bufs.alloc, key.value = bufs.alloc.Copy(v)
1018 1 : }
1019 1 : bufs.keys = append(bufs.keys, key)
1020 : }
1021 :
1022 : // Load all the range tombstones.
1023 1 : if iter, err := r.NewRawRangeDelIter(sstable.NoTransforms); err != nil {
1024 0 : return err
1025 1 : } else if iter != nil {
1026 1 : defer iter.Close()
1027 1 : s, err := iter.First()
1028 1 : for ; s != nil; s, err = iter.Next() {
1029 1 : if err := rangedel.Encode(s, func(k base.InternalKey, v []byte) error {
1030 1 : var key flushedKey
1031 1 : key.Trailer = k.Trailer
1032 1 : bufs.alloc, key.UserKey = bufs.alloc.Copy(k.UserKey)
1033 1 : bufs.alloc, key.value = bufs.alloc.Copy(v)
1034 1 : bufs.keys = append(bufs.keys, key)
1035 1 : return nil
1036 1 : }); err != nil {
1037 0 : return err
1038 0 : }
1039 : }
1040 1 : if err != nil {
1041 0 : return err
1042 0 : }
1043 : }
1044 :
1045 : // Load all the range keys.
1046 1 : if iter, err := r.NewRawRangeKeyIter(sstable.NoTransforms); err != nil {
1047 0 : return err
1048 1 : } else if iter != nil {
1049 1 : defer iter.Close()
1050 1 : s, err := iter.First()
1051 1 : for ; s != nil; s, err = iter.Next() {
1052 1 : if err := rangekey.Encode(s, func(k base.InternalKey, v []byte) error {
1053 1 : var key flushedKey
1054 1 : key.Trailer = k.Trailer
1055 1 : bufs.alloc, key.UserKey = bufs.alloc.Copy(k.UserKey)
1056 1 : bufs.alloc, key.value = bufs.alloc.Copy(v)
1057 1 : bufs.keys = append(bufs.keys, key)
1058 1 : return nil
1059 1 : }); err != nil {
1060 0 : return err
1061 0 : }
1062 : }
1063 1 : if err != nil {
1064 0 : return err
1065 0 : }
1066 : }
1067 1 : return nil
1068 0 : }(); err != nil {
1069 0 : return err
1070 0 : }
1071 : }
1072 :
1073 : // Sort the flushed keys by their sequence numbers so that we can apply them
1074 : // to the batch in the same order, maintaining the relative relationship
1075 : // between keys.
1076 : // NB: We use a stable sort so that keys corresponding to span fragments
1077 : // (eg, range tombstones and range keys) have a deterministic ordering for
1078 : // testing.
1079 1 : sort.Stable(bufs.keys)
1080 1 :
1081 1 : // Add the keys to the batch in the order they were committed when the
1082 1 : // workload was captured.
1083 1 : for i := 0; i < len(bufs.keys); i++ {
1084 1 : var err error
1085 1 : switch bufs.keys[i].Kind() {
1086 1 : case base.InternalKeyKindDelete:
1087 1 : err = b.Delete(bufs.keys[i].UserKey, nil)
1088 0 : case base.InternalKeyKindDeleteSized:
1089 0 : v, _ := binary.Uvarint(bufs.keys[i].value)
1090 0 : // Batch.DeleteSized takes just the length of the value being
1091 0 : // deleted and adds the key's length to derive the overall entry
1092 0 : // size of the value being deleted. This has already been done to
1093 0 : // the key we're reading from the sstable, so we must subtract the
1094 0 : // key length from the encoded value before calling b.DeleteSized,
1095 0 : // which will again add the key length before encoding.
1096 0 : err = b.DeleteSized(bufs.keys[i].UserKey, uint32(v-uint64(len(bufs.keys[i].UserKey))), nil)
1097 1 : case base.InternalKeyKindSet, base.InternalKeyKindSetWithDelete:
1098 1 : err = b.Set(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
1099 0 : case base.InternalKeyKindMerge:
1100 0 : err = b.Merge(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
1101 1 : case base.InternalKeyKindSingleDelete:
1102 1 : err = b.SingleDelete(bufs.keys[i].UserKey, nil)
1103 1 : case base.InternalKeyKindRangeDelete:
1104 1 : err = b.DeleteRange(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
1105 1 : case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
1106 1 : s, err := rangekey.Decode(bufs.keys[i].InternalKey, bufs.keys[i].value, nil)
1107 1 : if err != nil {
1108 0 : return err
1109 0 : }
1110 1 : if len(s.Keys) != 1 {
1111 0 : return errors.Newf("range key span unexpectedly contains %d keys", len(s.Keys))
1112 0 : }
1113 1 : switch bufs.keys[i].Kind() {
1114 1 : case base.InternalKeyKindRangeKeySet:
1115 1 : err = b.RangeKeySet(s.Start, s.End, s.Keys[0].Suffix, s.Keys[0].Value, nil)
1116 1 : case base.InternalKeyKindRangeKeyUnset:
1117 1 : err = b.RangeKeyUnset(s.Start, s.End, s.Keys[0].Suffix, nil)
1118 1 : case base.InternalKeyKindRangeKeyDelete:
1119 1 : err = b.RangeKeyDelete(s.Start, s.End, nil)
1120 0 : default:
1121 0 : err = errors.Newf("unexpected key kind %q", bufs.keys[i].Kind())
1122 : }
1123 1 : if err != nil {
1124 0 : return err
1125 0 : }
1126 0 : default:
1127 0 : err = errors.Newf("unexpected key kind %q", bufs.keys[i].Kind())
1128 : }
1129 1 : if err != nil {
1130 0 : return err
1131 0 : }
1132 : }
1133 :
1134 : // Done with the flushBuffers. Reset.
1135 1 : bufs.keys = bufs.keys[:0]
1136 1 : return nil
1137 : }
1138 :
1139 : type flushBuffers struct {
1140 : keys flushedKeysByTrailer
1141 : alloc bytealloc.A
1142 : }
1143 :
1144 : type flushedKeysByTrailer []flushedKey
1145 :
1146 1 : func (s flushedKeysByTrailer) Len() int { return len(s) }
1147 1 : func (s flushedKeysByTrailer) Less(i, j int) bool { return s[i].Trailer < s[j].Trailer }
1148 1 : func (s flushedKeysByTrailer) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
1149 :
1150 : type flushedKey struct {
1151 : base.InternalKey
1152 : value []byte
1153 : }
|