LCOV - code coverage report
Current view: top level - pebble/metamorphic - ops.go (source / functions) Hit Total Coverage
Test: 2023-11-21 08:16Z 407f8606 - tests + meta.lcov Lines: 867 976 88.8 %
Date: 2023-11-21 08:17:09 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package metamorphic
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "crypto/rand"
      11             :         "encoding/binary"
      12             :         "fmt"
      13             :         "io"
      14             :         "path"
      15             :         "path/filepath"
      16             :         "strings"
      17             : 
      18             :         "github.com/cockroachdb/errors"
      19             :         "github.com/cockroachdb/pebble"
      20             :         "github.com/cockroachdb/pebble/internal/base"
      21             :         "github.com/cockroachdb/pebble/internal/keyspan"
      22             :         "github.com/cockroachdb/pebble/internal/private"
      23             :         "github.com/cockroachdb/pebble/internal/rangekey"
      24             :         "github.com/cockroachdb/pebble/internal/testkeys"
      25             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      26             :         "github.com/cockroachdb/pebble/sstable"
      27             :         "github.com/cockroachdb/pebble/vfs/errorfs"
      28             : )
      29             : 
// op defines the interface for a single operation, such as creating a batch,
// or advancing an iterator.
type op interface {
	// String returns the textual form of the operation. The implementations
	// in this file render it as a call expression on the receiver object
	// (eg, the flush operation formats as `<db>.Flush()`), and pass it to the
	// history recorder when the operation runs.
	String() string

	// run executes the operation against the test state t and records the
	// operation to h. Implementations in this file that produce an error
	// record it alongside the operation rather than returning it.
	run(t *test, h historyRecorder)

	// receiver returns the object ID of the object the operation is performed
	// on. Every operation has a receiver (eg, batch0.Set(...) has `batch0` as
	// its receiver). Receivers are used for synchronization when running with
	// concurrency.
	receiver() objID

	// syncObjs returns an additional set of object IDs—excluding the
	// receiver—that the operation must synchronize with. At execution time,
	// the operation will run serially with respect to all other operations
	// that return these objects from their own syncObjs or receiver methods.
	syncObjs() objIDSlice
}
      48             : 
      49             : // initOp performs test initialization
      50             : type initOp struct {
      51             :         dbSlots       uint32
      52             :         batchSlots    uint32
      53             :         iterSlots     uint32
      54             :         snapshotSlots uint32
      55             : }
      56             : 
      57           2 : func (o *initOp) run(t *test, h historyRecorder) {
      58           2 :         t.batches = make([]*pebble.Batch, o.batchSlots)
      59           2 :         t.iters = make([]*retryableIter, o.iterSlots)
      60           2 :         t.snapshots = make([]readerCloser, o.snapshotSlots)
      61           2 :         h.Recordf("%s", o)
      62           2 : }
      63             : 
      64           2 : func (o *initOp) String() string {
      65           2 :         return fmt.Sprintf("Init(%d /* dbs */, %d /* batches */, %d /* iters */, %d /* snapshots */)",
      66           2 :                 o.dbSlots, o.batchSlots, o.iterSlots, o.snapshotSlots)
      67           2 : }
      68             : 
      69           2 : func (o *initOp) receiver() objID { return makeObjID(dbTag, 1) }
      70           2 : func (o *initOp) syncObjs() objIDSlice {
      71           2 :         syncObjs := make([]objID, 0)
      72           2 :         // Add any additional DBs to syncObjs.
      73           2 :         for i := uint32(2); i < o.dbSlots+1; i++ {
      74           1 :                 syncObjs = append(syncObjs, makeObjID(dbTag, i))
      75           1 :         }
      76           2 :         return syncObjs
      77             : }
      78             : 
      79             : // applyOp models a Writer.Apply operation.
      80             : type applyOp struct {
      81             :         writerID objID
      82             :         batchID  objID
      83             : }
      84             : 
      85           2 : func (o *applyOp) run(t *test, h historyRecorder) {
      86           2 :         b := t.getBatch(o.batchID)
      87           2 :         w := t.getWriter(o.writerID)
      88           2 :         var err error
      89           2 :         if o.writerID.tag() == dbTag && t.testOpts.asyncApplyToDB && t.writeOpts.Sync {
      90           1 :                 err = w.(*pebble.DB).ApplyNoSyncWait(b, t.writeOpts)
      91           1 :                 if err == nil {
      92           1 :                         err = b.SyncWait()
      93           1 :                 }
      94           2 :         } else {
      95           2 :                 err = w.Apply(b, t.writeOpts)
      96           2 :         }
      97           2 :         h.Recordf("%s // %v", o, err)
      98             :         // batch will be closed by a closeOp which is guaranteed to be generated
      99             : }
     100             : 
     101           2 : func (o *applyOp) String() string  { return fmt.Sprintf("%s.Apply(%s)", o.writerID, o.batchID) }
     102           2 : func (o *applyOp) receiver() objID { return o.writerID }
     103           2 : func (o *applyOp) syncObjs() objIDSlice {
     104           2 :         // Apply should not be concurrent with operations that are mutating the
     105           2 :         // batch.
     106           2 :         return []objID{o.batchID}
     107           2 : }
     108             : 
     109             : // checkpointOp models a DB.Checkpoint operation.
     110             : type checkpointOp struct {
     111             :         dbID objID
     112             :         // If non-empty, the checkpoint is restricted to these spans.
     113             :         spans []pebble.CheckpointSpan
     114             : }
     115             : 
     116           2 : func (o *checkpointOp) run(t *test, h historyRecorder) {
     117           2 :         // TODO(josh): db.Checkpoint does not work with shared storage yet.
     118           2 :         // It would be better to filter out ahead of calling run on the op,
     119           2 :         // by setting the weight that generator.go uses to zero, or similar.
     120           2 :         // But IIUC the ops are shared for ALL the metamorphic test runs, so
     121           2 :         // not sure how to do that easily:
     122           2 :         // https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
     123           2 :         if t.testOpts.sharedStorageEnabled {
     124           1 :                 h.Recordf("%s // %v", o, nil)
     125           1 :                 return
     126           1 :         }
     127           2 :         var opts []pebble.CheckpointOption
     128           2 :         if len(o.spans) > 0 {
     129           2 :                 opts = append(opts, pebble.WithRestrictToSpans(o.spans))
     130           2 :         }
     131           2 :         db := t.getDB(o.dbID)
     132           2 :         err := withRetries(func() error {
     133           2 :                 return db.Checkpoint(o.dir(t.dir, h.op), opts...)
     134           2 :         })
     135           2 :         h.Recordf("%s // %v", o, err)
     136             : }
     137             : 
     138           2 : func (o *checkpointOp) dir(dataDir string, idx int) string {
     139           2 :         return filepath.Join(dataDir, "checkpoints", fmt.Sprintf("op-%06d", idx))
     140           2 : }
     141             : 
     142           2 : func (o *checkpointOp) String() string {
     143           2 :         var spanStr bytes.Buffer
     144           2 :         for i, span := range o.spans {
     145           2 :                 if i > 0 {
     146           2 :                         spanStr.WriteString(",")
     147           2 :                 }
     148           2 :                 fmt.Fprintf(&spanStr, "%q,%q", span.Start, span.End)
     149             :         }
     150           2 :         return fmt.Sprintf("%s.Checkpoint(%s)", o.dbID, spanStr.String())
     151             : }
     152             : 
     153           2 : func (o *checkpointOp) receiver() objID      { return o.dbID }
     154           2 : func (o *checkpointOp) syncObjs() objIDSlice { return nil }
     155             : 
     156             : // closeOp models a {Batch,Iterator,Snapshot}.Close operation.
     157             : type closeOp struct {
     158             :         objID       objID
     159             :         derivedDBID objID
     160             : }
     161             : 
     162           2 : func (o *closeOp) run(t *test, h historyRecorder) {
     163           2 :         c := t.getCloser(o.objID)
     164           2 :         if o.objID.tag() == dbTag && t.opts.DisableWAL {
     165           1 :                 // Special case: If WAL is disabled, do a flush right before DB Close. This
     166           1 :                 // allows us to reuse this run's data directory as initial state for
     167           1 :                 // future runs without losing any mutations.
     168           1 :                 _ = t.getDB(o.objID).Flush()
     169           1 :         }
     170           2 :         t.clearObj(o.objID)
     171           2 :         err := c.Close()
     172           2 :         h.Recordf("%s // %v", o, err)
     173             : }
     174             : 
     175           2 : func (o *closeOp) String() string  { return fmt.Sprintf("%s.Close()", o.objID) }
     176           2 : func (o *closeOp) receiver() objID { return o.objID }
     177           2 : func (o *closeOp) syncObjs() objIDSlice {
     178           2 :         // Synchronize on the database so that we don't close the database before
     179           2 :         // all its iterators, snapshots and batches are closed.
     180           2 :         // TODO(jackson): It would be nice to relax this so that Close calls can
     181           2 :         // execute in parallel.
     182           2 :         if o.objID.tag() == dbTag {
     183           2 :                 return nil
     184           2 :         }
     185           2 :         if o.derivedDBID != 0 {
     186           2 :                 return []objID{o.derivedDBID}
     187           2 :         }
     188           0 :         return nil
     189             : }
     190             : 
     191             : // compactOp models a DB.Compact operation.
     192             : type compactOp struct {
     193             :         dbID        objID
     194             :         start       []byte
     195             :         end         []byte
     196             :         parallelize bool
     197             : }
     198             : 
     199           2 : func (o *compactOp) run(t *test, h historyRecorder) {
     200           2 :         err := withRetries(func() error {
     201           2 :                 return t.getDB(o.dbID).Compact(o.start, o.end, o.parallelize)
     202           2 :         })
     203           2 :         h.Recordf("%s // %v", o, err)
     204             : }
     205             : 
     206           2 : func (o *compactOp) String() string {
     207           2 :         return fmt.Sprintf("%s.Compact(%q, %q, %t /* parallelize */)", o.dbID, o.start, o.end, o.parallelize)
     208           2 : }
     209             : 
     210           2 : func (o *compactOp) receiver() objID      { return o.dbID }
     211           2 : func (o *compactOp) syncObjs() objIDSlice { return nil }
     212             : 
     213             : // deleteOp models a Write.Delete operation.
     214             : type deleteOp struct {
     215             :         writerID objID
     216             :         key      []byte
     217             : 
     218             :         derivedDBID objID
     219             : }
     220             : 
     221           2 : func (o *deleteOp) run(t *test, h historyRecorder) {
     222           2 :         w := t.getWriter(o.writerID)
     223           2 :         var err error
     224           2 :         if t.testOpts.deleteSized && t.isFMV(o.derivedDBID, pebble.FormatDeleteSizedAndObsolete) {
     225           1 :                 // Call DeleteSized with a deterministic size derived from the index.
     226           1 :                 // The size does not need to be accurate for correctness.
     227           1 :                 err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
     228           2 :         } else {
     229           2 :                 err = w.Delete(o.key, t.writeOpts)
     230           2 :         }
     231           2 :         h.Recordf("%s // %v", o, err)
     232             : }
     233             : 
     234           1 : func hashSize(index int) uint32 {
     235           1 :         // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
     236           1 :         return uint32((11400714819323198485 * uint64(index)) % maxValueSize)
     237           1 : }
     238             : 
     239           2 : func (o *deleteOp) String() string {
     240           2 :         return fmt.Sprintf("%s.Delete(%q)", o.writerID, o.key)
     241           2 : }
     242           2 : func (o *deleteOp) receiver() objID      { return o.writerID }
     243           2 : func (o *deleteOp) syncObjs() objIDSlice { return nil }
     244             : 
     245             : // singleDeleteOp models a Write.SingleDelete operation.
     246             : type singleDeleteOp struct {
     247             :         writerID           objID
     248             :         key                []byte
     249             :         maybeReplaceDelete bool
     250             : }
     251             : 
     252           2 : func (o *singleDeleteOp) run(t *test, h historyRecorder) {
     253           2 :         w := t.getWriter(o.writerID)
     254           2 :         var err error
     255           2 :         if t.testOpts.replaceSingleDelete && o.maybeReplaceDelete {
     256           1 :                 err = w.Delete(o.key, t.writeOpts)
     257           2 :         } else {
     258           2 :                 err = w.SingleDelete(o.key, t.writeOpts)
     259           2 :         }
     260             :         // NOTE: even if the SINGLEDEL was replaced with a DELETE, we must still
     261             :         // write the former to the history log. The log line will indicate whether
     262             :         // or not the delete *could* have been replaced. The OPTIONS file should
     263             :         // also be consulted to determine what happened at runtime (i.e. by taking
     264             :         // the logical AND).
     265           2 :         h.Recordf("%s // %v", o, err)
     266             : }
     267             : 
     268           2 : func (o *singleDeleteOp) String() string {
     269           2 :         return fmt.Sprintf("%s.SingleDelete(%q, %v /* maybeReplaceDelete */)", o.writerID, o.key, o.maybeReplaceDelete)
     270           2 : }
     271             : 
     272           2 : func (o *singleDeleteOp) receiver() objID      { return o.writerID }
     273           2 : func (o *singleDeleteOp) syncObjs() objIDSlice { return nil }
     274             : 
     275             : // deleteRangeOp models a Write.DeleteRange operation.
     276             : type deleteRangeOp struct {
     277             :         writerID objID
     278             :         start    []byte
     279             :         end      []byte
     280             : }
     281             : 
     282           2 : func (o *deleteRangeOp) run(t *test, h historyRecorder) {
     283           2 :         w := t.getWriter(o.writerID)
     284           2 :         err := w.DeleteRange(o.start, o.end, t.writeOpts)
     285           2 :         h.Recordf("%s // %v", o, err)
     286           2 : }
     287             : 
     288           2 : func (o *deleteRangeOp) String() string {
     289           2 :         return fmt.Sprintf("%s.DeleteRange(%q, %q)", o.writerID, o.start, o.end)
     290           2 : }
     291             : 
     292           2 : func (o *deleteRangeOp) receiver() objID      { return o.writerID }
     293           2 : func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
     294             : 
     295             : // flushOp models a DB.Flush operation.
     296             : type flushOp struct {
     297             :         db objID
     298             : }
     299             : 
     300           2 : func (o *flushOp) run(t *test, h historyRecorder) {
     301           2 :         db := t.getDB(o.db)
     302           2 :         err := db.Flush()
     303           2 :         h.Recordf("%s // %v", o, err)
     304           2 : }
     305             : 
     306           2 : func (o *flushOp) String() string       { return fmt.Sprintf("%s.Flush()", o.db) }
     307           2 : func (o *flushOp) receiver() objID      { return o.db }
     308           2 : func (o *flushOp) syncObjs() objIDSlice { return nil }
     309             : 
     310             : // mergeOp models a Write.Merge operation.
     311             : type mergeOp struct {
     312             :         writerID objID
     313             :         key      []byte
     314             :         value    []byte
     315             : }
     316             : 
     317           2 : func (o *mergeOp) run(t *test, h historyRecorder) {
     318           2 :         w := t.getWriter(o.writerID)
     319           2 :         err := w.Merge(o.key, o.value, t.writeOpts)
     320           2 :         h.Recordf("%s // %v", o, err)
     321           2 : }
     322             : 
     323           2 : func (o *mergeOp) String() string       { return fmt.Sprintf("%s.Merge(%q, %q)", o.writerID, o.key, o.value) }
     324           2 : func (o *mergeOp) receiver() objID      { return o.writerID }
     325           2 : func (o *mergeOp) syncObjs() objIDSlice { return nil }
     326             : 
     327             : // setOp models a Write.Set operation.
     328             : type setOp struct {
     329             :         writerID objID
     330             :         key      []byte
     331             :         value    []byte
     332             : }
     333             : 
     334           2 : func (o *setOp) run(t *test, h historyRecorder) {
     335           2 :         w := t.getWriter(o.writerID)
     336           2 :         err := w.Set(o.key, o.value, t.writeOpts)
     337           2 :         h.Recordf("%s // %v", o, err)
     338           2 : }
     339             : 
     340           2 : func (o *setOp) String() string       { return fmt.Sprintf("%s.Set(%q, %q)", o.writerID, o.key, o.value) }
     341           2 : func (o *setOp) receiver() objID      { return o.writerID }
     342           2 : func (o *setOp) syncObjs() objIDSlice { return nil }
     343             : 
     344             : // rangeKeyDeleteOp models a Write.RangeKeyDelete operation.
     345             : type rangeKeyDeleteOp struct {
     346             :         writerID objID
     347             :         start    []byte
     348             :         end      []byte
     349             : }
     350             : 
     351           2 : func (o *rangeKeyDeleteOp) run(t *test, h historyRecorder) {
     352           2 :         w := t.getWriter(o.writerID)
     353           2 :         err := w.RangeKeyDelete(o.start, o.end, t.writeOpts)
     354           2 :         h.Recordf("%s // %v", o, err)
     355           2 : }
     356             : 
     357           2 : func (o *rangeKeyDeleteOp) String() string {
     358           2 :         return fmt.Sprintf("%s.RangeKeyDelete(%q, %q)", o.writerID, o.start, o.end)
     359           2 : }
     360             : 
     361           2 : func (o *rangeKeyDeleteOp) receiver() objID      { return o.writerID }
     362           2 : func (o *rangeKeyDeleteOp) syncObjs() objIDSlice { return nil }
     363             : 
     364             : // rangeKeySetOp models a Write.RangeKeySet operation.
     365             : type rangeKeySetOp struct {
     366             :         writerID objID
     367             :         start    []byte
     368             :         end      []byte
     369             :         suffix   []byte
     370             :         value    []byte
     371             : }
     372             : 
     373           2 : func (o *rangeKeySetOp) run(t *test, h historyRecorder) {
     374           2 :         w := t.getWriter(o.writerID)
     375           2 :         err := w.RangeKeySet(o.start, o.end, o.suffix, o.value, t.writeOpts)
     376           2 :         h.Recordf("%s // %v", o, err)
     377           2 : }
     378             : 
     379           2 : func (o *rangeKeySetOp) String() string {
     380           2 :         return fmt.Sprintf("%s.RangeKeySet(%q, %q, %q, %q)",
     381           2 :                 o.writerID, o.start, o.end, o.suffix, o.value)
     382           2 : }
     383             : 
     384           2 : func (o *rangeKeySetOp) receiver() objID      { return o.writerID }
     385           2 : func (o *rangeKeySetOp) syncObjs() objIDSlice { return nil }
     386             : 
     387             : // rangeKeyUnsetOp models a Write.RangeKeyUnset operation.
     388             : type rangeKeyUnsetOp struct {
     389             :         writerID objID
     390             :         start    []byte
     391             :         end      []byte
     392             :         suffix   []byte
     393             : }
     394             : 
     395           2 : func (o *rangeKeyUnsetOp) run(t *test, h historyRecorder) {
     396           2 :         w := t.getWriter(o.writerID)
     397           2 :         err := w.RangeKeyUnset(o.start, o.end, o.suffix, t.writeOpts)
     398           2 :         h.Recordf("%s // %v", o, err)
     399           2 : }
     400             : 
     401           2 : func (o *rangeKeyUnsetOp) String() string {
     402           2 :         return fmt.Sprintf("%s.RangeKeyUnset(%q, %q, %q)",
     403           2 :                 o.writerID, o.start, o.end, o.suffix)
     404           2 : }
     405             : 
     406           2 : func (o *rangeKeyUnsetOp) receiver() objID      { return o.writerID }
     407           2 : func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }
     408             : 
     409             : // newBatchOp models a Write.NewBatch operation.
     410             : type newBatchOp struct {
     411             :         dbID    objID
     412             :         batchID objID
     413             : }
     414             : 
     415           2 : func (o *newBatchOp) run(t *test, h historyRecorder) {
     416           2 :         b := t.getDB(o.dbID).NewBatch()
     417           2 :         t.setBatch(o.batchID, b)
     418           2 :         h.Recordf("%s", o)
     419           2 : }
     420             : 
     421           2 : func (o *newBatchOp) String() string  { return fmt.Sprintf("%s = %s.NewBatch()", o.batchID, o.dbID) }
     422           2 : func (o *newBatchOp) receiver() objID { return o.dbID }
     423           2 : func (o *newBatchOp) syncObjs() objIDSlice {
     424           2 :         // NewBatch should not be concurrent with operations that interact with that
     425           2 :         // same batch.
     426           2 :         return []objID{o.batchID}
     427           2 : }
     428             : 
     429             : // newIndexedBatchOp models a Write.NewIndexedBatch operation.
     430             : type newIndexedBatchOp struct {
     431             :         dbID    objID
     432             :         batchID objID
     433             : }
     434             : 
     435           2 : func (o *newIndexedBatchOp) run(t *test, h historyRecorder) {
     436           2 :         b := t.getDB(o.dbID).NewIndexedBatch()
     437           2 :         t.setBatch(o.batchID, b)
     438           2 :         h.Recordf("%s", o)
     439           2 : }
     440             : 
     441           2 : func (o *newIndexedBatchOp) String() string {
     442           2 :         return fmt.Sprintf("%s = %s.NewIndexedBatch()", o.batchID, o.dbID)
     443           2 : }
     444           2 : func (o *newIndexedBatchOp) receiver() objID { return o.dbID }
     445           2 : func (o *newIndexedBatchOp) syncObjs() objIDSlice {
     446           2 :         // NewIndexedBatch should not be concurrent with operations that interact
     447           2 :         // with that same batch.
     448           2 :         return []objID{o.batchID}
     449           2 : }
     450             : 
     451             : // batchCommitOp models a Batch.Commit operation.
     452             : type batchCommitOp struct {
     453             :         dbID    objID
     454             :         batchID objID
     455             : }
     456             : 
     457           2 : func (o *batchCommitOp) run(t *test, h historyRecorder) {
     458           2 :         b := t.getBatch(o.batchID)
     459           2 :         err := b.Commit(t.writeOpts)
     460           2 :         h.Recordf("%s // %v", o, err)
     461           2 : }
     462             : 
     463           2 : func (o *batchCommitOp) String() string  { return fmt.Sprintf("%s.Commit()", o.batchID) }
     464           2 : func (o *batchCommitOp) receiver() objID { return o.batchID }
     465           2 : func (o *batchCommitOp) syncObjs() objIDSlice {
     466           2 :         // Synchronize on the database so that NewIters wait for the commit.
     467           2 :         return []objID{o.dbID}
     468           2 : }
     469             : 
     470             : // ingestOp models a DB.Ingest operation.
     471             : type ingestOp struct {
     472             :         dbID     objID
     473             :         batchIDs []objID
     474             : 
     475             :         derivedDBIDs []objID
     476             : }
     477             : 
     478           2 : func (o *ingestOp) run(t *test, h historyRecorder) {
     479           2 :         // We can only use apply as an alternative for ingestion if we are ingesting
     480           2 :         // a single batch. If we are ingesting multiple batches, the batches may
     481           2 :         // overlap which would cause ingestion to fail but apply would succeed.
     482           2 :         if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 && o.derivedDBIDs[0] == o.dbID {
     483           1 :                 id := o.batchIDs[0]
     484           1 :                 b := t.getBatch(id)
     485           1 :                 iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
     486           1 :                 db := t.getDB(o.dbID)
     487           1 :                 c, err := o.collapseBatch(t, db, iter, rangeDelIter, rangeKeyIter, b)
     488           1 :                 if err == nil {
     489           1 :                         err = db.Apply(c, t.writeOpts)
     490           1 :                 }
     491           1 :                 _ = b.Close()
     492           1 :                 _ = c.Close()
     493           1 :                 t.clearObj(id)
     494           1 :                 h.Recordf("%s // %v", o, err)
     495           1 :                 return
     496             :         }
     497             : 
     498           2 :         var paths []string
     499           2 :         var err error
     500           2 :         for i, id := range o.batchIDs {
     501           2 :                 b := t.getBatch(id)
     502           2 :                 t.clearObj(id)
     503           2 :                 path, err2 := o.build(t, h, b, i)
     504           2 :                 if err2 != nil {
     505           0 :                         h.Recordf("Build(%s) // %v", id, err2)
     506           0 :                 }
     507           2 :                 err = firstError(err, err2)
     508           2 :                 if err2 == nil {
     509           2 :                         paths = append(paths, path)
     510           2 :                 }
     511           2 :                 err = firstError(err, b.Close())
     512             :         }
     513             : 
     514           2 :         err = firstError(err, withRetries(func() error {
     515           2 :                 return t.getDB(o.dbID).Ingest(paths)
     516           2 :         }))
     517             : 
     518           2 :         h.Recordf("%s // %v", o, err)
     519             : }
     520             : 
     521           2 : func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
     522           2 :         path := t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", o.dbID.slot(), i))
     523           2 :         f, err := t.opts.FS.Create(path)
     524           2 :         if err != nil {
     525           0 :                 return "", err
     526           0 :         }
     527           2 :         db := t.getDB(o.dbID)
     528           2 : 
     529           2 :         iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
     530           2 :         defer closeIters(iter, rangeDelIter, rangeKeyIter)
     531           2 : 
     532           2 :         equal := t.opts.Comparer.Equal
     533           2 :         tableFormat := db.FormatMajorVersion().MaxTableFormat()
     534           2 :         w := sstable.NewWriter(
     535           2 :                 objstorageprovider.NewFileWritable(f),
     536           2 :                 t.opts.MakeWriterOptions(0, tableFormat),
     537           2 :         )
     538           2 : 
     539           2 :         var lastUserKey []byte
     540           2 :         for key, value := iter.First(); key != nil; key, value = iter.Next() {
     541           2 :                 // Ignore duplicate keys.
     542           2 :                 if equal(lastUserKey, key.UserKey) {
     543           2 :                         continue
     544             :                 }
     545             :                 // NB: We don't have to copy the key or value since we're reading from a
     546             :                 // batch which doesn't do prefix compression.
     547           2 :                 lastUserKey = key.UserKey
     548           2 : 
     549           2 :                 key.SetSeqNum(base.SeqNumZero)
     550           2 :                 // It's possible that we wrote the key on a batch from a db that supported
     551           2 :                 // DeleteSized, but are now ingesting into a db that does not. Detect
     552           2 :                 // this case and translate the key to an InternalKeyKindDelete.
     553           2 :                 if key.Kind() == pebble.InternalKeyKindDeleteSized && !t.isFMV(o.dbID, pebble.FormatDeleteSizedAndObsolete) {
     554           1 :                         value = pebble.LazyValue{}
     555           1 :                         key.SetKind(pebble.InternalKeyKindDelete)
     556           1 :                 }
     557           2 :                 if err := w.Add(*key, value.InPlaceValue()); err != nil {
     558           0 :                         return "", err
     559           0 :                 }
     560             :         }
     561           2 :         if err := iter.Close(); err != nil {
     562           0 :                 return "", err
     563           0 :         }
     564           2 :         iter = nil
     565           2 : 
     566           2 :         if rangeDelIter != nil {
     567           2 :                 // NB: The range tombstones have already been fragmented by the Batch.
     568           2 :                 for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
     569           2 :                         // NB: We don't have to copy the key or value since we're reading from a
     570           2 :                         // batch which doesn't do prefix compression.
     571           2 :                         if err := w.DeleteRange(t.Start, t.End); err != nil {
     572           0 :                                 return "", err
     573           0 :                         }
     574             :                 }
     575           2 :                 if err := rangeDelIter.Close(); err != nil {
     576           0 :                         return "", err
     577           0 :                 }
     578           2 :                 rangeDelIter = nil
     579             :         }
     580             : 
     581           2 :         if rangeKeyIter != nil {
     582           2 :                 for span := rangeKeyIter.First(); span != nil; span = rangeKeyIter.Next() {
     583           2 :                         // Coalesce the keys of this span and then zero the sequence
     584           2 :                         // numbers. This is necessary in order to make the range keys within
     585           2 :                         // the ingested sstable internally consistent at the sequence number
     586           2 :                         // it's ingested at. The individual keys within a batch are
     587           2 :                         // committed at unique sequence numbers, whereas all the keys of an
     588           2 :                         // ingested sstable are given the same sequence number. A span
     589           2 :                         // contaning keys that both set and unset the same suffix at the
     590           2 :                         // same sequence number is nonsensical, so we "coalesce" or collapse
     591           2 :                         // the keys.
     592           2 :                         collapsed := keyspan.Span{
     593           2 :                                 Start: span.Start,
     594           2 :                                 End:   span.End,
     595           2 :                                 Keys:  make([]keyspan.Key, 0, len(span.Keys)),
     596           2 :                         }
     597           2 :                         err = rangekey.Coalesce(t.opts.Comparer.Compare, equal, span.Keys, &collapsed.Keys)
     598           2 :                         if err != nil {
     599           0 :                                 return "", err
     600           0 :                         }
     601           2 :                         for i := range collapsed.Keys {
     602           2 :                                 collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
     603           2 :                         }
     604           2 :                         keyspan.SortKeysByTrailer(&collapsed.Keys)
     605           2 :                         if err := rangekey.Encode(&collapsed, w.AddRangeKey); err != nil {
     606           0 :                                 return "", err
     607           0 :                         }
     608             :                 }
     609           2 :                 if err := rangeKeyIter.Error(); err != nil {
     610           0 :                         return "", err
     611           0 :                 }
     612           2 :                 if err := rangeKeyIter.Close(); err != nil {
     613           0 :                         return "", err
     614           0 :                 }
     615           2 :                 rangeKeyIter = nil
     616             :         }
     617             : 
     618           2 :         if err := w.Close(); err != nil {
     619           0 :                 return "", err
     620           0 :         }
     621           2 :         return path, nil
     622             : }
     623             : 
     624           2 : func (o *ingestOp) receiver() objID { return o.dbID }
     625           2 : func (o *ingestOp) syncObjs() objIDSlice {
     626           2 :         // Ingest should not be concurrent with mutating the batches that will be
     627           2 :         // ingested as sstables.
     628           2 :         objs := make([]objID, 0, len(o.batchIDs)+1)
     629           2 :         objs = append(objs, o.batchIDs...)
     630           2 :         addedDBs := make(map[objID]struct{})
     631           2 :         for i := range o.derivedDBIDs {
     632           2 :                 _, ok := addedDBs[o.derivedDBIDs[i]]
     633           2 :                 if !ok && o.derivedDBIDs[i] != o.dbID {
     634           1 :                         objs = append(objs, o.derivedDBIDs[i])
     635           1 :                         addedDBs[o.derivedDBIDs[i]] = struct{}{}
     636           1 :                 }
     637             :         }
     638           2 :         return objs
     639             : }
     640             : 
     641             : func closeIters(
     642             :         pointIter base.InternalIterator,
     643             :         rangeDelIter keyspan.FragmentIterator,
     644             :         rangeKeyIter keyspan.FragmentIterator,
     645           2 : ) {
     646           2 :         if pointIter != nil {
     647           2 :                 pointIter.Close()
     648           2 :         }
     649           2 :         if rangeDelIter != nil {
     650           2 :                 rangeDelIter.Close()
     651           2 :         }
     652           2 :         if rangeKeyIter != nil {
     653           2 :                 rangeKeyIter.Close()
     654           2 :         }
     655             : }
     656             : 
     657             : // collapseBatch collapses the mutations in a batch to be equivalent to an
     658             : // sstable ingesting those mutations. Duplicate updates to a key are collapsed
     659             : // so that only the latest update is performed. All range deletions are
     660             : // performed first in the batch to match the semantics of ingestion where a
     661             : // range deletion does not delete a point record contained in the sstable.
     662             : func (o *ingestOp) collapseBatch(
     663             :         t *test,
     664             :         db *pebble.DB,
     665             :         pointIter base.InternalIterator,
     666             :         rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
     667             :         b *pebble.Batch,
     668           1 : ) (*pebble.Batch, error) {
     669           1 :         defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
     670           1 :         equal := t.opts.Comparer.Equal
     671           1 :         collapsed := db.NewBatch()
     672           1 : 
     673           1 :         if rangeDelIter != nil {
     674           1 :                 // NB: The range tombstones have already been fragmented by the Batch.
     675           1 :                 for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
     676           1 :                         // NB: We don't have to copy the key or value since we're reading from a
     677           1 :                         // batch which doesn't do prefix compression.
     678           1 :                         if err := collapsed.DeleteRange(t.Start, t.End, nil); err != nil {
     679           0 :                                 return nil, err
     680           0 :                         }
     681             :                 }
     682           1 :                 if err := rangeDelIter.Close(); err != nil {
     683           0 :                         return nil, err
     684           0 :                 }
     685           1 :                 rangeDelIter = nil
     686             :         }
     687             : 
     688           1 :         if pointIter != nil {
     689           1 :                 var lastUserKey []byte
     690           1 :                 for key, value := pointIter.First(); key != nil; key, value = pointIter.Next() {
     691           1 :                         // Ignore duplicate keys.
     692           1 :                         //
     693           1 :                         // Note: this is necessary due to MERGE keys, otherwise it would be
     694           1 :                         // fine to include all the keys in the batch and let the normal
     695           1 :                         // sequence number precedence determine which of the keys "wins".
     696           1 :                         // But the code to build the ingested sstable will only keep the
     697           1 :                         // most recent internal key and will not merge across internal keys.
     698           1 :                         if equal(lastUserKey, key.UserKey) {
     699           1 :                                 continue
     700             :                         }
     701             :                         // NB: We don't have to copy the key or value since we're reading from a
     702             :                         // batch which doesn't do prefix compression.
     703           1 :                         lastUserKey = key.UserKey
     704           1 : 
     705           1 :                         var err error
     706           1 :                         switch key.Kind() {
     707           1 :                         case pebble.InternalKeyKindDelete:
     708           1 :                                 err = collapsed.Delete(key.UserKey, nil)
     709           1 :                         case pebble.InternalKeyKindDeleteSized:
     710           1 :                                 v, _ := binary.Uvarint(value.InPlaceValue())
     711           1 :                                 // Batch.DeleteSized takes just the length of the value being
     712           1 :                                 // deleted and adds the key's length to derive the overall entry
     713           1 :                                 // size of the value being deleted. This has already been done
     714           1 :                                 // to the key we're reading from the batch, so we must subtract
     715           1 :                                 // the key length from the encoded value before calling
     716           1 :                                 // collapsed.DeleteSized, which will again add the key length
     717           1 :                                 // before encoding.
     718           1 :                                 err = collapsed.DeleteSized(key.UserKey, uint32(v-uint64(len(key.UserKey))), nil)
     719           1 :                         case pebble.InternalKeyKindSingleDelete:
     720           1 :                                 err = collapsed.SingleDelete(key.UserKey, nil)
     721           1 :                         case pebble.InternalKeyKindSet:
     722           1 :                                 err = collapsed.Set(key.UserKey, value.InPlaceValue(), nil)
     723           1 :                         case pebble.InternalKeyKindMerge:
     724           1 :                                 err = collapsed.Merge(key.UserKey, value.InPlaceValue(), nil)
     725           0 :                         case pebble.InternalKeyKindLogData:
     726           0 :                                 err = collapsed.LogData(key.UserKey, nil)
     727           0 :                         default:
     728           0 :                                 err = errors.Errorf("unknown batch record kind: %d", key.Kind())
     729             :                         }
     730           1 :                         if err != nil {
     731           0 :                                 return nil, err
     732           0 :                         }
     733             :                 }
     734           1 :                 if err := pointIter.Close(); err != nil {
     735           0 :                         return nil, err
     736           0 :                 }
     737           1 :                 pointIter = nil
     738             :         }
     739             : 
     740             :         // There's no equivalent of a MERGE operator for range keys, so there's no
     741             :         // need to collapse the range keys here. Rather than reading the range keys
     742             :         // from `rangeKeyIter`, which will already be fragmented, read the range
     743             :         // keys from the batch and copy them verbatim. This marginally improves our
     744             :         // test coverage over the alternative approach of pre-fragmenting and
     745             :         // pre-coalescing before writing to the batch.
     746             :         //
     747             :         // The `rangeKeyIter` is used only to determine if there are any range keys
     748             :         // in the batch at all, and only because we already have it handy from
     749             :         // private.BatchSort.
     750           1 :         if rangeKeyIter != nil {
     751           1 :                 for r := b.Reader(); ; {
     752           1 :                         kind, key, value, ok := r.Next()
     753           1 :                         if !ok {
     754           1 :                                 break
     755           1 :                         } else if !rangekey.IsRangeKey(kind) {
     756           1 :                                 continue
     757             :                         }
     758           1 :                         ik := base.MakeInternalKey(key, 0, kind)
     759           1 :                         if err := collapsed.AddInternalKey(&ik, value, nil); err != nil {
     760           0 :                                 return nil, err
     761           0 :                         }
     762             :                 }
     763           1 :                 if err := rangeKeyIter.Close(); err != nil {
     764           0 :                         return nil, err
     765           0 :                 }
     766           1 :                 rangeKeyIter = nil
     767             :         }
     768             : 
     769           1 :         return collapsed, nil
     770             : }
     771             : 
     772           2 : func (o *ingestOp) String() string {
     773           2 :         var buf strings.Builder
     774           2 :         buf.WriteString(o.dbID.String())
     775           2 :         buf.WriteString(".Ingest(")
     776           2 :         for i, id := range o.batchIDs {
     777           2 :                 if i > 0 {
     778           2 :                         buf.WriteString(", ")
     779           2 :                 }
     780           2 :                 buf.WriteString(id.String())
     781             :         }
     782           2 :         buf.WriteString(")")
     783           2 :         return buf.String()
     784             : }
     785             : 
