Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package replay implements collection and replaying of compaction benchmarking
6 : // workloads. A workload is a collection of flushed and ingested sstables, along
7 : // with the corresponding manifests describing the order and grouping with which
8 : // they were applied. Replaying a workload flushes and ingests the same keys and
9 : // sstables to reproduce the write workload for the purpose of evaluating
10 : // compaction heuristics.
11 : package replay
12 :
13 : import (
14 : "context"
15 : "encoding/binary"
16 : "fmt"
17 : "io"
18 : "os"
19 : "sort"
20 : "strings"
21 : "sync"
22 : "sync/atomic"
23 : "time"
24 :
25 : "github.com/cockroachdb/errors"
26 : "github.com/cockroachdb/pebble"
27 : "github.com/cockroachdb/pebble/internal/base"
28 : "github.com/cockroachdb/pebble/internal/bytealloc"
29 : "github.com/cockroachdb/pebble/internal/manifest"
30 : "github.com/cockroachdb/pebble/internal/rangedel"
31 : "github.com/cockroachdb/pebble/internal/rangekey"
32 : "github.com/cockroachdb/pebble/record"
33 : "github.com/cockroachdb/pebble/sstable"
34 : "github.com/cockroachdb/pebble/vfs"
35 : "golang.org/x/perf/benchfmt"
36 : "golang.org/x/sync/errgroup"
37 : )
38 :
39 : // A Pacer paces replay of a workload, determining when to apply the next
40 : // incoming write.
41 : type Pacer interface {
42 : pace(r *Runner, step workloadStep) time.Duration
43 : }
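
// A minimal illustrative sketch (not part of the replay package's API) of an
// alternative in-package Pacer implementation, one that simply sleeps for a
// fixed duration before allowing each step:
//
//	type paceByFixedDelay time.Duration
//
//	func (d paceByFixedDelay) pace(*Runner, workloadStep) time.Duration {
//		start := time.Now()
//		time.Sleep(time.Duration(d))
//		return time.Since(start)
//	}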
44 :
45 : // computeReadAmp calculates the read amplification from a manifest.Version
46 0 : func computeReadAmp(v *manifest.Version) int {
47 0 : refRAmp := v.L0Sublevels.ReadAmplification()
48 0 : for _, lvl := range v.Levels[1:] {
49 0 : if !lvl.Empty() {
50 0 : refRAmp++
51 0 : }
52 : }
53 0 : return refRAmp
54 : }
55 :
56 : // waitForReadAmpLE is a helper used by PaceByReferenceReadAmp and
57 : // PaceByFixedReadAmp to wait on the dbMetricsCond condition variable until the
58 : // observed read amplification is less than or equal to the target (rAmp).
59 1 : func waitForReadAmpLE(r *Runner, rAmp int) {
60 1 : r.dbMetricsCond.L.Lock()
61 1 : m := r.dbMetrics
62 1 : ra := m.ReadAmp()
63 1 : for ra > rAmp {
64 1 : r.dbMetricsCond.Wait()
65 1 : ra = r.dbMetrics.ReadAmp()
66 1 : }
67 1 : r.dbMetricsCond.L.Unlock()
68 : }
69 :
70 : // Unpaced implements Pacer by applying each new write as soon as possible. It
71 : // may be useful for examining performance under high read amplification.
72 : type Unpaced struct{}
73 :
74 1 : func (Unpaced) pace(*Runner, workloadStep) (d time.Duration) { return }
75 :
76 : // PaceByReferenceReadAmp implements Pacer by applying each new write following
77 : // the collected workload's read amplification.
78 : type PaceByReferenceReadAmp struct{}
79 :
80 0 : func (PaceByReferenceReadAmp) pace(r *Runner, w workloadStep) time.Duration {
81 0 : startTime := time.Now()
82 0 : refRAmp := computeReadAmp(w.pv)
83 0 : waitForReadAmpLE(r, refRAmp)
84 0 : return time.Since(startTime)
85 0 : }
86 :
87 : // PaceByFixedReadAmp implements Pacer by applying each new write following a
88 : // fixed read amplification.
89 : type PaceByFixedReadAmp int
90 :
91 1 : func (pra PaceByFixedReadAmp) pace(r *Runner, _ workloadStep) time.Duration {
92 1 : startTime := time.Now()
93 1 : waitForReadAmpLE(r, int(pra))
94 1 : return time.Since(startTime)
95 1 : }
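
// As a usage sketch (field values are hypothetical), callers typically choose
// a pacer when constructing the Runner defined below, for example to hold
// replay at or below a read amplification of 10:
//
//	runner := &replay.Runner{
//		// RunDir, WorkloadFS, WorkloadPath, Opts elided.
//		Pacer: replay.PaceByFixedReadAmp(10),
//	}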
96 :
97 : // Metrics holds the various statistics on a replay run and its performance.
98 : type Metrics struct {
99 : CompactionCounts struct {
100 : Total int64
101 : Default int64
102 : DeleteOnly int64
103 : ElisionOnly int64
104 : Move int64
105 : Read int64
106 : Rewrite int64
107 : MultiLevel int64
108 : }
109 : EstimatedDebt SampledMetric
110 : Final *pebble.Metrics
111 : Ingest struct {
112 : BytesIntoL0 uint64
113 : // BytesWeightedByLevel is calculated as the number of bytes ingested
114 : // into a level multiplied by the level's distance from the bottommost
115 : // level (L6), summed across all levels. It can be used to gauge how
116 : // effective heuristics are at ingesting files into lower levels, saving
117 : // write amplification.
118 : BytesWeightedByLevel uint64
119 : }
120 : // PaceDuration is the cumulative time spent waiting for the pacer to allow
121 : // the workload to continue.
122 : PaceDuration time.Duration
123 : ReadAmp SampledMetric
124 : // QuiesceDuration is the time between completing application of the workload and
125 : // compactions quiescing.
126 : QuiesceDuration time.Duration
127 : TombstoneCount SampledMetric
128 : // TotalSize holds the total size of the database, sampled after each
129 : // workload step.
130 : TotalSize SampledMetric
131 : TotalWriteAmp float64
132 : WorkloadDuration time.Duration
133 : WriteBytes uint64
134 : WriteStalls map[string]int
135 : WriteStallsDuration map[string]time.Duration
136 : WriteThroughput SampledMetric
137 : }
138 :
139 : // Plot holds an ASCII plot and its name.
140 : type Plot struct {
141 : Name string
142 : Plot string
143 : }
144 :
145 : // Plots returns a slice of ASCII plots describing how metrics change over time.
146 0 : func (m *Metrics) Plots(width, height int) []Plot {
147 0 : const scaleMB = 1.0 / float64(1<<20)
148 0 : return []Plot{
149 0 : {Name: "Write throughput (MB/s)", Plot: m.WriteThroughput.PlotIncreasingPerSec(width, height, scaleMB)},
150 0 : {Name: "Estimated compaction debt (MB)", Plot: m.EstimatedDebt.Plot(width, height, scaleMB)},
151 0 : {Name: "Total database size (MB)", Plot: m.TotalSize.Plot(width, height, scaleMB)},
152 0 : {Name: "ReadAmp", Plot: m.ReadAmp.Plot(width, height, 1.0)},
153 0 : }
154 0 : }
155 :
156 : // WriteBenchmarkString writes the metrics in the form of a series of
157 : // 'Benchmark' lines understandable by benchstat.
158 1 : func (m *Metrics) WriteBenchmarkString(name string, w io.Writer) error {
159 1 : type benchmarkSection struct {
160 1 : label string
161 1 : values []benchfmt.Value
162 1 : }
163 1 : groups := []benchmarkSection{
164 1 : {label: "CompactionCounts", values: []benchfmt.Value{
165 1 : {Value: float64(m.CompactionCounts.Total), Unit: "compactions"},
166 1 : {Value: float64(m.CompactionCounts.Default), Unit: "default"},
167 1 : {Value: float64(m.CompactionCounts.DeleteOnly), Unit: "delete"},
168 1 : {Value: float64(m.CompactionCounts.ElisionOnly), Unit: "elision"},
169 1 : {Value: float64(m.CompactionCounts.Move), Unit: "move"},
170 1 : {Value: float64(m.CompactionCounts.Read), Unit: "read"},
171 1 : {Value: float64(m.CompactionCounts.Rewrite), Unit: "rewrite"},
172 1 : {Value: float64(m.CompactionCounts.MultiLevel), Unit: "multilevel"},
173 1 : }},
174 1 : // Total database sizes sampled after every workload step and
175 1 : // compaction. This can be used to evaluate the relative LSM space
176 1 : // amplification between runs of the same workload. Calculating the true
177 1 : // space amplification continuously is prohibitively expensive (it
178 1 : // requires totally compacting a copy of the LSM).
179 1 : {label: "DatabaseSize/mean", values: []benchfmt.Value{
180 1 : {Value: m.TotalSize.Mean(), Unit: "bytes"},
181 1 : }},
182 1 : {label: "DatabaseSize/max", values: []benchfmt.Value{
183 1 : {Value: float64(m.TotalSize.Max()), Unit: "bytes"},
184 1 : }},
185 1 : // Time applying the workload and time waiting for compactions to
186 1 : // quiesce after the workload has completed.
187 1 : {label: "DurationWorkload", values: []benchfmt.Value{
188 1 : {Value: m.WorkloadDuration.Seconds(), Unit: "sec/op"},
189 1 : }},
190 1 : {label: "DurationQuiescing", values: []benchfmt.Value{
191 1 : {Value: m.QuiesceDuration.Seconds(), Unit: "sec/op"},
192 1 : }},
193 1 : {label: "DurationPaceDelay", values: []benchfmt.Value{
194 1 : {Value: m.PaceDuration.Seconds(), Unit: "sec/op"},
195 1 : }},
196 1 : // Estimated compaction debt, sampled after every workload step and
197 1 : // compaction.
198 1 : {label: "EstimatedDebt/mean", values: []benchfmt.Value{
199 1 : {Value: m.EstimatedDebt.Mean(), Unit: "bytes"},
200 1 : }},
201 1 : {label: "EstimatedDebt/max", values: []benchfmt.Value{
202 1 : {Value: float64(m.EstimatedDebt.Max()), Unit: "bytes"},
203 1 : }},
204 1 : {label: "FlushUtilization", values: []benchfmt.Value{
205 1 : {Value: m.Final.Flush.WriteThroughput.Utilization(), Unit: "util"},
206 1 : }},
207 1 : {label: "IngestedIntoL0", values: []benchfmt.Value{
208 1 : {Value: float64(m.Ingest.BytesIntoL0), Unit: "bytes"},
209 1 : }},
210 1 : {label: "IngestWeightedByLevel", values: []benchfmt.Value{
211 1 : {Value: float64(m.Ingest.BytesWeightedByLevel), Unit: "bytes"},
212 1 : }},
213 1 : {label: "ReadAmp/mean", values: []benchfmt.Value{
214 1 : {Value: m.ReadAmp.Mean(), Unit: "files"},
215 1 : }},
216 1 : {label: "ReadAmp/max", values: []benchfmt.Value{
217 1 : {Value: float64(m.ReadAmp.Max()), Unit: "files"},
218 1 : }},
219 1 : {label: "TombstoneCount/mean", values: []benchfmt.Value{
220 1 : {Value: m.TombstoneCount.Mean(), Unit: "tombstones"},
221 1 : }},
222 1 : {label: "TombstoneCount/max", values: []benchfmt.Value{
223 1 : {Value: float64(m.TombstoneCount.Max()), Unit: "tombstones"},
224 1 : }},
225 1 : {label: "Throughput", values: []benchfmt.Value{
226 1 : {Value: float64(m.WriteBytes) / (m.WorkloadDuration + m.QuiesceDuration).Seconds(), Unit: "B/s"},
227 1 : }},
228 1 : {label: "WriteAmp", values: []benchfmt.Value{
229 1 : {Value: float64(m.TotalWriteAmp), Unit: "wamp"},
230 1 : }},
231 1 : }
232 1 :
233 1 : for _, reason := range []string{"L0", "memtable"} {
234 1 : groups = append(groups, benchmarkSection{
235 1 : label: fmt.Sprintf("WriteStall/%s", reason),
236 1 : values: []benchfmt.Value{
237 1 : {Value: float64(m.WriteStalls[reason]), Unit: "stalls"},
238 1 : {Value: float64(m.WriteStallsDuration[reason].Seconds()), Unit: "stallsec/op"},
239 1 : },
240 1 : })
241 1 : }
242 :
243 1 : bw := benchfmt.NewWriter(w)
244 1 : for _, grp := range groups {
245 1 : err := bw.Write(&benchfmt.Result{
246 1 : Name: benchfmt.Name(fmt.Sprintf("BenchmarkReplay/%s/%s", name, grp.label)),
247 1 : Iters: 1,
248 1 : Values: grp.values,
249 1 : })
250 1 : if err != nil {
251 0 : return err
252 0 : }
253 : }
254 1 : return nil
255 : }
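
// A usage sketch from hypothetical caller code: after Wait (defined below)
// returns the collected Metrics, the results can be emitted as
// benchstat-compatible 'Benchmark' lines, here to stdout under an arbitrary
// run name:
//
//	m, err := runner.Wait()
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := m.WriteBenchmarkString("candidate-heuristic", os.Stdout); err != nil {
//		log.Fatal(err)
//	}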
256 :
257 : // Runner runs a captured workload against a test database, collecting
258 : // metrics on performance.
259 : type Runner struct {
260 : RunDir string
261 : WorkloadFS vfs.FS
262 : WorkloadPath string
263 : Pacer Pacer
264 : Opts *pebble.Options
265 : MaxWriteBytes uint64
266 :
267 : // Internal state.
268 :
269 : d *pebble.DB
270 : // dbMetrics and dbMetricsCond work in unison to update the metrics and
271 : // notify (broadcast) to any waiting clients that metrics have been updated.
272 : dbMetrics *pebble.Metrics
273 : dbMetricsCond sync.Cond
274 : cancel func()
275 : err atomic.Value
276 : errgroup *errgroup.Group
277 : readerOpts sstable.ReaderOptions
278 : stagingDir string
279 : steps chan workloadStep
280 : stepsApplied chan workloadStep
281 :
282 : metrics struct {
283 : estimatedDebt SampledMetric
284 : quiesceDuration time.Duration
285 : readAmp SampledMetric
286 : tombstoneCount SampledMetric
287 : totalSize SampledMetric
288 : paceDurationNano atomic.Uint64
289 : workloadDuration time.Duration
290 : writeBytes atomic.Uint64
291 : writeThroughput SampledMetric
292 : }
293 : writeStallMetrics struct {
294 : sync.Mutex
295 : countByReason map[string]int
296 : durationByReason map[string]time.Duration
297 : }
298 : // compactionMu holds state for tracking the number of compactions
299 : // started and completed, and for waking waiting goroutines when a new
300 : // compaction completes. See nextCompactionCompletes.
301 : compactionMu struct {
302 : sync.Mutex
303 : ch chan struct{}
304 : started int64
305 : completed int64
306 : }
307 : workload struct {
308 : manifests []string
309 : // manifest{Idx,Off} record the starting position of the workload
310 : // relative to the initial database state.
311 : manifestIdx int
312 : manifestOff int64
313 : // sstables records the set of captured workload sstables by file num.
314 : sstables map[base.FileNum]struct{}
315 : }
316 : }
317 :
318 : // Run begins executing the workload in background goroutines and returns.
319 : //
320 : // The workload application will respect the provided context's cancellation.
321 1 : func (r *Runner) Run(ctx context.Context) error {
322 1 : // Find the workload start relative to the RunDir's existing database state.
323 1 : // A prefix of the workload's manifest edits are expected to have already
324 1 : // been applied to the checkpointed existing database state.
325 1 : var err error
326 1 : r.workload.manifests, r.workload.sstables, err = findWorkloadFiles(r.WorkloadPath, r.WorkloadFS)
327 1 : if err != nil {
328 0 : return err
329 0 : }
330 1 : r.workload.manifestIdx, r.workload.manifestOff, err = findManifestStart(r.RunDir, r.Opts.FS, r.workload.manifests)
331 1 : if err != nil {
332 0 : return err
333 0 : }
334 :
335 : // Set up a staging dir for files that will be ingested.
336 1 : r.stagingDir = r.Opts.FS.PathJoin(r.RunDir, "staging")
337 1 : if err := r.Opts.FS.MkdirAll(r.stagingDir, os.ModePerm); err != nil {
338 0 : return err
339 0 : }
340 :
341 1 : r.dbMetricsCond = sync.Cond{
342 1 : L: &sync.Mutex{},
343 1 : }
344 1 :
345 1 : // Extend the user-provided Options with extensions necessary for replay
346 1 : // mechanics.
347 1 : r.compactionMu.ch = make(chan struct{})
348 1 : r.Opts.AddEventListener(r.eventListener())
349 1 : r.writeStallMetrics.countByReason = make(map[string]int)
350 1 : r.writeStallMetrics.durationByReason = make(map[string]time.Duration)
351 1 : r.Opts.EnsureDefaults()
352 1 : r.readerOpts = r.Opts.MakeReaderOptions()
353 1 : r.Opts.DisableWAL = true
354 1 : r.d, err = pebble.Open(r.RunDir, r.Opts)
355 1 : if err != nil {
356 0 : return err
357 0 : }
358 :
359 1 : r.dbMetrics = r.d.Metrics()
360 1 :
361 1 : // Use a buffered channel to allow prepareWorkloadSteps to read ahead,
362 1 : // buffering up to cap(r.steps) steps ahead of the current applied state.
363 1 : // Flushes need to be buffered and ingested sstables need to be copied, so
364 1 : // pipelining this preparation makes it more likely the step will be ready
365 1 : // to apply when the pacer decides to apply it.
366 1 : r.steps = make(chan workloadStep, 5)
367 1 : r.stepsApplied = make(chan workloadStep, 5)
368 1 :
369 1 : ctx, r.cancel = context.WithCancel(ctx)
370 1 : r.errgroup, ctx = errgroup.WithContext(ctx)
371 1 : r.errgroup.Go(func() error { return r.prepareWorkloadSteps(ctx) })
372 1 : r.errgroup.Go(func() error { return r.applyWorkloadSteps(ctx) })
373 1 : r.errgroup.Go(func() error { return r.refreshMetrics(ctx) })
374 1 : return nil
375 : }
376 :
377 : // refreshMetrics runs in its own goroutine, collecting metrics from the Pebble
378 : // instance whenever a) a workload step completes, or b) a compaction completes.
379 : // The Pacer implementations that pace based on read-amplification rely on these
380 : // refreshed metrics to decide when to allow the workload to proceed.
381 1 : func (r *Runner) refreshMetrics(ctx context.Context) error {
382 1 : startAt := time.Now()
383 1 : var workloadExhausted bool
384 1 : var workloadExhaustedAt time.Time
385 1 : stepsApplied := r.stepsApplied
386 1 : compactionCount, alreadyCompleted, compactionCh := r.nextCompactionCompletes(0)
387 1 : for {
388 1 : if !alreadyCompleted {
389 1 : select {
390 0 : case <-ctx.Done():
391 0 : return ctx.Err()
392 1 : case <-compactionCh:
393 : // Fall through to refreshing dbMetrics.
394 1 : case _, ok := <-stepsApplied:
395 1 : if !ok {
396 1 : workloadExhausted = true
397 1 : workloadExhaustedAt = time.Now()
398 1 : // Set the [stepsApplied] channel to nil so that we'll never
399 1 : // hit this case again, and we don't busy loop.
400 1 : stepsApplied = nil
401 1 : // Record the replay time.
402 1 : r.metrics.workloadDuration = workloadExhaustedAt.Sub(startAt)
403 1 : }
404 : // Fall through to refreshing dbMetrics.
405 : }
406 : }
407 :
408 1 : m := r.d.Metrics()
409 1 : r.dbMetricsCond.L.Lock()
410 1 : r.dbMetrics = m
411 1 : r.dbMetricsCond.Broadcast()
412 1 : r.dbMetricsCond.L.Unlock()
413 1 :
414 1 : // Collect sampled metrics. These are recorded every time we refresh
415 1 : // the database metrics.
416 1 : r.metrics.readAmp.record(int64(m.ReadAmp()))
417 1 : r.metrics.estimatedDebt.record(int64(m.Compact.EstimatedDebt))
418 1 : r.metrics.tombstoneCount.record(int64(m.Keys.TombstoneCount))
419 1 : r.metrics.totalSize.record(int64(m.DiskSpaceUsage()))
420 1 : r.metrics.writeThroughput.record(int64(r.metrics.writeBytes.Load()))
421 1 :
422 1 : compactionCount, alreadyCompleted, compactionCh = r.nextCompactionCompletes(compactionCount)
423 1 : // Consider whether replaying is complete. There are two necessary
424 1 : // conditions:
425 1 : //
426 1 : // 1. The workload must be exhausted.
427 1 : // 2. Compactions must have quiesced.
428 1 : //
429 1 : // The first condition is simple. The replay tool is responsible for
430 1 : // applying the workload. The goroutine responsible for applying the
431 1 : // workload closes the `stepsApplied` channel after the last step has
432 1 : // been applied, and we'll flip `workloadExhausted` to true.
433 1 : //
434 1 : // The second condition is tricky. The replay tool doesn't control
435 1 : // compactions and doesn't have visibility into whether the compaction
436 1 : // picker is about to schedule a new compaction. We can tell when
437 1 : // compactions are in progress or may be imminent (eg, flushes in
438 1 : // progress). If it appears that compactions have quiesced, pause for a
439 1 : // fixed duration to see if a new one is scheduled. If not, consider
440 1 : // compactions quiesced.
441 1 : if workloadExhausted && !alreadyCompleted && r.compactionsAppearQuiesced(m) {
442 1 : select {
443 1 : case <-compactionCh:
444 1 : // A new compaction just finished; compactions have not
445 1 : // quiesced.
446 1 : continue
447 1 : case <-time.After(time.Second):
448 1 : // No compactions completed. If it still looks like they've
449 1 : // quiesced according to the metrics, consider them quiesced.
450 1 : if r.compactionsAppearQuiesced(r.d.Metrics()) {
451 1 : r.metrics.quiesceDuration = time.Since(workloadExhaustedAt)
452 1 : return nil
453 1 : }
454 : }
455 : }
456 : }
457 : }
458 :
459 : // compactionsAppearQuiesced returns true if the database may have quiesced, and
460 : // there likely won't be additional compactions scheduled. Detecting quiescence
461 : // is a bit fraught: The various signals that Pebble makes available are
462 : // adjusted at different points in the compaction lifecycle, and database
463 : // mutexes are dropped and acquired between them. This makes it difficult to
464 : // reliably identify when compactions quiesce.
465 : //
466 : // For example, our call to DB.Metrics() may acquire the DB.mu mutex when a
467 : // compaction has just successfully completed, but before it's managed to
468 : // schedule the next compaction (DB.mu is dropped while it attempts to acquire
469 : // the manifest lock).
470 1 : func (r *Runner) compactionsAppearQuiesced(m *pebble.Metrics) bool {
471 1 : r.compactionMu.Lock()
472 1 : defer r.compactionMu.Unlock()
473 1 : if m.Flush.NumInProgress > 0 {
474 1 : return false
475 1 : } else if m.Compact.NumInProgress > 0 && r.compactionMu.started != r.compactionMu.completed {
476 1 : return false
477 1 : }
478 1 : return true
479 : }
480 :
481 : // nextCompactionCompletes lets a caller be notified when new compactions
482 : // complete. The caller is responsible for holding on to a monotonically
483 : // increasing count representing the number of compactions that have been
484 : // observed, beginning at zero.
485 : //
486 : // The caller passes their current count as an argument. If a new compaction has
487 : // already completed since their provided count, nextCompactionCompletes returns
488 : // the new count and a true boolean return value. If a new compaction has not
489 : // yet completed, it returns a channel that will be closed when the next
490 : // compaction completes. This scheme allows the caller to select{...},
491 : // performing some action on every compaction completion.
492 : func (r *Runner) nextCompactionCompletes(
493 : lastObserved int64,
494 1 : ) (count int64, alreadyOccurred bool, ch chan struct{}) {
495 1 : r.compactionMu.Lock()
496 1 : defer r.compactionMu.Unlock()
497 1 :
498 1 : if lastObserved < r.compactionMu.completed {
499 1 : // There has already been another compaction since the last one observed
500 1 : // by this caller. Return immediately.
501 1 : return r.compactionMu.completed, true, nil
502 1 : }
503 :
504 : // The last observed compaction is still the most recent compaction.
505 : // Return a channel that the caller can wait on to be notified when the
506 : // next compaction occurs.
507 1 : if r.compactionMu.ch == nil {
508 1 : r.compactionMu.ch = make(chan struct{})
509 1 : }
510 1 : return lastObserved, false, r.compactionMu.ch
511 : }
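
// A minimal sketch of the caller pattern described above (refreshMetrics is
// the in-tree user): hold on to a running count, and when no compaction has
// completed since that count, wait on the returned channel before asking
// again:
//
//	count, already, ch := r.nextCompactionCompletes(0)
//	for {
//		if !already {
//			<-ch // blocks until the next compaction completes
//		}
//		// ... observe the newly completed compaction(s) ...
//		count, already, ch = r.nextCompactionCompletes(count)
//	}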
512 :
513 : // Wait waits for the workload replay to complete. Wait returns once the entire
514 : // workload has been replayed, and compactions have quiesced.
515 1 : func (r *Runner) Wait() (Metrics, error) {
516 1 : err := r.errgroup.Wait()
517 1 : if storedErr := r.err.Load(); storedErr != nil {
518 0 : err = storedErr.(error)
519 0 : }
520 1 : pm := r.d.Metrics()
521 1 : total := pm.Total()
522 1 : var ingestBytesWeighted uint64
523 1 : for l := 0; l < len(pm.Levels); l++ {
524 1 : ingestBytesWeighted += pm.Levels[l].BytesIngested * uint64(len(pm.Levels)-l-1)
525 1 : }
526 :
527 1 : m := Metrics{
528 1 : Final: pm,
529 1 : EstimatedDebt: r.metrics.estimatedDebt,
530 1 : PaceDuration: time.Duration(r.metrics.paceDurationNano.Load()),
531 1 : ReadAmp: r.metrics.readAmp,
532 1 : QuiesceDuration: r.metrics.quiesceDuration,
533 1 : TombstoneCount: r.metrics.tombstoneCount,
534 1 : TotalSize: r.metrics.totalSize,
535 1 : TotalWriteAmp: total.WriteAmp(),
536 1 : WorkloadDuration: r.metrics.workloadDuration,
537 1 : WriteBytes: r.metrics.writeBytes.Load(),
538 1 : WriteStalls: make(map[string]int),
539 1 : WriteStallsDuration: make(map[string]time.Duration),
540 1 : WriteThroughput: r.metrics.writeThroughput,
541 1 : }
542 1 :
543 1 : r.writeStallMetrics.Lock()
544 1 : for reason, count := range r.writeStallMetrics.countByReason {
545 1 : m.WriteStalls[reason] = count
546 1 : }
547 1 : for reason, duration := range r.writeStallMetrics.durationByReason {
548 1 : m.WriteStallsDuration[reason] = duration
549 1 : }
550 1 : r.writeStallMetrics.Unlock()
551 1 : m.CompactionCounts.Total = pm.Compact.Count
552 1 : m.CompactionCounts.Default = pm.Compact.DefaultCount
553 1 : m.CompactionCounts.DeleteOnly = pm.Compact.DeleteOnlyCount
554 1 : m.CompactionCounts.ElisionOnly = pm.Compact.ElisionOnlyCount
555 1 : m.CompactionCounts.Move = pm.Compact.MoveCount
556 1 : m.CompactionCounts.Read = pm.Compact.ReadCount
557 1 : m.CompactionCounts.Rewrite = pm.Compact.RewriteCount
558 1 : m.CompactionCounts.MultiLevel = pm.Compact.MultiLevelCount
559 1 : m.Ingest.BytesIntoL0 = pm.Levels[0].BytesIngested
560 1 : m.Ingest.BytesWeightedByLevel = ingestBytesWeighted
561 1 : return m, err
562 : }
563 :
564 : // Close closes remaining open resources, including the database. It must be
565 : // called after Wait.
566 1 : func (r *Runner) Close() error {
567 1 : return r.d.Close()
568 1 : }
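
// A hedged end-to-end sketch of driving a replay from hypothetical caller
// code; the paths, pacer, and options are illustrative, and real callers
// would configure Opts and handle errors appropriately:
//
//	runner := &replay.Runner{
//		RunDir:       "/tmp/replay-run",        // checkpointed initial DB state
//		WorkloadFS:   vfs.Default,
//		WorkloadPath: "/tmp/captured-workload", // collected manifests and sstables
//		Pacer:        replay.PaceByFixedReadAmp(10),
//		Opts:         &pebble.Options{},
//	}
//	if err := runner.Run(context.Background()); err != nil {
//		log.Fatal(err)
//	}
//	m, err := runner.Wait()
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := runner.Close(); err != nil {
//		log.Fatal(err)
//	}
//	// m now holds the collected Metrics; see WriteBenchmarkString above.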
569 :
570 : // A workloadStep describes a single manifest edit in the workload. It may be a
571 : // flush or ingest that should be applied to the test database, or it may be a
572 : // compaction that is surfaced to allow the replay logic to compare against the
573 : // state of the database at workload collection time.
574 : type workloadStep struct {
575 : kind stepKind
576 : ve manifest.VersionEdit
577 : // pv is a Version describing the state of the LSM at collection time,
578 : // *before* this step was applied.
579 : pv *manifest.Version
580 : // v is a Version describing the state of the LSM at collection time,
581 : // *after* this step was applied.
582 : v *manifest.Version
583 : // non-nil for flushStepKind
584 : flushBatch *pebble.Batch
585 : tablesToIngest []string
586 : cumulativeWriteBytes uint64
587 : }
588 :
589 : type stepKind uint8
590 :
591 : const (
592 : flushStepKind stepKind = iota
593 : ingestStepKind
594 : compactionStepKind
595 : )
596 :
597 : // eventListener returns a Pebble EventListener that is installed on the replay
598 : // database so that the replay runner has access to internal Pebble events.
599 1 : func (r *Runner) eventListener() pebble.EventListener {
600 1 : var writeStallBegin time.Time
601 1 : var writeStallReason string
602 1 : l := pebble.EventListener{
603 1 : BackgroundError: func(err error) {
604 0 : r.err.Store(err)
605 0 : r.cancel()
606 0 : },
607 1 : WriteStallBegin: func(info pebble.WriteStallBeginInfo) {
608 1 : r.writeStallMetrics.Lock()
609 1 : defer r.writeStallMetrics.Unlock()
610 1 : writeStallReason = info.Reason
611 1 : // Take just the first word of the reason.
612 1 : if j := strings.IndexByte(writeStallReason, ' '); j != -1 {
613 1 : writeStallReason = writeStallReason[:j]
614 1 : }
615 1 : switch writeStallReason {
616 1 : case "L0", "memtable":
617 1 : r.writeStallMetrics.countByReason[writeStallReason]++
618 0 : default:
619 0 : panic(fmt.Sprintf("unrecognized write stall reason %q", info.Reason))
620 : }
621 1 : writeStallBegin = time.Now()
622 : },
623 1 : WriteStallEnd: func() {
624 1 : r.writeStallMetrics.Lock()
625 1 : defer r.writeStallMetrics.Unlock()
626 1 : r.writeStallMetrics.durationByReason[writeStallReason] += time.Since(writeStallBegin)
627 1 : },
628 1 : CompactionBegin: func(_ pebble.CompactionInfo) {
629 1 : r.compactionMu.Lock()
630 1 : defer r.compactionMu.Unlock()
631 1 : r.compactionMu.started++
632 1 : },
633 1 : CompactionEnd: func(_ pebble.CompactionInfo) {
634 1 : // Keep track of the number of compactions that complete and notify
635 1 : // anyone waiting for a compaction to complete. See the function
636 1 : // nextCompactionCompletes for the corresponding receiver side.
637 1 : r.compactionMu.Lock()
638 1 : defer r.compactionMu.Unlock()
639 1 : r.compactionMu.completed++
640 1 : if r.compactionMu.ch != nil {
641 1 : // Signal that a compaction has completed.
642 1 : close(r.compactionMu.ch)
643 1 : r.compactionMu.ch = nil
644 1 : }
645 : },
646 : }
647 1 : l.EnsureDefaults(nil)
648 1 : return l
649 : }
650 :
651 : // applyWorkloadSteps runs in its own goroutine, reading workload steps off the
652 : // r.steps channel and applying them to the test database.
653 1 : func (r *Runner) applyWorkloadSteps(ctx context.Context) error {
654 1 : for {
655 1 : var ok bool
656 1 : var step workloadStep
657 1 : select {
658 0 : case <-ctx.Done():
659 0 : return ctx.Err()
660 1 : case step, ok = <-r.steps:
661 1 : if !ok {
662 1 : // Exhausted the workload. Exit.
663 1 : close(r.stepsApplied)
664 1 : return nil
665 1 : }
666 : }
667 :
668 1 : paceDur := r.Pacer.pace(r, step)
669 1 : r.metrics.paceDurationNano.Add(uint64(paceDur))
670 1 :
671 1 : switch step.kind {
672 1 : case flushStepKind:
673 1 : if err := step.flushBatch.Commit(&pebble.WriteOptions{Sync: false}); err != nil {
674 0 : return err
675 0 : }
676 1 : _, err := r.d.AsyncFlush()
677 1 : if err != nil {
678 0 : return err
679 0 : }
680 1 : r.metrics.writeBytes.Store(step.cumulativeWriteBytes)
681 1 : r.stepsApplied <- step
682 0 : case ingestStepKind:
683 0 : if err := r.d.Ingest(step.tablesToIngest); err != nil {
684 0 : return err
685 0 : }
686 0 : r.metrics.writeBytes.Store(step.cumulativeWriteBytes)
687 0 : r.stepsApplied <- step
688 1 : case compactionStepKind:
689 : // No-op.
690 : // TODO(jackson): Should we elide this earlier?
691 0 : default:
692 0 : panic("unreachable")
693 : }
694 : }
695 : }
696 :
697 : // prepareWorkloadSteps runs in its own goroutine, reading the workload
698 : // manifests in order to reconstruct the workload and prepare each step to be
699 : // applied. It sends each workload step to the r.steps channel.
700 1 : func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
701 1 : defer func() { close(r.steps) }()
702 :
703 1 : idx := r.workload.manifestIdx
704 1 :
705 1 : var cumulativeWriteBytes uint64
706 1 : var flushBufs flushBuffers
707 1 : var v *manifest.Version
708 1 : var previousVersion *manifest.Version
709 1 : var bve manifest.BulkVersionEdit
710 1 : bve.AddedByFileNum = make(map[base.FileNum]*manifest.FileMetadata)
711 1 : applyVE := func(ve *manifest.VersionEdit) error {
712 1 : return bve.Accumulate(ve)
713 1 : }
714 1 : currentVersion := func() (*manifest.Version, error) {
715 1 : var err error
716 1 : v, err = bve.Apply(v,
717 1 : r.Opts.Comparer.Compare,
718 1 : r.Opts.Comparer.FormatKey,
719 1 : r.Opts.FlushSplitBytes,
720 1 : r.Opts.Experimental.ReadCompactionRate,
721 1 : nil /* zombies */)
722 1 : bve = manifest.BulkVersionEdit{AddedByFileNum: bve.AddedByFileNum}
723 1 : return v, err
724 1 : }
725 :
726 1 : for ; idx < len(r.workload.manifests); idx++ {
727 1 : if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes {
728 0 : break
729 : }
730 :
731 1 : err := func() error {
732 1 : manifestName := r.workload.manifests[idx]
733 1 : f, err := r.WorkloadFS.Open(r.WorkloadFS.PathJoin(r.WorkloadPath, manifestName))
734 1 : if err != nil {
735 0 : return err
736 0 : }
737 1 : defer f.Close()
738 1 :
739 1 : rr := record.NewReader(f, 0 /* logNum */)
740 1 : // A manifest's first record always holds the initial version state.
741 1 : // If this is the first manifest we're examining, we load it in
742 1 : // order to seed `metas` with the file metadata of the existing
743 1 : // files. Otherwise, we can skip it because we already know all the
744 1 : // file metadatas up to this point.
745 1 : rec, err := rr.Next()
746 1 : if err != nil {
747 0 : return err
748 0 : }
749 1 : if idx == r.workload.manifestIdx {
750 1 : var ve manifest.VersionEdit
751 1 : if err := ve.Decode(rec); err != nil {
752 0 : return err
753 0 : }
754 1 : if err := applyVE(&ve); err != nil {
755 0 : return err
756 0 : }
757 : }
758 :
759 : // Read the remainder of the manifest's version edits, one by one.
760 1 : for {
761 1 : rec, err := rr.Next()
762 1 : if err == io.EOF || record.IsInvalidRecord(err) {
763 1 : break
764 1 : } else if err != nil {
765 0 : return err
766 0 : }
767 1 : var ve manifest.VersionEdit
768 1 : if err = ve.Decode(rec); err == io.EOF || record.IsInvalidRecord(err) {
769 0 : break
770 1 : } else if err != nil {
771 0 : return err
772 0 : }
773 1 : if err := applyVE(&ve); err != nil {
774 0 : return err
775 0 : }
776 1 : if idx == r.workload.manifestIdx && rr.Offset() <= r.workload.manifestOff {
777 1 : // The record rec began at an offset strictly less than
778 1 : // rr.Offset(), which means it's strictly less than
779 1 : // r.workload.manifestOff, and we should skip it.
780 1 : continue
781 : }
782 1 : if len(ve.NewFiles) == 0 && len(ve.DeletedFiles) == 0 {
783 1 : // Skip WAL rotations and other events that don't affect the
784 1 : // files of the LSM.
785 1 : continue
786 : }
787 :
788 1 : s := workloadStep{ve: ve}
789 1 : if len(ve.DeletedFiles) > 0 {
790 1 : // If a version edit deletes files, we assume it's a compaction.
791 1 : s.kind = compactionStepKind
792 1 : } else {
793 1 : // Default to ingest. If any files have unequal
794 1 : // smallest and largest sequence numbers, we'll update this to a
795 1 : // flush.
796 1 : s.kind = ingestStepKind
797 1 : }
798 1 : var newFiles []base.DiskFileNum
799 1 : for _, nf := range ve.NewFiles {
800 1 : newFiles = append(newFiles, nf.Meta.FileBacking.DiskFileNum)
801 1 : if s.kind == ingestStepKind && (nf.Meta.SmallestSeqNum != nf.Meta.LargestSeqNum || nf.Level != 0) {
802 1 : s.kind = flushStepKind
803 1 : }
804 : }
805 : // Add the current reference *Version to the step. This provides
806 : // access to, for example, the read-amplification of the
807 : // database at this point when the workload was collected. This
808 : // can be useful for pacing.
809 1 : if s.v, err = currentVersion(); err != nil {
810 0 : return err
811 0 : }
812 : // The first time through, we set the previous version to the current
813 : // version; otherwise we set it to the actual previous version.
814 1 : if previousVersion == nil {
815 1 : previousVersion = s.v
816 1 : }
817 1 : s.pv = previousVersion
818 1 : previousVersion = s.v
819 1 :
820 1 : // It's possible that the workload collector captured this
821 1 : // version edit, but wasn't able to collect all of the
822 1 : // corresponding sstables before being terminated.
823 1 : if s.kind == flushStepKind || s.kind == ingestStepKind {
824 1 : for _, fileNum := range newFiles {
825 1 : if _, ok := r.workload.sstables[fileNum.FileNum()]; !ok {
826 0 : // TODO(jackson,leon): This isn't exactly an error
827 0 : // condition. Give this more thought; do we want to
828 0 : // require graceful exiting of workload collection,
829 0 : // such that the last version edit must have had its
830 0 : // corresponding sstables collected?
831 0 : return errors.Newf("sstable %s not found", fileNum)
832 0 : }
833 : }
834 : }
835 :
836 1 : switch s.kind {
837 1 : case flushStepKind:
838 1 : // Load all of the flushed sstables' keys into a batch.
839 1 : s.flushBatch = r.d.NewBatch()
840 1 : if err := loadFlushedSSTableKeys(s.flushBatch, r.WorkloadFS, r.WorkloadPath, newFiles, r.readerOpts, &flushBufs); err != nil {
841 0 : return errors.Wrapf(err, "flush in %q at offset %d", manifestName, rr.Offset())
842 0 : }
843 1 : cumulativeWriteBytes += uint64(s.flushBatch.Len())
844 0 : case ingestStepKind:
845 0 : // Copy the ingested sstables into a staging area within the
846 0 : // run dir. This is necessary for two reasons:
847 0 : // a) Ingest will remove the source file, and we don't want
848 0 : // to mutate the workload.
849 0 : // b) If the workload is stored on another volume, Ingest
850 0 : // would need to fall back to copying the file since
851 0 : // it's not possible to link across volumes. The true
852 0 : // workload likely linked the file. Staging the file
853 0 : // ahead of time ensures that we're able to Link the
854 0 : // file like the original workload did.
855 0 : for _, fileNum := range newFiles {
856 0 : src := base.MakeFilepath(r.WorkloadFS, r.WorkloadPath, base.FileTypeTable, fileNum)
857 0 : dst := base.MakeFilepath(r.Opts.FS, r.stagingDir, base.FileTypeTable, fileNum)
858 0 : if err := vfs.CopyAcrossFS(r.WorkloadFS, src, r.Opts.FS, dst); err != nil {
859 0 : return errors.Wrapf(err, "ingest in %q at offset %d", manifestName, rr.Offset())
860 0 : }
861 0 : finfo, err := r.Opts.FS.Stat(dst)
862 0 : if err != nil {
863 0 : return errors.Wrapf(err, "stating %q", dst)
864 0 : }
865 0 : cumulativeWriteBytes += uint64(finfo.Size())
866 0 : s.tablesToIngest = append(s.tablesToIngest, dst)
867 : }
868 1 : case compactionStepKind:
869 : // Nothing to do.
870 : }
871 1 : s.cumulativeWriteBytes = cumulativeWriteBytes
872 1 :
873 1 : select {
874 0 : case <-ctx.Done():
875 0 : return ctx.Err()
876 1 : case r.steps <- s:
877 : }
878 :
879 1 : if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes {
880 0 : break
881 : }
882 : }
883 1 : return nil
884 : }()
885 1 : if err != nil {
886 0 : return err
887 0 : }
888 : }
889 1 : return nil
890 : }
891 :
892 : // findWorkloadFiles finds all manifests and tables in the provided path on fs.
893 : func findWorkloadFiles(
894 : path string, fs vfs.FS,
895 1 : ) (manifests []string, sstables map[base.FileNum]struct{}, err error) {
896 1 : dirents, err := fs.List(path)
897 1 : if err != nil {
898 0 : return nil, nil, err
899 0 : }
900 1 : sstables = make(map[base.FileNum]struct{})
901 1 : for _, dirent := range dirents {
902 1 : typ, fileNum, ok := base.ParseFilename(fs, dirent)
903 1 : if !ok {
904 1 : continue
905 : }
906 1 : switch typ {
907 1 : case base.FileTypeManifest:
908 1 : manifests = append(manifests, dirent)
909 1 : case base.FileTypeTable:
910 1 : sstables[fileNum.FileNum()] = struct{}{}
911 : }
912 : }
913 1 : if len(manifests) == 0 {
914 1 : return nil, nil, errors.Newf("no manifests found")
915 1 : }
916 1 : sort.Strings(manifests)
917 1 : return manifests, sstables, err
918 : }
919 :
920 : // findManifestStart takes a database directory and FS containing the initial
921 : // database state that a workload will be run against, and a list of a workload's
922 : // manifests. It examines the database's current manifest to determine where
923 : // workload replay should begin, so as to not duplicate already-applied version
924 : // edits.
925 : //
926 : // It returns the index of the starting manifest, and the database's current
927 : // offset within the manifest.
928 : func findManifestStart(
929 : dbDir string, dbFS vfs.FS, manifests []string,
930 1 : ) (index int, offset int64, err error) {
931 1 : // Identify the database's current manifest.
932 1 : dbDesc, err := pebble.Peek(dbDir, dbFS)
933 1 : if err != nil {
934 0 : return 0, 0, err
935 0 : }
936 1 : dbManifest := dbFS.PathBase(dbDesc.ManifestFilename)
937 1 : // If there is no initial database state, begin workload replay from the
938 1 : // beginning of the first manifest.
939 1 : if !dbDesc.Exists {
940 1 : return 0, 0, nil
941 1 : }
942 1 : for index = 0; index < len(manifests); index++ {
943 1 : if manifests[index] == dbManifest {
944 1 : break
945 : }
946 : }
947 1 : if index == len(manifests) {
948 1 : // The initial database state has a manifest that does not appear within
949 1 : // the workload's set of manifests. This is possible if we began
950 1 : // recording the workload at the same time as a manifest rotation, but
951 1 : // more likely we're applying a workload to a different initial database
952 1 : // state than the one from which the workload was collected. Either way,
953 1 : // start from the beginning of the first manifest.
954 1 : return 0, 0, nil
955 1 : }
956 : // Find the initial database's offset within the manifest.
957 1 : info, err := dbFS.Stat(dbFS.PathJoin(dbDir, dbManifest))
958 1 : if err != nil {
959 0 : return 0, 0, err
960 0 : }
961 1 : return index, info.Size(), nil
962 : }
963 :
964 : // loadFlushedSSTableKeys copies keys from the sstables specified by `fileNums`
965 : // in the directory specified by `path` into the provided batch. Keys are
966 : // applied to the batch in the order dictated by their sequence numbers within
967 : // the sstables, ensuring the relative relationship between sequence numbers is
968 : // maintained.
969 : //
970 : // Preserving the relative relationship between sequence numbers is not strictly
971 : // necessary, but it ensures we accurately exercise some microoptimizations (eg,
972 : // detecting user key changes by descending trailer). There may be additional
973 : // dependencies on sequence numbers in the future.
974 : func loadFlushedSSTableKeys(
975 : b *pebble.Batch,
976 : fs vfs.FS,
977 : path string,
978 : fileNums []base.DiskFileNum,
979 : readOpts sstable.ReaderOptions,
980 : bufs *flushBuffers,
981 1 : ) error {
982 1 : // Load all the keys across all the sstables.
983 1 : for _, fileNum := range fileNums {
984 1 : if err := func() error {
985 1 : filePath := base.MakeFilepath(fs, path, base.FileTypeTable, fileNum)
986 1 : f, err := fs.Open(filePath)
987 1 : if err != nil {
988 0 : return err
989 0 : }
990 1 : readable, err := sstable.NewSimpleReadable(f)
991 1 : if err != nil {
992 0 : f.Close()
993 0 : return err
994 0 : }
995 1 : r, err := sstable.NewReader(readable, readOpts)
996 1 : if err != nil {
997 0 : return err
998 0 : }
999 1 : defer r.Close()
1000 1 :
1001 1 : // Load all the point keys.
1002 1 : iter, err := r.NewIter(nil, nil)
1003 1 : if err != nil {
1004 0 : return err
1005 0 : }
1006 1 : defer iter.Close()
1007 1 : for k, lv := iter.First(); k != nil; k, lv = iter.Next() {
1008 1 : var key flushedKey
1009 1 : key.Trailer = k.Trailer
1010 1 : bufs.alloc, key.UserKey = bufs.alloc.Copy(k.UserKey)
1011 1 : if v, callerOwned, err := lv.Value(nil); err != nil {
1012 0 : return err
1013 1 : } else if callerOwned {
1014 0 : key.value = v
1015 1 : } else {
1016 1 : bufs.alloc, key.value = bufs.alloc.Copy(v)
1017 1 : }
1018 1 : bufs.keys = append(bufs.keys, key)
1019 : }
1020 :
1021 : // Load all the range tombstones.
1022 1 : if iter, err := r.NewRawRangeDelIter(); err != nil {
1023 0 : return err
1024 1 : } else if iter != nil {
1025 1 : defer iter.Close()
1026 1 : for s := iter.First(); s != nil; s = iter.Next() {
1027 1 : if err := rangedel.Encode(s, func(k base.InternalKey, v []byte) error {
1028 1 : var key flushedKey
1029 1 : key.Trailer = k.Trailer
1030 1 : bufs.alloc, key.UserKey = bufs.alloc.Copy(k.UserKey)
1031 1 : bufs.alloc, key.value = bufs.alloc.Copy(v)
1032 1 : bufs.keys = append(bufs.keys, key)
1033 1 : return nil
1034 1 : }); err != nil {
1035 0 : return err
1036 0 : }
1037 : }
1038 : }
1039 :
1040 : // Load all the range keys.
1041 1 : if iter, err := r.NewRawRangeKeyIter(); err != nil {
1042 0 : return err
1043 1 : } else if iter != nil {
1044 1 : defer iter.Close()
1045 1 : for s := iter.First(); s != nil; s = iter.Next() {
1046 1 : if err := rangekey.Encode(s, func(k base.InternalKey, v []byte) error {
1047 1 : var key flushedKey
1048 1 : key.Trailer = k.Trailer
1049 1 : bufs.alloc, key.UserKey = bufs.alloc.Copy(k.UserKey)
1050 1 : bufs.alloc, key.value = bufs.alloc.Copy(v)
1051 1 : bufs.keys = append(bufs.keys, key)
1052 1 : return nil
1053 1 : }); err != nil {
1054 0 : return err
1055 0 : }
1056 : }
1057 : }
1058 1 : return nil
1059 0 : }(); err != nil {
1060 0 : return err
1061 0 : }
1062 : }
1063 :
1064 : // Sort the flushed keys by their sequence numbers so that we can apply them
1065 : // to the batch in the same order, maintaining the relative relationship
1066 : // between keys.
1067 : // NB: We use a stable sort so that keys corresponding to span fragments
1068 : // (eg, range tombstones and range keys) have a deterministic ordering for
1069 : // testing.
1070 1 : sort.Stable(bufs.keys)
1071 1 :
1072 1 : // Add the keys to the batch in the order they were committed when the
1073 1 : // workload was captured.
1074 1 : for i := 0; i < len(bufs.keys); i++ {
1075 1 : var err error
1076 1 : switch bufs.keys[i].Kind() {
1077 1 : case base.InternalKeyKindDelete:
1078 1 : err = b.Delete(bufs.keys[i].UserKey, nil)
1079 0 : case base.InternalKeyKindDeleteSized:
1080 0 : v, _ := binary.Uvarint(bufs.keys[i].value)
1081 0 : // Batch.DeleteSized takes just the length of the value being
1082 0 : // deleted and adds the key's length to derive the overall entry
1083 0 : // size of the value being deleted. The value we read from the
1084 0 : // sstable already includes the key length, so we must subtract the
1085 0 : // key length from the encoded value before calling b.DeleteSized,
1086 0 : // which will again add the key length before encoding.
1087 0 : err = b.DeleteSized(bufs.keys[i].UserKey, uint32(v-uint64(len(bufs.keys[i].UserKey))), nil)
1088 1 : case base.InternalKeyKindSet, base.InternalKeyKindSetWithDelete:
1089 1 : err = b.Set(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
1090 0 : case base.InternalKeyKindMerge:
1091 0 : err = b.Merge(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
1092 1 : case base.InternalKeyKindSingleDelete:
1093 1 : err = b.SingleDelete(bufs.keys[i].UserKey, nil)
1094 1 : case base.InternalKeyKindRangeDelete:
1095 1 : err = b.DeleteRange(bufs.keys[i].UserKey, bufs.keys[i].value, nil)
1096 1 : case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
1097 1 : s, err := rangekey.Decode(bufs.keys[i].InternalKey, bufs.keys[i].value, nil)
1098 1 : if err != nil {
1099 0 : return err
1100 0 : }
1101 1 : if len(s.Keys) != 1 {
1102 0 : return errors.Newf("range key span unexpectedly contains %d keys", len(s.Keys))
1103 0 : }
1104 1 : switch bufs.keys[i].Kind() {
1105 1 : case base.InternalKeyKindRangeKeySet:
1106 1 : err = b.RangeKeySet(s.Start, s.End, s.Keys[0].Suffix, s.Keys[0].Value, nil)
1107 1 : case base.InternalKeyKindRangeKeyUnset:
1108 1 : err = b.RangeKeyUnset(s.Start, s.End, s.Keys[0].Suffix, nil)
1109 1 : case base.InternalKeyKindRangeKeyDelete:
1110 1 : err = b.RangeKeyDelete(s.Start, s.End, nil)
1111 0 : default:
1112 0 : err = errors.Newf("unexpected key kind %q", bufs.keys[i].Kind())
1113 : }
1114 1 : if err != nil {
1115 0 : return err
1116 0 : }
1117 0 : default:
1118 0 : err = errors.Newf("unexpected key kind %q", bufs.keys[i].Kind())
1119 : }
1120 1 : if err != nil {
1121 0 : return err
1122 0 : }
1123 : }
1124 :
1125 : // Done with the flushBuffers. Reset.
1126 1 : bufs.keys = bufs.keys[:0]
1127 1 : return nil
1128 : }
1129 :
1130 : type flushBuffers struct {
1131 : keys flushedKeysByTrailer
1132 : alloc bytealloc.A
1133 : }
1134 :
1135 : type flushedKeysByTrailer []flushedKey
1136 :
1137 1 : func (s flushedKeysByTrailer) Len() int { return len(s) }
1138 1 : func (s flushedKeysByTrailer) Less(i, j int) bool { return s[i].Trailer < s[j].Trailer }
1139 1 : func (s flushedKeysByTrailer) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
1140 :
1141 : type flushedKey struct {
1142 : base.InternalKey
1143 : value []byte
1144 : }