// getOp models a Reader.Get operation.
type getOp struct {
	// readerID identifies the reader that Get is invoked on.
	readerID objID
	// key is the key passed to Get.
	key []byte
	// derivedDBID is returned from syncObjs when the reader is not a
	// database and this ID is non-zero.
	derivedDBID objID
}
     792             : 
     793           2 : func (o *getOp) run(t *test, h historyRecorder) {
     794           2 :         r := t.getReader(o.readerID)
     795           2 :         var val []byte
     796           2 :         var closer io.Closer
     797           2 :         err := withRetries(func() (err error) {
     798           2 :                 val, closer, err = r.Get(o.key)
     799           2 :                 return err
     800           2 :         })
     801           2 :         h.Recordf("%s // [%q] %v", o, val, err)
     802           2 :         if closer != nil {
     803           2 :                 closer.Close()
     804           2 :         }
     805             : }
     806             : 
     807           2 : func (o *getOp) String() string  { return fmt.Sprintf("%s.Get(%q)", o.readerID, o.key) }
     808           2 : func (o *getOp) receiver() objID { return o.readerID }
     809           2 : func (o *getOp) syncObjs() objIDSlice {
     810           2 :         if o.readerID.tag() == dbTag {
     811           2 :                 return nil
     812           2 :         }
     813             :         // batch.Get reads through to the current database state.
     814           2 :         if o.derivedDBID != 0 {
     815           2 :                 return []objID{o.derivedDBID}
     816           2 :         }
     817           0 :         return nil
     818             : }
     819             : 
// newIterOp models a Reader.NewIter operation.
type newIterOp struct {
	// readerID identifies the reader the iterator is created from.
	readerID objID
	// iterID is the ID under which the new iterator is registered.
	iterID objID
	// iterOpts holds the iterator options; it is converted to
	// *pebble.IterOptions by iterOptions when the op runs.
	iterOpts
	// derivedDBID is added to the op's sync objects when the reader is a
	// batch or a snapshot.
	derivedDBID objID
}
     827             : 
     828           2 : func (o *newIterOp) run(t *test, h historyRecorder) {
     829           2 :         r := t.getReader(o.readerID)
     830           2 :         opts := iterOptions(o.iterOpts)
     831           2 : 
     832           2 :         var i *pebble.Iterator
     833           2 :         for {
     834           2 :                 i, _ = r.NewIter(opts)
     835           2 :                 if err := i.Error(); !errors.Is(err, errorfs.ErrInjected) {
     836           2 :                         break
     837             :                 }
     838             :                 // close this iter and retry NewIter
     839           0 :                 _ = i.Close()
     840             :         }
     841           2 :         t.setIter(o.iterID, i)
     842           2 : 
     843           2 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
     844           2 :         // the user-provided bounds.
     845           2 :         if opts != nil {
     846           2 :                 rand.Read(opts.LowerBound[:])
     847           2 :                 rand.Read(opts.UpperBound[:])
     848           2 :         }
     849           2 :         h.Recordf("%s // %v", o, i.Error())
     850             : }
     851             : 
     852           2 : func (o *newIterOp) String() string {
     853           2 :         return fmt.Sprintf("%s = %s.NewIter(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
     854           2 :                 o.iterID, o.readerID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
     855           2 : }
     856             : 
     857           2 : func (o *newIterOp) receiver() objID { return o.readerID }
     858           2 : func (o *newIterOp) syncObjs() objIDSlice {
     859           2 :         // Prevent o.iterID ops from running before it exists.
     860           2 :         objs := []objID{o.iterID}
     861           2 :         // If reading through a batch or snapshot, the new iterator will also observe database
     862           2 :         // state, and we must synchronize on the database state for a consistent
     863           2 :         // view.
     864           2 :         if o.readerID.tag() == batchTag || o.readerID.tag() == snapTag {
     865           2 :                 objs = append(objs, o.derivedDBID)
     866           2 :         }
     867           2 :         return objs
     868             : }
     869             : 
// newIterUsingCloneOp models an Iterator.Clone operation.
type newIterUsingCloneOp struct {
	// existingIterID identifies the iterator that is cloned.
	existingIterID objID
	// iterID is the ID under which the clone is registered.
	iterID objID
	// refreshBatch is passed to Clone as CloneOptions.RefreshBatchView.
	refreshBatch bool
	// iterOpts holds the iterator options passed to Clone as
	// CloneOptions.IterOptions.
	iterOpts

	// derivedReaderID is the ID of the underlying reader that backs both the
	// existing iterator and the new iterator. The derivedReaderID is NOT
	// serialized by String and is derived from other operations during parse.
	derivedReaderID objID
}
     882             : 
     883           2 : func (o *newIterUsingCloneOp) run(t *test, h historyRecorder) {
     884           2 :         iter := t.getIter(o.existingIterID)
     885           2 :         cloneOpts := pebble.CloneOptions{
     886           2 :                 IterOptions:      iterOptions(o.iterOpts),
     887           2 :                 RefreshBatchView: o.refreshBatch,
     888           2 :         }
     889           2 :         i, err := iter.iter.Clone(cloneOpts)
     890           2 :         if err != nil {
     891           0 :                 panic(err)
     892             :         }
     893           2 :         t.setIter(o.iterID, i)
     894           2 :         h.Recordf("%s // %v", o, i.Error())
     895             : }
     896             : 
     897           2 : func (o *newIterUsingCloneOp) String() string {
     898           2 :         return fmt.Sprintf("%s = %s.Clone(%t, %q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
     899           2 :                 o.iterID, o.existingIterID, o.refreshBatch, o.lower, o.upper,
     900           2 :                 o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
     901           2 : }
     902             : 
     903           2 : func (o *newIterUsingCloneOp) receiver() objID { return o.existingIterID }
     904             : 
     905           2 : func (o *newIterUsingCloneOp) syncObjs() objIDSlice {
     906           2 :         objIDs := []objID{o.iterID}
     907           2 :         // If the underlying reader is a batch, we must synchronize with the batch.
     908           2 :         // If refreshBatch=true, synchronizing is necessary to observe all the
     909           2 :         // mutations up to until this op and no more. Even when refreshBatch=false,
     910           2 :         // we must synchronize because iterator construction may access state cached
     911           2 :         // on the indexed batch to avoid refragmenting range tombstones or range
     912           2 :         // keys.
     913           2 :         if o.derivedReaderID.tag() == batchTag {
     914           0 :                 objIDs = append(objIDs, o.derivedReaderID)
     915           0 :         }
     916           2 :         return objIDs
     917             : }
     918             : 
// iterSetBoundsOp models an Iterator.SetBounds operation.
type iterSetBoundsOp struct {
	// iterID identifies the iterator whose bounds are set.
	iterID objID
	// lower and upper are the bounds; copies of them are handed to SetBounds
	// when the op runs.
	lower []byte
	upper []byte
}
     925             : 
     926           2 : func (o *iterSetBoundsOp) run(t *test, h historyRecorder) {
     927           2 :         i := t.getIter(o.iterID)
     928           2 :         var lower, upper []byte
     929           2 :         if o.lower != nil {
     930           2 :                 lower = append(lower, o.lower...)
     931           2 :         }
     932           2 :         if o.upper != nil {
     933           2 :                 upper = append(upper, o.upper...)
     934           2 :         }
     935           2 :         i.SetBounds(lower, upper)
     936           2 : 
     937           2 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
     938           2 :         // the user-provided bounds.
     939           2 :         rand.Read(lower[:])
     940           2 :         rand.Read(upper[:])
     941           2 : 
     942           2 :         h.Recordf("%s // %v", o, i.Error())
     943             : }
     944             : 
     945           2 : func (o *iterSetBoundsOp) String() string {
     946           2 :         return fmt.Sprintf("%s.SetBounds(%q, %q)", o.iterID, o.lower, o.upper)
     947           2 : }
     948             : 
     949           2 : func (o *iterSetBoundsOp) receiver() objID      { return o.iterID }
     950           2 : func (o *iterSetBoundsOp) syncObjs() objIDSlice { return nil }
     951             : 
// iterSetOptionsOp models an Iterator.SetOptions operation.
type iterSetOptionsOp struct {
	// iterID identifies the iterator whose options are set.
	iterID objID
	// iterOpts holds the options; it is converted to *pebble.IterOptions by
	// iterOptions when the op runs.
	iterOpts

	// derivedReaderID is the ID of the underlying reader that backs the
	// iterator. The derivedReaderID is NOT serialized by String and is derived
	// from other operations during parse.
	derivedReaderID objID
}
     962             : 
     963           2 : func (o *iterSetOptionsOp) run(t *test, h historyRecorder) {
     964           2 :         i := t.getIter(o.iterID)
     965           2 : 
     966           2 :         opts := iterOptions(o.iterOpts)
     967           2 :         if opts == nil {
     968           1 :                 opts = &pebble.IterOptions{}
     969           1 :         }
     970           2 :         i.SetOptions(opts)
     971           2 : 
     972           2 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
     973           2 :         // the user-provided bounds.
     974           2 :         rand.Read(opts.LowerBound[:])
     975           2 :         rand.Read(opts.UpperBound[:])
     976           2 : 
     977           2 :         h.Recordf("%s // %v", o, i.Error())
     978             : }
     979             : 
     980           2 : func (o *iterSetOptionsOp) String() string {
     981           2 :         return fmt.Sprintf("%s.SetOptions(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
     982           2 :                 o.iterID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
     983           2 : }
     984             : 
     985           2 : func iterOptions(o iterOpts) *pebble.IterOptions {
     986           2 :         if o.IsZero() {
     987           2 :                 return nil
     988           2 :         }
     989           2 :         var lower, upper []byte
     990           2 :         if o.lower != nil {
     991           2 :                 lower = append(lower, o.lower...)
     992           2 :         }
     993           2 :         if o.upper != nil {
     994           2 :                 upper = append(upper, o.upper...)
     995           2 :         }
     996           2 :         opts := &pebble.IterOptions{
     997           2 :                 LowerBound: lower,
     998           2 :                 UpperBound: upper,
     999           2 :                 KeyTypes:   pebble.IterKeyType(o.keyTypes),
    1000           2 :                 RangeKeyMasking: pebble.RangeKeyMasking{
    1001           2 :                         Suffix: o.maskSuffix,
    1002           2 :                 },
    1003           2 :                 UseL6Filters: o.useL6Filters,
    1004           2 :         }
    1005           2 :         if opts.RangeKeyMasking.Suffix != nil {
    1006           2 :                 opts.RangeKeyMasking.Filter = func() pebble.BlockPropertyFilterMask {
    1007           2 :                         return sstable.NewTestKeysMaskingFilter()
    1008           2 :                 }
    1009             :         }
    1010           2 :         if o.filterMax > 0 {
    1011           2 :                 opts.PointKeyFilters = []pebble.BlockPropertyFilter{
    1012           2 :                         sstable.NewTestKeysBlockPropertyFilter(o.filterMin, o.filterMax),
    1013           2 :                 }
    1014           2 :                 // Enforce the timestamp bounds in SkipPoint, so that the iterator never
    1015           2 :                 // returns a key outside the filterMin, filterMax bounds. This provides
    1016           2 :                 // deterministic iteration.
    1017           2 :                 opts.SkipPoint = func(k []byte) (skip bool) {
    1018           2 :                         n := testkeys.Comparer.Split(k)
    1019           2 :                         if n == len(k) {
    1020           2 :                                 // No suffix, don't skip it.
    1021           2 :                                 return false
    1022           2 :                         }
    1023           2 :                         v, err := testkeys.ParseSuffix(k[n:])
    1024           2 :                         if err != nil {
    1025           0 :                                 panic(err)
    1026             :                         }
    1027           2 :                         ts := uint64(v)
    1028           2 :                         return ts < o.filterMin || ts >= o.filterMax
    1029             :                 }
    1030             :         }
    1031           2 :         return opts
    1032             : }
    1033             : 
    1034           2 : func (o *iterSetOptionsOp) receiver() objID { return o.iterID }
    1035             : 
    1036           2 : func (o *iterSetOptionsOp) syncObjs() objIDSlice {
    1037           2 :         if o.derivedReaderID.tag() == batchTag {
    1038           0 :                 // If the underlying reader is a batch, we must synchronize with the
    1039           0 :                 // batch so that we observe all the mutations up until this operation
    1040           0 :                 // and no more.
    1041           0 :                 return []objID{o.derivedReaderID}
    1042           0 :         }
    1043           2 :         return nil
    1044             : }
    1045             : 
// iterSeekGEOp models an Iterator.SeekGE[WithLimit] operation.
type iterSeekGEOp struct {
	// iterID identifies the iterator the seek is performed on.
	iterID objID
	// key is the seek key.
	key []byte
	// limit is presumably the limit used for the WithLimit variant; its
	// handling is not shown here, so confirm against the op's run method.
	limit []byte

	// NOTE(review): by analogy with the other iterator ops this looks like
	// the ID of the reader backing the iterator — confirm at the parse site.
	derivedReaderID objID
}
    1054             : 
    1055           2 : func iteratorPos(i *retryableIter) string {
    1056           2 :         var buf bytes.Buffer
    1057           2 :         fmt.Fprintf(&buf, "%q", i.Key())
    1058           2 :         hasPoint, hasRange := i.HasPointAndRange()
    1059           2 :         if hasPoint {
    1060           2 :                 fmt.Fprintf(&buf, ",%q", i.Value())
    1061           2 :         } else {
    1062           2 :                 fmt.Fprint(&buf, ",<no point>")
    1063           2 :         }
    1064           2 :         if hasRange {
    1065           2 :                 start, end := i.RangeBounds()
    1066           2 :                 fmt.Fprintf(&buf, ",[%q,%q)=>{", start, end)
    1067           2 :                 for i, rk := range i.RangeKeys() {
    1068           2 :                         if i > 0 {
    1069           2 :                                 fmt.Fprint(&buf, ",")
    1070           2 :                         }
    1071           2 :                         fmt.Fprintf(&buf, "%q=%q", rk.Suffix, rk.Value)
    1072             :                 }
    1073           2 :                 fmt.Fprint(&buf, "}")
    1074           2 :         } else {
    1075           2 :                 fmt.Fprint(&buf, ",<no range>")
    1076           2 :         }
    1077           2 :         if i.RangeKeyChanged() {
    1078           2 :                 fmt.Fprint(&buf, "*")
    1079           2 :         }
    1080           2 :         return buf.String()
    1081             : }
    1082             : 
    1083           2 : func validBoolToStr(valid bool) string {
    1084           2 :         return fmt.Sprintf("%t", valid)
    1085           2 : }
    1086             : 
    1087           2 : func validityStateToStr(validity pebble.IterValidityState) (bool, string) {
    1088           2 :         // We can't distinguish between IterExhausted and IterAtLimit in a
    1089           2 :         // deterministic manner.
    1090           2 :         switch validity {
    1091           2 :         case pebble.IterExhausted, pebble.IterAtLimit:
    1092           2 :                 return false, "invalid"
    1093           2 :         case pebble.IterValid:
    1094           2 :                 return true, "valid"
    1095           0 :         default:
    1096           0 :                 panic("unknown validity")
    1097             :         }
    1098             : }
    1099             : 
    1100           2 : func (o *iterSeekGEOp) run(t *test, h historyRecorder) {
    1101           2 :         i := t.getIter(o.iterID)
    1102           2 :         var valid bool
    1103           2 :         var validStr string
    1104           2 :         if o.limit == nil {
    1105           2 :                 valid = i.SeekGE(o.key)
    1106           2 :                 validStr = validBoolToStr(valid)
    1107           2 :         } else {
    1108           2 :                 valid, validStr = validityStateToStr(i.SeekGEWithLimit(o.key, o.limit))
    1109           2 :         }
    1110           2 :         if valid {
    1111           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1112           2 :         } else {
    1113           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1114           2 :         }
    1115             : }
    1116             : 
    1117           2 : func (o *iterSeekGEOp) String() string {
    1118           2 :         return fmt.Sprintf("%s.SeekGE(%q, %q)", o.iterID, o.key, o.limit)
    1119           2 : }
    1120           2 : func (o *iterSeekGEOp) receiver() objID      { return o.iterID }
    1121           2 : func (o *iterSeekGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1122             : 
    1123           2 : func onlyBatchIDs(ids ...objID) objIDSlice {
    1124           2 :         var ret objIDSlice
    1125           2 :         for _, id := range ids {
    1126           2 :                 if id.tag() == batchTag {
    1127           2 :                         ret = append(ret, id)
    1128           2 :                 }
    1129             :         }
    1130           2 :         return ret
    1131             : }
    1132             : 
    1133             : // iterSeekPrefixGEOp models an Iterator.SeekPrefixGE operation.
    1134             : type iterSeekPrefixGEOp struct {
    1135             :         iterID objID
    1136             :         key    []byte
    1137             : 
    1138             :         derivedReaderID objID
    1139             : }
    1140             : 
    1141           2 : func (o *iterSeekPrefixGEOp) run(t *test, h historyRecorder) {
    1142           2 :         i := t.getIter(o.iterID)
    1143           2 :         valid := i.SeekPrefixGE(o.key)
    1144           2 :         if valid {
    1145           2 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1146           2 :         } else {
    1147           2 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1148           2 :         }
    1149             : }
    1150             : 
    1151           2 : func (o *iterSeekPrefixGEOp) String() string {
    1152           2 :         return fmt.Sprintf("%s.SeekPrefixGE(%q)", o.iterID, o.key)
    1153           2 : }
    1154           2 : func (o *iterSeekPrefixGEOp) receiver() objID      { return o.iterID }
    1155           2 : func (o *iterSeekPrefixGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1156             : 
    1157             : // iterSeekLTOp models an Iterator.SeekLT[WithLimit] operation.
    1158             : type iterSeekLTOp struct {
    1159             :         iterID objID
    1160             :         key    []byte
    1161             :         limit  []byte
    1162             : 
    1163             :         derivedReaderID objID
    1164             : }
    1165             : 
    1166           2 : func (o *iterSeekLTOp) run(t *test, h historyRecorder) {
    1167           2 :         i := t.getIter(o.iterID)
    1168           2 :         var valid bool
    1169           2 :         var validStr string
    1170           2 :         if o.limit == nil {
    1171           2 :                 valid = i.SeekLT(o.key)
    1172           2 :                 validStr = validBoolToStr(valid)
    1173           2 :         } else {
    1174           2 :                 valid, validStr = validityStateToStr(i.SeekLTWithLimit(o.key, o.limit))
    1175           2 :         }
    1176           2 :         if valid {
    1177           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1178           2 :         } else {
    1179           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1180           2 :         }
    1181             : }
    1182             : 
    1183           2 : func (o *iterSeekLTOp) String() string {
    1184           2 :         return fmt.Sprintf("%s.SeekLT(%q, %q)", o.iterID, o.key, o.limit)
    1185           2 : }
    1186             : 
    1187           2 : func (o *iterSeekLTOp) receiver() objID      { return o.iterID }
    1188           2 : func (o *iterSeekLTOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1189             : 
    1190             : // iterFirstOp models an Iterator.First operation.
    1191             : type iterFirstOp struct {
    1192             :         iterID objID
    1193             : 
    1194             :         derivedReaderID objID
    1195             : }
    1196             : 
    1197           2 : func (o *iterFirstOp) run(t *test, h historyRecorder) {
    1198           2 :         i := t.getIter(o.iterID)
    1199           2 :         valid := i.First()
    1200           2 :         if valid {
    1201           2 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1202           2 :         } else {
    1203           2 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1204           2 :         }
    1205             : }
    1206             : 
    1207           2 : func (o *iterFirstOp) String() string       { return fmt.Sprintf("%s.First()", o.iterID) }
    1208           2 : func (o *iterFirstOp) receiver() objID      { return o.iterID }
    1209           2 : func (o *iterFirstOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1210             : 
    1211             : // iterLastOp models an Iterator.Last operation.
    1212             : type iterLastOp struct {
    1213             :         iterID objID
    1214             : 
    1215             :         derivedReaderID objID
    1216             : }
    1217             : 
    1218           2 : func (o *iterLastOp) run(t *test, h historyRecorder) {
    1219           2 :         i := t.getIter(o.iterID)
    1220           2 :         valid := i.Last()
    1221           2 :         if valid {
    1222           2 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1223           2 :         } else {
    1224           2 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1225           2 :         }
    1226             : }
    1227             : 
    1228           2 : func (o *iterLastOp) String() string       { return fmt.Sprintf("%s.Last()", o.iterID) }
    1229           2 : func (o *iterLastOp) receiver() objID      { return o.iterID }
    1230           2 : func (o *iterLastOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1231             : 
    1232             : // iterNextOp models an Iterator.Next[WithLimit] operation.
    1233             : type iterNextOp struct {
    1234             :         iterID objID
    1235             :         limit  []byte
    1236             : 
    1237             :         derivedReaderID objID
    1238             : }
    1239             : 
    1240           2 : func (o *iterNextOp) run(t *test, h historyRecorder) {
    1241           2 :         i := t.getIter(o.iterID)
    1242           2 :         var valid bool
    1243           2 :         var validStr string
    1244           2 :         if o.limit == nil {
    1245           2 :                 valid = i.Next()
    1246           2 :                 validStr = validBoolToStr(valid)
    1247           2 :         } else {
    1248           2 :                 valid, validStr = validityStateToStr(i.NextWithLimit(o.limit))
    1249           2 :         }
    1250           2 :         if valid {
    1251           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1252           2 :         } else {
    1253           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1254           2 :         }
    1255             : }
    1256             : 
    1257           2 : func (o *iterNextOp) String() string       { return fmt.Sprintf("%s.Next(%q)", o.iterID, o.limit) }
    1258           2 : func (o *iterNextOp) receiver() objID      { return o.iterID }
    1259           2 : func (o *iterNextOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1260             : 
    1261             : // iterNextPrefixOp models an Iterator.NextPrefix operation.
    1262             : type iterNextPrefixOp struct {
    1263             :         iterID objID
    1264             : 
    1265             :         derivedReaderID objID
    1266             : }
    1267             : 
    1268           2 : func (o *iterNextPrefixOp) run(t *test, h historyRecorder) {
    1269           2 :         i := t.getIter(o.iterID)
    1270           2 :         valid := i.NextPrefix()
    1271           2 :         validStr := validBoolToStr(valid)
    1272           2 :         if valid {
    1273           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1274           2 :         } else {
    1275           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1276           2 :         }
    1277             : }
    1278             : 
    1279           2 : func (o *iterNextPrefixOp) String() string       { return fmt.Sprintf("%s.NextPrefix()", o.iterID) }
    1280           2 : func (o *iterNextPrefixOp) receiver() objID      { return o.iterID }
    1281           2 : func (o *iterNextPrefixOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1282             : 
    1283             : // iterPrevOp models an Iterator.Prev[WithLimit] operation.
    1284             : type iterPrevOp struct {
    1285             :         iterID objID
    1286             :         limit  []byte
    1287             : 
    1288             :         derivedReaderID objID
    1289             : }
    1290             : 
    1291           2 : func (o *iterPrevOp) run(t *test, h historyRecorder) {
    1292           2 :         i := t.getIter(o.iterID)
    1293           2 :         var valid bool
    1294           2 :         var validStr string
    1295           2 :         if o.limit == nil {
    1296           2 :                 valid = i.Prev()
    1297           2 :                 validStr = validBoolToStr(valid)
    1298           2 :         } else {
    1299           2 :                 valid, validStr = validityStateToStr(i.PrevWithLimit(o.limit))
    1300           2 :         }
    1301           2 :         if valid {
    1302           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1303           2 :         } else {
    1304           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1305           2 :         }
    1306             : }
    1307             : 
    1308           2 : func (o *iterPrevOp) String() string       { return fmt.Sprintf("%s.Prev(%q)", o.iterID, o.limit) }
    1309           2 : func (o *iterPrevOp) receiver() objID      { return o.iterID }
    1310           2 : func (o *iterPrevOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1311             : 
    1312             : // newSnapshotOp models a DB.NewSnapshot operation.
    1313             : type newSnapshotOp struct {
    1314             :         dbID   objID
    1315             :         snapID objID
    1316             :         // If nonempty, this snapshot must not be used to read any keys outside of
    1317             :         // the provided bounds. This allows some implementations to use 'Eventually
    1318             :         // file-only snapshots,' which require bounds.
    1319             :         bounds []pebble.KeyRange
    1320             : }
    1321             : 
    1322           2 : func (o *newSnapshotOp) run(t *test, h historyRecorder) {
    1323           2 :         // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
    1324           2 :         if len(t.dbs) > 1 || (len(o.bounds) > 0 && ((11400714819323198485*uint64(t.idx)*t.testOpts.seedEFOS)>>63) == 1) {
    1325           1 :                 s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(o.bounds)
    1326           1 :                 t.setSnapshot(o.snapID, s)
    1327           2 :         } else {
    1328           2 :                 s := t.getDB(o.dbID).NewSnapshot()
    1329           2 :                 t.setSnapshot(o.snapID, s)
    1330           2 :         }
    1331           2 :         h.Recordf("%s", o)
    1332             : }
    1333             : 
    1334           2 : func (o *newSnapshotOp) String() string {
    1335           2 :         var buf bytes.Buffer
    1336           2 :         fmt.Fprintf(&buf, "%s = %s.NewSnapshot(", o.snapID, o.dbID)
    1337           2 :         for i := range o.bounds {
    1338           2 :                 if i > 0 {
    1339           2 :                         fmt.Fprint(&buf, ", ")
    1340           2 :                 }
    1341           2 :                 fmt.Fprintf(&buf, "%q, %q", o.bounds[i].Start, o.bounds[i].End)
    1342             :         }
    1343           2 :         fmt.Fprint(&buf, ")")
    1344           2 :         return buf.String()
    1345             : }
    1346           2 : func (o *newSnapshotOp) receiver() objID      { return o.dbID }
    1347           2 : func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }
    1348             : 
    1349             : type dbRatchetFormatMajorVersionOp struct {
    1350             :         dbID objID
    1351             :         vers pebble.FormatMajorVersion
    1352             : }
    1353             : 
    1354           2 : func (o *dbRatchetFormatMajorVersionOp) run(t *test, h historyRecorder) {
    1355           2 :         var err error
    1356           2 :         // NB: We no-op the operation if we're already at or above the provided
    1357           2 :         // format major version. Different runs start at different format major
    1358           2 :         // versions, making the presence of an error and the error message itself
    1359           2 :         // non-deterministic if we attempt to upgrade to an older version.
    1360           2 :         //
    1361           2 :         //Regardless, subsequent operations should behave identically, which is what
    1362           2 :         //we're really aiming to test by including this format major version ratchet
    1363           2 :         //operation.
    1364           2 :         if t.getDB(o.dbID).FormatMajorVersion() < o.vers {
    1365           2 :                 err = t.getDB(o.dbID).RatchetFormatMajorVersion(o.vers)
    1366           2 :         }
    1367           2 :         h.Recordf("%s // %v", o, err)
    1368             : }
    1369             : 
    1370           2 : func (o *dbRatchetFormatMajorVersionOp) String() string {
    1371           2 :         return fmt.Sprintf("%s.RatchetFormatMajorVersion(%s)", o.dbID, o.vers)
    1372           2 : }
    1373           2 : func (o *dbRatchetFormatMajorVersionOp) receiver() objID      { return o.dbID }
    1374           2 : func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
    1375             : 
    1376             : type dbRestartOp struct {
    1377             :         dbID objID
    1378             : }
    1379             : 
    1380           2 : func (o *dbRestartOp) run(t *test, h historyRecorder) {
    1381           2 :         if err := t.restartDB(o.dbID); err != nil {
    1382           0 :                 h.Recordf("%s // %v", o, err)
    1383           0 :                 h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
    1384           2 :         } else {
    1385           2 :                 h.Recordf("%s", o)
    1386           2 :         }
    1387             : }
    1388             : 
    1389           2 : func (o *dbRestartOp) String() string       { return fmt.Sprintf("%s.Restart()", o.dbID) }
    1390           2 : func (o *dbRestartOp) receiver() objID      { return o.dbID }
    1391           2 : func (o *dbRestartOp) syncObjs() objIDSlice { return nil }
    1392             : 
    1393           1 : func formatOps(ops []op) string {
    1394           1 :         var buf strings.Builder
    1395           1 :         for _, op := range ops {
    1396           1 :                 fmt.Fprintf(&buf, "%s\n", op)
    1397           1 :         }
    1398           1 :         return buf.String()
    1399             : }
    1400             : 
// replicateOp models an operation that could copy keys from one db to
// another through either an IngestAndExcise, or an Ingest.
type replicateOp struct {
	// source is the database keys are read from; dest is the database the
	// resulting sstable is ingested into.
	source, dest objID
	// start and end bound the key range that is replicated; they are used as
	// the scan/iterator bounds on source and as the deleted/excised range on
	// dest.
	start, end []byte
}
    1407             : 
    1408             : func (r *replicateOp) runSharedReplicate(
    1409             :         t *test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
    1410           0 : ) {
    1411           0 :         var sharedSSTs []pebble.SharedSSTMeta
    1412           0 :         var err error
    1413           0 :         err = source.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, r.start, r.end,
    1414           0 :                 func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
    1415           0 :                         val, _, err := value.Value(nil)
    1416           0 :                         if err != nil {
    1417           0 :                                 panic(err)
    1418             :                         }
    1419           0 :                         return w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
    1420             :                 },
    1421           0 :                 func(start, end []byte, seqNum uint64) error {
    1422           0 :                         return w.DeleteRange(start, end)
    1423           0 :                 },
    1424           0 :                 func(start, end []byte, keys []keyspan.Key) error {
    1425           0 :                         s := keyspan.Span{
    1426           0 :                                 Start:     start,
    1427           0 :                                 End:       end,
    1428           0 :                                 Keys:      keys,
    1429           0 :                                 KeysOrder: 0,
    1430           0 :                         }
    1431           0 :                         return rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
    1432           0 :                                 return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
    1433           0 :                         })
    1434             :                 },
    1435           0 :                 func(sst *pebble.SharedSSTMeta) error {
    1436           0 :                         sharedSSTs = append(sharedSSTs, *sst)
    1437           0 :                         return nil
    1438           0 :                 },
    1439             :         )
    1440           0 :         if err != nil {
    1441           0 :                 h.Recordf("%s // %v", r, err)
    1442           0 :                 return
    1443           0 :         }
    1444             : 
    1445           0 :         _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, pebble.KeyRange{Start: r.start, End: r.end})
    1446           0 :         h.Recordf("%s // %v", r, err)
    1447             : }
    1448             : 
    1449           1 : func (r *replicateOp) run(t *test, h historyRecorder) {
    1450           1 :         // Shared replication only works if shared storage is enabled.
    1451           1 :         useSharedIngest := t.testOpts.useSharedReplicate
    1452           1 :         if !t.testOpts.sharedStorageEnabled {
    1453           1 :                 useSharedIngest = false
    1454           1 :         }
    1455             : 
    1456           1 :         source := t.getDB(r.source)
    1457           1 :         dest := t.getDB(r.dest)
    1458           1 :         sstPath := path.Join(t.tmpDir, fmt.Sprintf("ext-replicate%d.sst", t.idx))
    1459           1 :         f, err := t.opts.FS.Create(sstPath)
    1460           1 :         if err != nil {
    1461           0 :                 h.Recordf("%s // %v", r, err)
    1462           0 :                 return
    1463           0 :         }
    1464           1 :         w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), t.opts.MakeWriterOptions(0, dest.FormatMajorVersion().MaxTableFormat()))
    1465           1 : 
    1466           1 :         if useSharedIngest {
    1467           0 :                 r.runSharedReplicate(t, h, source, dest, w, sstPath)
    1468           0 :                 return
    1469           0 :         }
    1470             : 
    1471           1 :         iter, err := source.NewIter(&pebble.IterOptions{
    1472           1 :                 LowerBound: r.start,
    1473           1 :                 UpperBound: r.end,
    1474           1 :                 KeyTypes:   pebble.IterKeyTypePointsAndRanges,
    1475           1 :         })
    1476           1 :         if err != nil {
    1477           0 :                 panic(err)
    1478             :         }
    1479           1 :         defer iter.Close()
    1480           1 : 
    1481           1 :         // Write rangedels and rangekeydels for the range. This mimics the Excise
    1482           1 :         // that runSharedReplicate would do.
    1483           1 :         if err := w.DeleteRange(r.start, r.end); err != nil {
    1484           0 :                 panic(err)
    1485             :         }
    1486           1 :         if err := w.RangeKeyDelete(r.start, r.end); err != nil {
    1487           0 :                 panic(err)
    1488             :         }
    1489             : 
    1490           1 :         for ok := iter.SeekGE(r.start); ok && iter.Error() != nil; ok = iter.Next() {
    1491           0 :                 hasPoint, hasRange := iter.HasPointAndRange()
    1492           0 :                 if hasPoint {
    1493           0 :                         val, err := iter.ValueAndErr()
    1494           0 :                         if err != nil {
    1495           0 :                                 panic(err)
    1496             :                         }
    1497           0 :                         if err := w.Set(iter.Key(), val); err != nil {
    1498           0 :                                 panic(err)
    1499             :                         }
    1500             :                 }
    1501           0 :                 if hasRange && iter.RangeKeyChanged() {
    1502           0 :                         rangeKeys := iter.RangeKeys()
    1503           0 :                         rkStart, rkEnd := iter.RangeBounds()
    1504           0 :                         for i := range rangeKeys {
    1505           0 :                                 if err := w.RangeKeySet(rkStart, rkEnd, rangeKeys[i].Suffix, rangeKeys[i].Value); err != nil {
    1506           0 :                                         panic(err)
    1507             :                                 }
    1508             :                         }
    1509             :                 }
    1510             :         }
    1511           1 :         if err := w.Close(); err != nil {
    1512           0 :                 panic(err)
    1513             :         }
    1514             : 
    1515           1 :         err = dest.Ingest([]string{sstPath})
    1516           1 :         h.Recordf("%s // %v", r, err)
    1517             : }
    1518             : 
    1519           2 : func (r *replicateOp) String() string {
    1520           2 :         return fmt.Sprintf("%s.Replicate(%s, %q, %q)", r.source, r.dest, r.start, r.end)
    1521           2 : }
    1522             : 
    1523           1 : func (r *replicateOp) receiver() objID      { return r.source }
    1524           1 : func (r *replicateOp) syncObjs() objIDSlice { return objIDSlice{r.dest} }

Generated by: LCOV version 1.14