LCOV - code coverage report
Current view: top level - pebble/metamorphic - ops.go (source / functions)
Test: 2023-09-08 08:15Z 5093058d - tests only.lcov
Date: 2023-09-08 08:15:46
Lines: 106 hit of 796 total (13.3 % coverage)
Functions: 0 hit of 0 total (-)

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package metamorphic
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "crypto/rand"
      10             :         "encoding/binary"
      11             :         "fmt"
      12             :         "io"
      13             :         "path/filepath"
      14             :         "strings"
      15             : 
      16             :         "github.com/cockroachdb/errors"
      17             :         "github.com/cockroachdb/pebble"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/keyspan"
      20             :         "github.com/cockroachdb/pebble/internal/private"
      21             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      22             :         "github.com/cockroachdb/pebble/sstable"
      23             :         "github.com/cockroachdb/pebble/vfs"
      24             :         "github.com/cockroachdb/pebble/vfs/errorfs"
      25             : )
      26             : 
      27             : // op defines the interface for a single operation, such as creating a batch,
      28             : // or advancing an iterator.
      29             : type op interface {
      30             :         String() string
      31             :         run(t *test, h historyRecorder)
      32             : 
      33             :         // receiver returns the object ID of the object the operation is performed
       34             :         // on. Every operation has a receiver (e.g., batch0.Set(...) has `batch0` as
      35             :         // its receiver). Receivers are used for synchronization when running with
      36             :         // concurrency.
      37             :         receiver() objID
      38             : 
      39             :         // syncObjs returns an additional set of object IDs—excluding the
      40             :         // receiver—that the operation must synchronize with. At execution time,
      41             :         // the operation will run serially with respect to all other operations
      42             :         // that return these objects from their own syncObjs or receiver methods.
      43             :         syncObjs() objIDSlice
      44             : }
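
A minimal sketch of what an implementation of this interface looks like. noopOp is hypothetical and appears nowhere in this package; it only illustrates the required shape:

        // noopOp is a hypothetical op that performs no work against the
        // database and merely records itself in the history.
        type noopOp struct{}

        func (o *noopOp) run(t *test, h historyRecorder) { h.Recordf("%s", o) }
        func (o *noopOp) String() string                 { return "db.Noop()" }
        func (o *noopOp) receiver() objID                { return dbObjID }
        func (o *noopOp) syncObjs() objIDSlice           { return nil }

An op that must not race with other objects would return their IDs from syncObjs, causing the runner to serialize it with every op whose receiver or syncObjs mention the same IDs.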
      45             : 
       46             : // initOp performs test initialization.
      47             : type initOp struct {
      48             :         batchSlots    uint32
      49             :         iterSlots     uint32
      50             :         snapshotSlots uint32
      51             : }
      52             : 
      53           0 : func (o *initOp) run(t *test, h historyRecorder) {
      54           0 :         t.batches = make([]*pebble.Batch, o.batchSlots)
      55           0 :         t.iters = make([]*retryableIter, o.iterSlots)
      56           0 :         t.snapshots = make([]readerCloser, o.snapshotSlots)
      57           0 :         h.Recordf("%s", o)
      58           0 : }
      59             : 
      60           1 : func (o *initOp) String() string {
      61           1 :         return fmt.Sprintf("Init(%d /* batches */, %d /* iters */, %d /* snapshots */)",
      62           1 :                 o.batchSlots, o.iterSlots, o.snapshotSlots)
      63           1 : }
      64             : 
      65           0 : func (o *initOp) receiver() objID      { return dbObjID }
      66           0 : func (o *initOp) syncObjs() objIDSlice { return nil }
      67             : 
      68             : // applyOp models a Writer.Apply operation.
      69             : type applyOp struct {
      70             :         writerID objID
      71             :         batchID  objID
      72             : }
      73             : 
      74           0 : func (o *applyOp) run(t *test, h historyRecorder) {
      75           0 :         b := t.getBatch(o.batchID)
      76           0 :         w := t.getWriter(o.writerID)
      77           0 :         var err error
      78           0 :         if o.writerID.tag() == dbTag && t.testOpts.asyncApplyToDB && t.writeOpts.Sync {
      79           0 :                 err = w.(*pebble.DB).ApplyNoSyncWait(b, t.writeOpts)
      80           0 :                 if err == nil {
      81           0 :                         err = b.SyncWait()
      82           0 :                 }
      83           0 :         } else {
      84           0 :                 err = w.Apply(b, t.writeOpts)
      85           0 :         }
      86           0 :         h.Recordf("%s // %v", o, err)
       87             :         // The batch will be closed by a closeOp, which is guaranteed to be generated.
      88             : }
      89             : 
      90           1 : func (o *applyOp) String() string  { return fmt.Sprintf("%s.Apply(%s)", o.writerID, o.batchID) }
      91           0 : func (o *applyOp) receiver() objID { return o.writerID }
      92           0 : func (o *applyOp) syncObjs() objIDSlice {
      93           0 :         // Apply should not be concurrent with operations that are mutating the
      94           0 :         // batch.
      95           0 :         return []objID{o.batchID}
      96           0 : }
      97             : 
      98             : // checkpointOp models a DB.Checkpoint operation.
      99             : type checkpointOp struct {
     100             :         // If non-empty, the checkpoint is restricted to these spans.
     101             :         spans []pebble.CheckpointSpan
     102             : }
     103             : 
     104           0 : func (o *checkpointOp) run(t *test, h historyRecorder) {
     105           0 :         // TODO(josh): db.Checkpoint does not work with shared storage yet.
     106           0 :         // It would be better to filter out ahead of calling run on the op,
     107           0 :         // by setting the weight that generator.go uses to zero, or similar.
     108           0 :         // But IIUC the ops are shared for ALL the metamorphic test runs, so
     109           0 :         // not sure how to do that easily:
     110           0 :         // https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
     111           0 :         if t.testOpts.sharedStorageEnabled {
     112           0 :                 h.Recordf("%s // %v", o, nil)
     113           0 :                 return
     114           0 :         }
     115           0 :         var opts []pebble.CheckpointOption
     116           0 :         if len(o.spans) > 0 {
     117           0 :                 opts = append(opts, pebble.WithRestrictToSpans(o.spans))
     118           0 :         }
     119           0 :         err := withRetries(func() error {
     120           0 :                 return t.db.Checkpoint(o.dir(t.dir, h.op), opts...)
     121           0 :         })
     122           0 :         h.Recordf("%s // %v", o, err)
     123             : }
     124             : 
     125           0 : func (o *checkpointOp) dir(dataDir string, idx int) string {
     126           0 :         return filepath.Join(dataDir, "checkpoints", fmt.Sprintf("op-%06d", idx))
     127           0 : }
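
As a concrete illustration of dir: with dataDir "data" and op index 42, it returns filepath.Join("data", "checkpoints", "op-000042"), i.e. "data/checkpoints/op-000042" on POSIX systems, so each checkpoint op writes to a stable, sortable directory of its own.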
     128             : 
     129           1 : func (o *checkpointOp) String() string {
     130           1 :         var spanStr bytes.Buffer
     131           1 :         for i, span := range o.spans {
     132           1 :                 if i > 0 {
     133           1 :                         spanStr.WriteString(",")
     134           1 :                 }
     135           1 :                 fmt.Fprintf(&spanStr, "%q,%q", span.Start, span.End)
     136             :         }
     137           1 :         return fmt.Sprintf("db.Checkpoint(%s)", spanStr.String())
     138             : }
     139             : 
     140           0 : func (o *checkpointOp) receiver() objID      { return dbObjID }
     141           0 : func (o *checkpointOp) syncObjs() objIDSlice { return nil }
     142             : 
     143             : // closeOp models a {Batch,Iterator,Snapshot}.Close operation.
     144             : type closeOp struct {
     145             :         objID objID
     146             : }
     147             : 
     148           0 : func (o *closeOp) run(t *test, h historyRecorder) {
     149           0 :         c := t.getCloser(o.objID)
     150           0 :         if o.objID.tag() == dbTag && t.opts.DisableWAL {
     151           0 :                 // Special case: If WAL is disabled, do a flush right before DB Close. This
     152           0 :                 // allows us to reuse this run's data directory as initial state for
     153           0 :                 // future runs without losing any mutations.
     154           0 :                 _ = t.db.Flush()
     155           0 :         }
     156           0 :         t.clearObj(o.objID)
     157           0 :         err := c.Close()
     158           0 :         h.Recordf("%s // %v", o, err)
     159             : }
     160             : 
     161           1 : func (o *closeOp) String() string  { return fmt.Sprintf("%s.Close()", o.objID) }
     162           0 : func (o *closeOp) receiver() objID { return o.objID }
     163           0 : func (o *closeOp) syncObjs() objIDSlice {
     164           0 :         // Synchronize on the database so that we don't close the database before
     165           0 :         // all its iterators, snapshots and batches are closed.
     166           0 :         // TODO(jackson): It would be nice to relax this so that Close calls can
     167           0 :         // execute in parallel.
     168           0 :         if o.objID == dbObjID {
     169           0 :                 return nil
     170           0 :         }
     171           0 :         return []objID{dbObjID}
     172             : }
     173             : 
     174             : // compactOp models a DB.Compact operation.
     175             : type compactOp struct {
     176             :         start       []byte
     177             :         end         []byte
     178             :         parallelize bool
     179             : }
     180             : 
     181           0 : func (o *compactOp) run(t *test, h historyRecorder) {
     182           0 :         err := withRetries(func() error {
     183           0 :                 return t.db.Compact(o.start, o.end, o.parallelize)
     184           0 :         })
     185           0 :         h.Recordf("%s // %v", o, err)
     186             : }
     187             : 
     188           1 : func (o *compactOp) String() string {
     189           1 :         return fmt.Sprintf("db.Compact(%q, %q, %t /* parallelize */)", o.start, o.end, o.parallelize)
     190           1 : }
     191             : 
     192           0 : func (o *compactOp) receiver() objID      { return dbObjID }
     193           0 : func (o *compactOp) syncObjs() objIDSlice { return nil }
     194             : 
     195             : // deleteOp models a Write.Delete operation.
     196             : type deleteOp struct {
     197             :         writerID objID
     198             :         key      []byte
     199             : }
     200             : 
     201           0 : func (o *deleteOp) run(t *test, h historyRecorder) {
     202           0 :         w := t.getWriter(o.writerID)
     203           0 :         var err error
     204           0 :         if t.testOpts.deleteSized && t.isFMV(pebble.ExperimentalFormatDeleteSizedAndObsolete) {
     205           0 :                 // Call DeleteSized with a deterministic size derived from the index.
     206           0 :                 // The size does not need to be accurate for correctness.
     207           0 :                 err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
     208           0 :         } else {
     209           0 :                 err = w.Delete(o.key, t.writeOpts)
     210           0 :         }
     211           0 :         h.Recordf("%s // %v", o, err)
     212             : }
     213             : 
     214           0 : func hashSize(index int) uint32 {
     215           0 :         // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
     216           0 :         return uint32((11400714819323198485 * uint64(index)) % maxValueSize)
     217           0 : }
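
The multiplier 11400714819323198485 is the 64-bit Fibonacci hashing constant (roughly 2^64 divided by the golden ratio), which scatters consecutive indices across the uint64 range before the modulo. A sketch of the behavior, using a hypothetical stand-in bound since maxValueSize is defined elsewhere in this package:

        // fibExample mirrors hashSize with an assumed bound of 20.
        func fibExample(index int) uint32 {
                const bound = 20 // hypothetical stand-in for maxValueSize
                return uint32((11400714819323198485 * uint64(index)) % bound)
        }

        // fibExample(1) == 5: the constant's last two digits are 85, and
        // 85 % 20 == 5. What matters is determinism: replayed runs issue
        // identical DeleteSized sizes for the same op indices.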
     218             : 
     219           1 : func (o *deleteOp) String() string {
     220           1 :         return fmt.Sprintf("%s.Delete(%q)", o.writerID, o.key)
     221           1 : }
     222           0 : func (o *deleteOp) receiver() objID      { return o.writerID }
     223           0 : func (o *deleteOp) syncObjs() objIDSlice { return nil }
     224             : 
     225             : // singleDeleteOp models a Write.SingleDelete operation.
     226             : type singleDeleteOp struct {
     227             :         writerID           objID
     228             :         key                []byte
     229             :         maybeReplaceDelete bool
     230             : }
     231             : 
     232           0 : func (o *singleDeleteOp) run(t *test, h historyRecorder) {
     233           0 :         w := t.getWriter(o.writerID)
     234           0 :         var err error
     235           0 :         if t.testOpts.replaceSingleDelete && o.maybeReplaceDelete {
     236           0 :                 err = w.Delete(o.key, t.writeOpts)
     237           0 :         } else {
     238           0 :                 err = w.SingleDelete(o.key, t.writeOpts)
     239           0 :         }
      240             :         // NOTE: even if the SINGLEDEL was replaced with a DELETE, we must still
      241             :         // record a SingleDelete in the history log. The log line indicates whether
      242             :         // the delete *could* have been replaced; to determine what actually
      243             :         // happened at runtime, also consult the OPTIONS file (replacement occurs
      244             :         // only when both maybeReplaceDelete and replaceSingleDelete are true).
     245           0 :         h.Recordf("%s // %v", o, err)
     246             : }
     247             : 
     248           1 : func (o *singleDeleteOp) String() string {
     249           1 :         return fmt.Sprintf("%s.SingleDelete(%q, %v /* maybeReplaceDelete */)", o.writerID, o.key, o.maybeReplaceDelete)
     250           1 : }
     251             : 
     252           0 : func (o *singleDeleteOp) receiver() objID      { return o.writerID }
     253           0 : func (o *singleDeleteOp) syncObjs() objIDSlice { return nil }
     254             : 
     255             : // deleteRangeOp models a Write.DeleteRange operation.
     256             : type deleteRangeOp struct {
     257             :         writerID objID
     258             :         start    []byte
     259             :         end      []byte
     260             : }
     261             : 
     262           0 : func (o *deleteRangeOp) run(t *test, h historyRecorder) {
     263           0 :         w := t.getWriter(o.writerID)
     264           0 :         err := w.DeleteRange(o.start, o.end, t.writeOpts)
     265           0 :         h.Recordf("%s // %v", o, err)
     266           0 : }
     267             : 
     268           1 : func (o *deleteRangeOp) String() string {
     269           1 :         return fmt.Sprintf("%s.DeleteRange(%q, %q)", o.writerID, o.start, o.end)
     270           1 : }
     271             : 
     272           0 : func (o *deleteRangeOp) receiver() objID      { return o.writerID }
     273           0 : func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
     274             : 
     275             : // flushOp models a DB.Flush operation.
     276             : type flushOp struct {
     277             : }
     278             : 
     279           0 : func (o *flushOp) run(t *test, h historyRecorder) {
     280           0 :         err := t.db.Flush()
     281           0 :         h.Recordf("%s // %v", o, err)
     282           0 : }
     283             : 
     284           1 : func (o *flushOp) String() string       { return "db.Flush()" }
     285           0 : func (o *flushOp) receiver() objID      { return dbObjID }
     286           0 : func (o *flushOp) syncObjs() objIDSlice { return nil }
     287             : 
     288             : // mergeOp models a Write.Merge operation.
     289             : type mergeOp struct {
     290             :         writerID objID
     291             :         key      []byte
     292             :         value    []byte
     293             : }
     294             : 
     295           0 : func (o *mergeOp) run(t *test, h historyRecorder) {
     296           0 :         w := t.getWriter(o.writerID)
     297           0 :         err := w.Merge(o.key, o.value, t.writeOpts)
     298           0 :         h.Recordf("%s // %v", o, err)
     299           0 : }
     300             : 
     301           1 : func (o *mergeOp) String() string       { return fmt.Sprintf("%s.Merge(%q, %q)", o.writerID, o.key, o.value) }
     302           0 : func (o *mergeOp) receiver() objID      { return o.writerID }
     303           0 : func (o *mergeOp) syncObjs() objIDSlice { return nil }
     304             : 
     305             : // setOp models a Write.Set operation.
     306             : type setOp struct {
     307             :         writerID objID
     308             :         key      []byte
     309             :         value    []byte
     310             : }
     311             : 
     312           0 : func (o *setOp) run(t *test, h historyRecorder) {
     313           0 :         w := t.getWriter(o.writerID)
     314           0 :         err := w.Set(o.key, o.value, t.writeOpts)
     315           0 :         h.Recordf("%s // %v", o, err)
     316           0 : }
     317             : 
     318           1 : func (o *setOp) String() string       { return fmt.Sprintf("%s.Set(%q, %q)", o.writerID, o.key, o.value) }
     319           0 : func (o *setOp) receiver() objID      { return o.writerID }
     320           0 : func (o *setOp) syncObjs() objIDSlice { return nil }
     321             : 
     322             : // rangeKeyDeleteOp models a Write.RangeKeyDelete operation.
     323             : type rangeKeyDeleteOp struct {
     324             :         writerID objID
     325             :         start    []byte
     326             :         end      []byte
     327             : }
     328             : 
     329           0 : func (o *rangeKeyDeleteOp) run(t *test, h historyRecorder) {
     330           0 :         w := t.getWriter(o.writerID)
     331           0 :         err := w.RangeKeyDelete(o.start, o.end, t.writeOpts)
     332           0 :         h.Recordf("%s // %v", o, err)
     333           0 : }
     334             : 
     335           1 : func (o *rangeKeyDeleteOp) String() string {
     336           1 :         return fmt.Sprintf("%s.RangeKeyDelete(%q, %q)", o.writerID, o.start, o.end)
     337           1 : }
     338             : 
     339           0 : func (o *rangeKeyDeleteOp) receiver() objID      { return o.writerID }
     340           0 : func (o *rangeKeyDeleteOp) syncObjs() objIDSlice { return nil }
     341             : 
     342             : // rangeKeySetOp models a Write.RangeKeySet operation.
     343             : type rangeKeySetOp struct {
     344             :         writerID objID
     345             :         start    []byte
     346             :         end      []byte
     347             :         suffix   []byte
     348             :         value    []byte
     349             : }
     350             : 
     351           0 : func (o *rangeKeySetOp) run(t *test, h historyRecorder) {
     352           0 :         w := t.getWriter(o.writerID)
     353           0 :         err := w.RangeKeySet(o.start, o.end, o.suffix, o.value, t.writeOpts)
     354           0 :         h.Recordf("%s // %v", o, err)
     355           0 : }
     356             : 
     357           1 : func (o *rangeKeySetOp) String() string {
     358           1 :         return fmt.Sprintf("%s.RangeKeySet(%q, %q, %q, %q)",
     359           1 :                 o.writerID, o.start, o.end, o.suffix, o.value)
     360           1 : }
     361             : 
     362           0 : func (o *rangeKeySetOp) receiver() objID      { return o.writerID }
     363           0 : func (o *rangeKeySetOp) syncObjs() objIDSlice { return nil }
     364             : 
     365             : // rangeKeyUnsetOp models a Write.RangeKeyUnset operation.
     366             : type rangeKeyUnsetOp struct {
     367             :         writerID objID
     368             :         start    []byte
     369             :         end      []byte
     370             :         suffix   []byte
     371             : }
     372             : 
     373           0 : func (o *rangeKeyUnsetOp) run(t *test, h historyRecorder) {
     374           0 :         w := t.getWriter(o.writerID)
     375           0 :         err := w.RangeKeyUnset(o.start, o.end, o.suffix, t.writeOpts)
     376           0 :         h.Recordf("%s // %v", o, err)
     377           0 : }
     378             : 
     379           1 : func (o *rangeKeyUnsetOp) String() string {
     380           1 :         return fmt.Sprintf("%s.RangeKeyUnset(%q, %q, %q)",
     381           1 :                 o.writerID, o.start, o.end, o.suffix)
     382           1 : }
     383             : 
     384           0 : func (o *rangeKeyUnsetOp) receiver() objID      { return o.writerID }
     385           0 : func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }
     386             : 
     387             : // newBatchOp models a Write.NewBatch operation.
     388             : type newBatchOp struct {
     389             :         batchID objID
     390             : }
     391             : 
     392           0 : func (o *newBatchOp) run(t *test, h historyRecorder) {
     393           0 :         b := t.db.NewBatch()
     394           0 :         t.setBatch(o.batchID, b)
     395           0 :         h.Recordf("%s", o)
     396           0 : }
     397             : 
     398           1 : func (o *newBatchOp) String() string  { return fmt.Sprintf("%s = db.NewBatch()", o.batchID) }
     399           0 : func (o *newBatchOp) receiver() objID { return dbObjID }
     400           0 : func (o *newBatchOp) syncObjs() objIDSlice {
     401           0 :         // NewBatch should not be concurrent with operations that interact with that
     402           0 :         // same batch.
     403           0 :         return []objID{o.batchID}
     404           0 : }
     405             : 
     406             : // newIndexedBatchOp models a Write.NewIndexedBatch operation.
     407             : type newIndexedBatchOp struct {
     408             :         batchID objID
     409             : }
     410             : 
     411           0 : func (o *newIndexedBatchOp) run(t *test, h historyRecorder) {
     412           0 :         b := t.db.NewIndexedBatch()
     413           0 :         t.setBatch(o.batchID, b)
     414           0 :         h.Recordf("%s", o)
     415           0 : }
     416             : 
     417           1 : func (o *newIndexedBatchOp) String() string {
     418           1 :         return fmt.Sprintf("%s = db.NewIndexedBatch()", o.batchID)
     419           1 : }
     420           0 : func (o *newIndexedBatchOp) receiver() objID { return dbObjID }
     421           0 : func (o *newIndexedBatchOp) syncObjs() objIDSlice {
     422           0 :         // NewIndexedBatch should not be concurrent with operations that interact
     423           0 :         // with that same batch.
     424           0 :         return []objID{o.batchID}
     425           0 : }
     426             : 
     427             : // batchCommitOp models a Batch.Commit operation.
     428             : type batchCommitOp struct {
     429             :         batchID objID
     430             : }
     431             : 
     432           0 : func (o *batchCommitOp) run(t *test, h historyRecorder) {
     433           0 :         b := t.getBatch(o.batchID)
     434           0 :         err := b.Commit(t.writeOpts)
     435           0 :         h.Recordf("%s // %v", o, err)
     436           0 : }
     437             : 
     438           1 : func (o *batchCommitOp) String() string  { return fmt.Sprintf("%s.Commit()", o.batchID) }
     439           0 : func (o *batchCommitOp) receiver() objID { return o.batchID }
     440           0 : func (o *batchCommitOp) syncObjs() objIDSlice {
     441           0 :         // Synchronize on the database so that NewIters wait for the commit.
     442           0 :         return []objID{dbObjID}
     443           0 : }
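
Given the String and Recordf formats above, a successful commit appears in the history as a line of the form (object ID illustrative):

        batch5.Commit() // <nil>

Because syncObjs returns dbObjID, a db-level operation such as a NewIter cannot interleave with the commit, which keeps the observed database state deterministic across runs.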
     444             : 
     445             : // ingestOp models a DB.Ingest operation.
     446             : type ingestOp struct {
     447             :         batchIDs []objID
     448             : }
     449             : 
     450           0 : func (o *ingestOp) run(t *test, h historyRecorder) {
     451           0 :         // We can only use apply as an alternative for ingestion if we are ingesting
     452           0 :         // a single batch. If we are ingesting multiple batches, the batches may
      453           0 :         // overlap, which would cause ingestion to fail but apply would succeed.
     454           0 :         if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 {
     455           0 :                 id := o.batchIDs[0]
     456           0 :                 b := t.getBatch(id)
     457           0 :                 iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
     458           0 :                 c, err := o.collapseBatch(t, iter, rangeDelIter, rangeKeyIter)
     459           0 :                 if err == nil {
     460           0 :                         w := t.getWriter(makeObjID(dbTag, 0))
     461           0 :                         err = w.Apply(c, t.writeOpts)
     462           0 :                 }
     463           0 :                 _ = b.Close()
     464           0 :                 _ = c.Close()
     465           0 :                 t.clearObj(id)
     466           0 :                 h.Recordf("%s // %v", o, err)
     467           0 :                 return
     468             :         }
     469             : 
     470           0 :         var paths []string
     471           0 :         var err error
     472           0 :         for i, id := range o.batchIDs {
     473           0 :                 b := t.getBatch(id)
     474           0 :                 t.clearObj(id)
     475           0 :                 path, err2 := o.build(t, h, b, i)
     476           0 :                 if err2 != nil {
     477           0 :                         h.Recordf("Build(%s) // %v", id, err2)
     478           0 :                 }
     479           0 :                 err = firstError(err, err2)
     480           0 :                 if err2 == nil {
     481           0 :                         paths = append(paths, path)
     482           0 :                 }
     483           0 :                 err = firstError(err, b.Close())
     484             :         }
     485             : 
     486           0 :         err = firstError(err, withRetries(func() error {
     487           0 :                 return t.db.Ingest(paths)
     488           0 :         }))
     489             : 
     490           0 :         h.Recordf("%s // %v", o, err)
     491             : }
     492             : 
     493           0 : func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
     494           0 :         rootFS := vfs.Root(t.opts.FS)
     495           0 :         path := rootFS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d", i))
     496           0 :         f, err := rootFS.Create(path)
     497           0 :         if err != nil {
     498           0 :                 return "", err
     499           0 :         }
     500             : 
     501           0 :         iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
     502           0 :         defer closeIters(iter, rangeDelIter, rangeKeyIter)
     503           0 : 
     504           0 :         equal := t.opts.Comparer.Equal
     505           0 :         tableFormat := t.db.FormatMajorVersion().MaxTableFormat()
     506           0 :         w := sstable.NewWriter(
     507           0 :                 objstorageprovider.NewFileWritable(f),
     508           0 :                 t.opts.MakeWriterOptions(0, tableFormat),
     509           0 :         )
     510           0 : 
     511           0 :         var lastUserKey []byte
     512           0 :         for key, value := iter.First(); key != nil; key, value = iter.Next() {
     513           0 :                 // Ignore duplicate keys.
     514           0 :                 if equal(lastUserKey, key.UserKey) {
     515           0 :                         continue
     516             :                 }
     517             :                 // NB: We don't have to copy the key or value since we're reading from a
     518             :                 // batch which doesn't do prefix compression.
     519           0 :                 lastUserKey = key.UserKey
     520           0 : 
     521           0 :                 key.SetSeqNum(base.SeqNumZero)
     522           0 :                 if err := w.Add(*key, value.InPlaceValue()); err != nil {
     523           0 :                         return "", err
     524           0 :                 }
     525             :         }
     526           0 :         if err := iter.Close(); err != nil {
     527           0 :                 return "", err
     528           0 :         }
     529           0 :         iter = nil
     530           0 : 
     531           0 :         if rangeDelIter != nil {
     532           0 :                 // NB: The range tombstones have already been fragmented by the Batch.
     533           0 :                 for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
     534           0 :                         // NB: We don't have to copy the key or value since we're reading from a
     535           0 :                         // batch which doesn't do prefix compression.
     536           0 :                         if err := w.DeleteRange(t.Start, t.End); err != nil {
     537           0 :                                 return "", err
     538           0 :                         }
     539             :                 }
     540           0 :                 if err := rangeDelIter.Close(); err != nil {
     541           0 :                         return "", err
     542           0 :                 }
     543           0 :                 rangeDelIter = nil
     544             :         }
     545             : 
     546           0 :         if err := w.Close(); err != nil {
     547           0 :                 return "", err
     548           0 :         }
     549           0 :         return path, nil
     550             : }
     551             : 
     552           0 : func (o *ingestOp) receiver() objID { return dbObjID }
     553           0 : func (o *ingestOp) syncObjs() objIDSlice {
     554           0 :         // Ingest should not be concurrent with mutating the batches that will be
     555           0 :         // ingested as sstables.
     556           0 :         return o.batchIDs
     557           0 : }
     558             : 
     559             : func closeIters(
     560             :         pointIter base.InternalIterator,
     561             :         rangeDelIter keyspan.FragmentIterator,
     562             :         rangeKeyIter keyspan.FragmentIterator,
     563           0 : ) {
     564           0 :         if pointIter != nil {
     565           0 :                 pointIter.Close()
     566           0 :         }
     567           0 :         if rangeDelIter != nil {
     568           0 :                 rangeDelIter.Close()
     569           0 :         }
     570           0 :         if rangeKeyIter != nil {
     571           0 :                 rangeKeyIter.Close()
     572           0 :         }
     573             : }
     574             : 
     575             : // collapseBatch collapses the mutations in a batch to be equivalent to an
     576             : // sstable ingesting those mutations. Duplicate updates to a key are collapsed
      577             : // so that only the latest update is performed. All range deletions are
      578             : // applied first to match the semantics of ingestion, where a range deletion
      579             : // does not delete point records contained within the same sstable.
     580             : func (o *ingestOp) collapseBatch(
     581             :         t *test, pointIter base.InternalIterator, rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
     582           0 : ) (*pebble.Batch, error) {
     583           0 :         defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
     584           0 :         equal := t.opts.Comparer.Equal
     585           0 :         collapsed := t.db.NewBatch()
     586           0 : 
     587           0 :         if rangeDelIter != nil {
     588           0 :                 // NB: The range tombstones have already been fragmented by the Batch.
     589           0 :                 for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
     590           0 :                         // NB: We don't have to copy the key or value since we're reading from a
     591           0 :                         // batch which doesn't do prefix compression.
     592           0 :                         if err := collapsed.DeleteRange(t.Start, t.End, nil); err != nil {
     593           0 :                                 return nil, err
     594           0 :                         }
     595             :                 }
     596           0 :                 if err := rangeDelIter.Close(); err != nil {
     597           0 :                         return nil, err
     598           0 :                 }
     599           0 :                 rangeDelIter = nil
     600             :         }
     601             : 
     602           0 :         if pointIter != nil {
     603           0 :                 var lastUserKey []byte
     604           0 :                 for key, value := pointIter.First(); key != nil; key, value = pointIter.Next() {
     605           0 :                         // Ignore duplicate keys.
     606           0 :                         if equal(lastUserKey, key.UserKey) {
     607           0 :                                 continue
     608             :                         }
     609             :                         // NB: We don't have to copy the key or value since we're reading from a
     610             :                         // batch which doesn't do prefix compression.
     611           0 :                         lastUserKey = key.UserKey
     612           0 : 
     613           0 :                         var err error
     614           0 :                         switch key.Kind() {
     615           0 :                         case pebble.InternalKeyKindDelete:
     616           0 :                                 err = collapsed.Delete(key.UserKey, nil)
     617           0 :                         case pebble.InternalKeyKindDeleteSized:
     618           0 :                                 v, _ := binary.Uvarint(value.InPlaceValue())
     619           0 :                                 // Batch.DeleteSized takes just the length of the value being
     620           0 :                                 // deleted and adds the key's length to derive the overall entry
     621           0 :                                 // size of the value being deleted. This has already been done
     622           0 :                                 // to the key we're reading from the batch, so we must subtract
     623           0 :                                 // the key length from the encoded value before calling
     624           0 :                                 // collapsed.DeleteSized, which will again add the key length
     625           0 :                                 // before encoding.
     626           0 :                                 err = collapsed.DeleteSized(key.UserKey, uint32(v-uint64(len(key.UserKey))), nil)
     627           0 :                         case pebble.InternalKeyKindSingleDelete:
     628           0 :                                 err = collapsed.SingleDelete(key.UserKey, nil)
     629           0 :                         case pebble.InternalKeyKindSet:
     630           0 :                                 err = collapsed.Set(key.UserKey, value.InPlaceValue(), nil)
     631           0 :                         case pebble.InternalKeyKindMerge:
     632           0 :                                 err = collapsed.Merge(key.UserKey, value.InPlaceValue(), nil)
     633           0 :                         case pebble.InternalKeyKindLogData:
     634           0 :                                 err = collapsed.LogData(key.UserKey, nil)
     635           0 :                         default:
     636           0 :                                 err = errors.Errorf("unknown batch record kind: %d", key.Kind())
     637             :                         }
     638           0 :                         if err != nil {
     639           0 :                                 return nil, err
     640           0 :                         }
     641             :                 }
     642           0 :                 if err := pointIter.Close(); err != nil {
     643           0 :                         return nil, err
     644           0 :                 }
     645           0 :                 pointIter = nil
     646             :         }
     647             : 
     648           0 :         return collapsed, nil
     649             : }
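
A hypothetical walkthrough of the collapse, with keys and values chosen purely for illustration:

        // Input batch, in insertion order:
        //   Set("a", "v1"); Set("a", "v2"); DeleteRange("a", "c")
        //
        // BatchSort surfaces the newest entry for each user key first, so
        // the collapsed batch ends up as:
        //   collapsed.DeleteRange([]byte("a"), []byte("c"), nil) // range deletions first
        //   collapsed.Set([]byte("a"), []byte("v2"), nil)        // latest update to "a" only
        //
        // This matches ingestion, where an sstable's range deletion does not
        // delete the point records packaged in the same sstable.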
     650             : 
     651           1 : func (o *ingestOp) String() string {
     652           1 :         var buf strings.Builder
     653           1 :         buf.WriteString("db.Ingest(")
     654           1 :         for i, id := range o.batchIDs {
     655           1 :                 if i > 0 {
     656           1 :                         buf.WriteString(", ")
     657           1 :                 }
     658           1 :                 buf.WriteString(id.String())
     659             :         }
     660           1 :         buf.WriteString(")")
     661           1 :         return buf.String()
     662             : }
     663             : 
     664             : // getOp models a Reader.Get operation.
     665             : type getOp struct {
     666             :         readerID objID
     667             :         key      []byte
     668             : }
     669             : 
     670           0 : func (o *getOp) run(t *test, h historyRecorder) {
     671           0 :         r := t.getReader(o.readerID)
     672           0 :         var val []byte
     673           0 :         var closer io.Closer
     674           0 :         err := withRetries(func() (err error) {
     675           0 :                 val, closer, err = r.Get(o.key)
     676           0 :                 return err
     677           0 :         })
     678           0 :         h.Recordf("%s // [%q] %v", o, val, err)
     679           0 :         if closer != nil {
     680           0 :                 closer.Close()
     681           0 :         }
     682             : }
     683             : 
     684           1 : func (o *getOp) String() string  { return fmt.Sprintf("%s.Get(%q)", o.readerID, o.key) }
     685           0 : func (o *getOp) receiver() objID { return o.readerID }
     686           0 : func (o *getOp) syncObjs() objIDSlice {
     687           0 :         if o.readerID == dbObjID {
     688           0 :                 return nil
     689           0 :         }
     690             :         // batch.Get reads through to the current database state.
     691           0 :         return []objID{dbObjID}
     692             : }
     693             : 
     694             : // newIterOp models a Reader.NewIter operation.
     695             : type newIterOp struct {
     696             :         readerID objID
     697             :         iterID   objID
     698             :         iterOpts
     699             : }
     700             : 
     701           0 : func (o *newIterOp) run(t *test, h historyRecorder) {
     702           0 :         r := t.getReader(o.readerID)
     703           0 :         opts := iterOptions(o.iterOpts)
     704           0 : 
     705           0 :         var i *pebble.Iterator
     706           0 :         for {
     707           0 :                 i, _ = r.NewIter(opts)
     708           0 :                 if err := i.Error(); !errors.Is(err, errorfs.ErrInjected) {
     709           0 :                         break
     710             :                 }
     711             :                 // close this iter and retry NewIter
     712           0 :                 _ = i.Close()
     713             :         }
     714           0 :         t.setIter(o.iterID, i, o.filterMin, o.filterMax)
     715           0 : 
     716           0 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
     717           0 :         // the user-provided bounds.
     718           0 :         if opts != nil {
     719           0 :                 rand.Read(opts.LowerBound[:])
     720           0 :                 rand.Read(opts.UpperBound[:])
     721           0 :         }
     722           0 :         h.Recordf("%s // %v", o, i.Error())
     723             : }
     724             : 
     725           1 : func (o *newIterOp) String() string {
     726           1 :         return fmt.Sprintf("%s = %s.NewIter(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
     727           1 :                 o.iterID, o.readerID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
     728           1 : }
     729             : 
     730           0 : func (o *newIterOp) receiver() objID { return o.readerID }
     731           0 : func (o *newIterOp) syncObjs() objIDSlice {
     732           0 :         // Prevent o.iterID ops from running before it exists.
     733           0 :         objs := []objID{o.iterID}
     734           0 :         // If reading through a batch, the new iterator will also observe database
     735           0 :         // state, and we must synchronize on the database state for a consistent
     736           0 :         // view.
     737           0 :         if o.readerID.tag() == batchTag {
     738           0 :                 objs = append(objs, dbObjID)
     739           0 :         }
     740           0 :         return objs
     741             : }
     742             : 
      743             : // newIterUsingCloneOp models an Iterator.Clone operation.
     744             : type newIterUsingCloneOp struct {
     745             :         existingIterID objID
     746             :         iterID         objID
     747             :         refreshBatch   bool
     748             :         iterOpts
     749             : 
     750             :         // derivedReaderID is the ID of the underlying reader that backs both the
     751             :         // existing iterator and the new iterator. The derivedReaderID is NOT
     752             :         // serialized by String and is derived from other operations during parse.
     753             :         derivedReaderID objID
     754             : }
     755             : 
     756           0 : func (o *newIterUsingCloneOp) run(t *test, h historyRecorder) {
     757           0 :         iter := t.getIter(o.existingIterID)
     758           0 :         cloneOpts := pebble.CloneOptions{
     759           0 :                 IterOptions:      iterOptions(o.iterOpts),
     760           0 :                 RefreshBatchView: o.refreshBatch,
     761           0 :         }
     762           0 :         i, err := iter.iter.Clone(cloneOpts)
     763           0 :         if err != nil {
     764           0 :                 panic(err)
     765             :         }
     766           0 :         filterMin, filterMax := o.filterMin, o.filterMax
     767           0 :         if cloneOpts.IterOptions == nil {
     768           0 :                 // We're adopting the same block property filters as iter, so we need to
     769           0 :                 // adopt the same run-time filters to ensure determinism.
     770           0 :                 filterMin, filterMax = iter.filterMin, iter.filterMax
     771           0 :         }
     772           0 :         t.setIter(o.iterID, i, filterMin, filterMax)
     773           0 :         h.Recordf("%s // %v", o, i.Error())
     774             : }
     775             : 
     776           1 : func (o *newIterUsingCloneOp) String() string {
     777           1 :         return fmt.Sprintf("%s = %s.Clone(%t, %q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
     778           1 :                 o.iterID, o.existingIterID, o.refreshBatch, o.lower, o.upper,
     779           1 :                 o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
     780           1 : }
     781             : 
     782           0 : func (o *newIterUsingCloneOp) receiver() objID { return o.existingIterID }
     783             : 
     784           0 : func (o *newIterUsingCloneOp) syncObjs() objIDSlice {
     785           0 :         objIDs := []objID{o.iterID}
     786           0 :         // If the underlying reader is a batch, we must synchronize with the batch.
     787           0 :         // If refreshBatch=true, synchronizing is necessary to observe all the
      788           0 :         // mutations up until this op and no more. Even when refreshBatch=false,
     789           0 :         // we must synchronize because iterator construction may access state cached
     790           0 :         // on the indexed batch to avoid refragmenting range tombstones or range
     791           0 :         // keys.
     792           0 :         if o.derivedReaderID.tag() == batchTag {
     793           0 :                 objIDs = append(objIDs, o.derivedReaderID)
     794           0 :         }
     795           0 :         return objIDs
     796             : }
     797             : 
     798             : // iterSetBoundsOp models an Iterator.SetBounds operation.
     799             : type iterSetBoundsOp struct {
     800             :         iterID objID
     801             :         lower  []byte
     802             :         upper  []byte
     803             : }
     804             : 
     805           0 : func (o *iterSetBoundsOp) run(t *test, h historyRecorder) {
     806           0 :         i := t.getIter(o.iterID)
     807           0 :         var lower, upper []byte
     808           0 :         if o.lower != nil {
     809           0 :                 lower = append(lower, o.lower...)
     810           0 :         }
     811           0 :         if o.upper != nil {
     812           0 :                 upper = append(upper, o.upper...)
     813           0 :         }
     814           0 :         i.SetBounds(lower, upper)
     815           0 : 
     816           0 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
     817           0 :         // the user-provided bounds.
     818           0 :         rand.Read(lower[:])
     819           0 :         rand.Read(upper[:])
     820           0 : 
     821           0 :         h.Recordf("%s // %v", o, i.Error())
     822             : }
     823             : 
     824           1 : func (o *iterSetBoundsOp) String() string {
     825           1 :         return fmt.Sprintf("%s.SetBounds(%q, %q)", o.iterID, o.lower, o.upper)
     826           1 : }
     827             : 
     828           0 : func (o *iterSetBoundsOp) receiver() objID      { return o.iterID }
     829           0 : func (o *iterSetBoundsOp) syncObjs() objIDSlice { return nil }
     830             : 
     831             : // iterSetOptionsOp models an Iterator.SetOptions operation.
     832             : type iterSetOptionsOp struct {
     833             :         iterID objID
     834             :         iterOpts
     835             : 
     836             :         // derivedReaderID is the ID of the underlying reader that backs the
     837             :         // iterator. The derivedReaderID is NOT serialized by String and is derived
     838             :         // from other operations during parse.
     839             :         derivedReaderID objID
     840             : }
     841             : 
     842           0 : func (o *iterSetOptionsOp) run(t *test, h historyRecorder) {
     843           0 :         i := t.getIter(o.iterID)
     844           0 : 
     845           0 :         opts := iterOptions(o.iterOpts)
     846           0 :         if opts == nil {
     847           0 :                 opts = &pebble.IterOptions{}
     848           0 :         }
     849           0 :         i.SetOptions(opts)
     850           0 : 
     851           0 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
     852           0 :         // the user-provided bounds.
     853           0 :         rand.Read(opts.LowerBound[:])
     854           0 :         rand.Read(opts.UpperBound[:])
     855           0 : 
     856           0 :         // Adjust the iterator's filters.
     857           0 :         i.filterMin, i.filterMax = o.filterMin, o.filterMax
     858           0 : 
     859           0 :         h.Recordf("%s // %v", o, i.Error())
     860             : }
     861             : 
     862           1 : func (o *iterSetOptionsOp) String() string {
     863           1 :         return fmt.Sprintf("%s.SetOptions(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
     864           1 :                 o.iterID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
     865           1 : }
     866             : 
     867           0 : func iterOptions(o iterOpts) *pebble.IterOptions {
     868           0 :         if o.IsZero() {
     869           0 :                 return nil
     870           0 :         }
     871           0 :         var lower, upper []byte
     872           0 :         if o.lower != nil {
     873           0 :                 lower = append(lower, o.lower...)
     874           0 :         }
     875           0 :         if o.upper != nil {
     876           0 :                 upper = append(upper, o.upper...)
     877           0 :         }
     878           0 :         opts := &pebble.IterOptions{
     879           0 :                 LowerBound: lower,
     880           0 :                 UpperBound: upper,
     881           0 :                 KeyTypes:   pebble.IterKeyType(o.keyTypes),
     882           0 :                 RangeKeyMasking: pebble.RangeKeyMasking{
     883           0 :                         Suffix: o.maskSuffix,
     884           0 :                 },
     885           0 :                 UseL6Filters: o.useL6Filters,
     886           0 :         }
     887           0 :         if opts.RangeKeyMasking.Suffix != nil {
     888           0 :                 opts.RangeKeyMasking.Filter = func() pebble.BlockPropertyFilterMask {
     889           0 :                         return sstable.NewTestKeysMaskingFilter()
     890           0 :                 }
     891             :         }
     892           0 :         if o.filterMax > 0 {
     893           0 :                 opts.PointKeyFilters = []pebble.BlockPropertyFilter{
     894           0 :                         sstable.NewTestKeysBlockPropertyFilter(o.filterMin, o.filterMax),
     895           0 :                 }
     896           0 :         }
     897           0 :         return opts
     898             : }
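
Two properties of iterOptions matter to the surrounding ops. A zero-valued iterOpts maps to a nil *pebble.IterOptions, so the reader's defaults apply. Non-nil bounds are copied via append rather than aliased, which is what makes the "trash the bounds" blocks in the run methods above safe. A small sketch (in-package, since the iterOpts fields are unexported):

        opts := iterOptions(iterOpts{lower: []byte("a"), upper: []byte("z")})
        rand.Read(opts.LowerBound) // scribbles on the copy only;
        rand.Read(opts.UpperBound) // the original byte slices are untouched.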
     899             : 
     900           0 : func (o *iterSetOptionsOp) receiver() objID { return o.iterID }
     901             : 
     902           0 : func (o *iterSetOptionsOp) syncObjs() objIDSlice {
     903           0 :         if o.derivedReaderID.tag() == batchTag {
     904           0 :                 // If the underlying reader is a batch, we must synchronize with the
     905           0 :                 // batch so that we observe all the mutations up until this operation
     906           0 :                 // and no more.
     907           0 :                 return []objID{o.derivedReaderID}
     908           0 :         }
     909           0 :         return nil
     910             : }
     911             : 
     912             : // iterSeekGEOp models an Iterator.SeekGE[WithLimit] operation.
     913             : type iterSeekGEOp struct {
     914             :         iterID objID
     915             :         key    []byte
     916             :         limit  []byte
     917             : 
     918             :         derivedReaderID objID
     919             : }
     920             : 
     921           0 : func iteratorPos(i *retryableIter) string {
     922           0 :         var buf bytes.Buffer
     923           0 :         fmt.Fprintf(&buf, "%q", i.Key())
     924           0 :         hasPoint, hasRange := i.HasPointAndRange()
     925           0 :         if hasPoint {
     926           0 :                 fmt.Fprintf(&buf, ",%q", i.Value())
     927           0 :         } else {
     928           0 :                 fmt.Fprint(&buf, ",<no point>")
     929           0 :         }
     930           0 :         if hasRange {
     931           0 :                 start, end := i.RangeBounds()
     932           0 :                 fmt.Fprintf(&buf, ",[%q,%q)=>{", start, end)
     933           0 :                 for i, rk := range i.RangeKeys() {
     934           0 :                         if i > 0 {
     935           0 :                                 fmt.Fprint(&buf, ",")
     936           0 :                         }
     937           0 :                         fmt.Fprintf(&buf, "%q=%q", rk.Suffix, rk.Value)
     938             :                 }
     939           0 :                 fmt.Fprint(&buf, "}")
     940           0 :         } else {
     941           0 :                 fmt.Fprint(&buf, ",<no range>")
     942           0 :         }
     943           0 :         if i.RangeKeyChanged() {
     944           0 :                 fmt.Fprint(&buf, "*")
     945           0 :         }
     946           0 :         return buf.String()
     947             : }
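
Purely as an illustration of the format iteratorPos produces: an iterator positioned on point key "a" with value "v1", within a range key ["a","c") carrying suffix "@5" and value "x", and with RangeKeyChanged reporting true, renders as:

        "a","v1",["a","c")=>{"@5"="x"}*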
     948             : 
     949           0 : func validBoolToStr(valid bool) string {
     950           0 :         return fmt.Sprintf("%t", valid)
     951           0 : }
     952             : 
     953           0 : func validityStateToStr(validity pebble.IterValidityState) (bool, string) {
     954           0 :         // We can't distinguish between IterExhausted and IterAtLimit in a
     955           0 :         // deterministic manner.
     956           0 :         switch validity {
     957           0 :         case pebble.IterExhausted, pebble.IterAtLimit:
     958           0 :                 return false, "invalid"
     959           0 :         case pebble.IterValid:
     960           0 :                 return true, "valid"
     961           0 :         default:
     962           0 :                 panic("unknown validity")
     963             :         }
     964             : }
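
// Per the mapping above, both limit outcomes surface identically in the
// recorded history, which keeps runs comparable:
//
//      validityStateToStr(pebble.IterExhausted) // (false, "invalid")
//      validityStateToStr(pebble.IterAtLimit)   // (false, "invalid")
//      validityStateToStr(pebble.IterValid)     // (true, "valid")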
     965             : 
     966           0 : func (o *iterSeekGEOp) run(t *test, h historyRecorder) {
     967           0 :         i := t.getIter(o.iterID)
     968           0 :         var valid bool
     969           0 :         var validStr string
     970           0 :         if o.limit == nil {
     971           0 :                 valid = i.SeekGE(o.key)
     972           0 :                 validStr = validBoolToStr(valid)
     973           0 :         } else {
     974           0 :                 valid, validStr = validityStateToStr(i.SeekGEWithLimit(o.key, o.limit))
     975           0 :         }
     976           0 :         if valid {
     977           0 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
     978           0 :         } else {
     979           0 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
     980           0 :         }
     981             : }
     982             : 
     983           1 : func (o *iterSeekGEOp) String() string {
     984           1 :         return fmt.Sprintf("%s.SeekGE(%q, %q)", o.iterID, o.key, o.limit)
     985           1 : }
     986           0 : func (o *iterSeekGEOp) receiver() objID      { return o.iterID }
     987           0 : func (o *iterSeekGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
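
// Editorial sketch (not from ops.go): the run method above branches between
// Pebble's two public seek variants. Extracted into a standalone helper
// (seekGEMaybeLimited is a hypothetical name; it uses only imports already
// present in this file):
func seekGEMaybeLimited(it *pebble.Iterator, key, limit []byte) (bool, string) {
        if limit == nil {
                valid := it.SeekGE(key)
                return valid, fmt.Sprintf("%t", valid)
        }
        // SeekGEWithLimit stops once the limit key is reached; IterAtLimit and
        // IterExhausted are deliberately collapsed into "invalid" (see above).
        switch it.SeekGEWithLimit(key, limit) {
        case pebble.IterValid:
                return true, "valid"
        default:
                return false, "invalid"
        }
}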
     988             : 
     989           0 : func onlyBatchIDs(ids ...objID) objIDSlice {
     990           0 :         var ret objIDSlice
     991           0 :         for _, id := range ids {
     992           0 :                 if id.tag() == batchTag {
     993           0 :                         ret = append(ret, id)
     994           0 :                 }
     995             :         }
     996           0 :         return ret
     997             : }
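
// For instance (batch3 here is a hypothetical object ID): an iterator reading
// through batch3 yields onlyBatchIDs(batch3) == []objID{batch3}, so its seeks
// serialize with the batch's mutations; a db- or snapshot-backed iterator
// yields nil and needs no extra synchronization.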
     998             : 
     999             : // iterSeekPrefixGEOp models an Iterator.SeekPrefixGE operation.
    1000             : type iterSeekPrefixGEOp struct {
    1001             :         iterID objID
    1002             :         key    []byte
    1003             : 
    1004             :         derivedReaderID objID
    1005             : }
    1006             : 
    1007           0 : func (o *iterSeekPrefixGEOp) run(t *test, h historyRecorder) {
    1008           0 :         i := t.getIter(o.iterID)
    1009           0 :         valid := i.SeekPrefixGE(o.key)
    1010           0 :         if valid {
    1011           0 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1012           0 :         } else {
    1013           0 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1014           0 :         }
    1015             : }
    1016             : 
    1017           1 : func (o *iterSeekPrefixGEOp) String() string {
    1018           1 :         return fmt.Sprintf("%s.SeekPrefixGE(%q)", o.iterID, o.key)
    1019           1 : }
    1020           0 : func (o *iterSeekPrefixGEOp) receiver() objID      { return o.iterID }
    1021           0 : func (o *iterSeekPrefixGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
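
// Note: unlike SeekGE, SeekPrefixGE constrains subsequent iteration to keys
// sharing the seek key's prefix (as defined by the Comparer's Split
// function), which is why this op, unlike the seek ops above and below,
// carries no limit field.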
    1022             : 
    1023             : // iterSeekLTOp models an Iterator.SeekLT[WithLimit] operation.
    1024             : type iterSeekLTOp struct {
    1025             :         iterID objID
    1026             :         key    []byte
    1027             :         limit  []byte
    1028             : 
    1029             :         derivedReaderID objID
    1030             : }
    1031             : 
    1032           0 : func (o *iterSeekLTOp) run(t *test, h historyRecorder) {
    1033           0 :         i := t.getIter(o.iterID)
    1034           0 :         var valid bool
    1035           0 :         var validStr string
    1036           0 :         if o.limit == nil {
    1037           0 :                 valid = i.SeekLT(o.key)
    1038           0 :                 validStr = validBoolToStr(valid)
    1039           0 :         } else {
    1040           0 :                 valid, validStr = validityStateToStr(i.SeekLTWithLimit(o.key, o.limit))
    1041           0 :         }
    1042           0 :         if valid {
    1043           0 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1044           0 :         } else {
    1045           0 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1046           0 :         }
    1047             : }
    1048             : 
    1049           1 : func (o *iterSeekLTOp) String() string {
    1050           1 :         return fmt.Sprintf("%s.SeekLT(%q, %q)", o.iterID, o.key, o.limit)
    1051           1 : }
    1052             : 
    1053           0 : func (o *iterSeekLTOp) receiver() objID      { return o.iterID }
    1054           0 : func (o *iterSeekLTOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1055             : 
    1056             : // iterFirstOp models an Iterator.First operation.
    1057             : type iterFirstOp struct {
    1058             :         iterID objID
    1059             : 
    1060             :         derivedReaderID objID
    1061             : }
    1062             : 
    1063           0 : func (o *iterFirstOp) run(t *test, h historyRecorder) {
    1064           0 :         i := t.getIter(o.iterID)
    1065           0 :         valid := i.First()
    1066           0 :         if valid {
    1067           0 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1068           0 :         } else {
    1069           0 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1070           0 :         }
    1071             : }
    1072             : 
    1073           1 : func (o *iterFirstOp) String() string       { return fmt.Sprintf("%s.First()", o.iterID) }
    1074           0 : func (o *iterFirstOp) receiver() objID      { return o.iterID }
    1075           0 : func (o *iterFirstOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1076             : 
    1077             : // iterLastOp models an Iterator.Last operation.
    1078             : type iterLastOp struct {
    1079             :         iterID objID
    1080             : 
    1081             :         derivedReaderID objID
    1082             : }
    1083             : 
    1084           0 : func (o *iterLastOp) run(t *test, h historyRecorder) {
    1085           0 :         i := t.getIter(o.iterID)
    1086           0 :         valid := i.Last()
    1087           0 :         if valid {
    1088           0 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1089           0 :         } else {
    1090           0 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1091           0 :         }
    1092             : }
    1093             : 
    1094           1 : func (o *iterLastOp) String() string       { return fmt.Sprintf("%s.Last()", o.iterID) }
    1095           0 : func (o *iterLastOp) receiver() objID      { return o.iterID }
    1096           0 : func (o *iterLastOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1097             : 
    1098             : // iterNextOp models an Iterator.Next[WithLimit] operation.
    1099             : type iterNextOp struct {
    1100             :         iterID objID
    1101             :         limit  []byte
    1102             : 
    1103             :         derivedReaderID objID
    1104             : }
    1105             : 
    1106           0 : func (o *iterNextOp) run(t *test, h historyRecorder) {
    1107           0 :         i := t.getIter(o.iterID)
    1108           0 :         var valid bool
    1109           0 :         var validStr string
    1110           0 :         if o.limit == nil {
    1111           0 :                 valid = i.Next()
    1112           0 :                 validStr = validBoolToStr(valid)
    1113           0 :         } else {
    1114           0 :                 valid, validStr = validityStateToStr(i.NextWithLimit(o.limit))
    1115           0 :         }
    1116           0 :         if valid {
    1117           0 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1118           0 :         } else {
    1119           0 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1120           0 :         }
    1121             : }
    1122             : 
    1123           1 : func (o *iterNextOp) String() string       { return fmt.Sprintf("%s.Next(%q)", o.iterID, o.limit) }
    1124           0 : func (o *iterNextOp) receiver() objID      { return o.iterID }
    1125           0 : func (o *iterNextOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1126             : 
    1127             : // iterNextPrefixOp models an Iterator.NextPrefix operation.
    1128             : type iterNextPrefixOp struct {
    1129             :         iterID objID
    1130             : 
    1131             :         derivedReaderID objID
    1132             : }
    1133             : 
    1134           0 : func (o *iterNextPrefixOp) run(t *test, h historyRecorder) {
    1135           0 :         i := t.getIter(o.iterID)
    1136           0 :         valid := i.NextPrefix()
    1137           0 :         validStr := validBoolToStr(valid)
    1138           0 :         if valid {
    1139           0 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1140           0 :         } else {
    1141           0 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1142           0 :         }
    1143             : }
    1144             : 
    1145           1 : func (o *iterNextPrefixOp) String() string       { return fmt.Sprintf("%s.NextPrefix()", o.iterID) }
    1146           0 : func (o *iterNextPrefixOp) receiver() objID      { return o.iterID }
    1147           0 : func (o *iterNextPrefixOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1148             : 
    1149             : // iterPrevOp models an Iterator.Prev[WithLimit] operation.
    1150             : type iterPrevOp struct {
    1151             :         iterID objID
    1152             :         limit  []byte
    1153             : 
    1154             :         derivedReaderID objID
    1155             : }
    1156             : 
    1157           0 : func (o *iterPrevOp) run(t *test, h historyRecorder) {
    1158           0 :         i := t.getIter(o.iterID)
    1159           0 :         var valid bool
    1160           0 :         var validStr string
    1161           0 :         if o.limit == nil {
    1162           0 :                 valid = i.Prev()
    1163           0 :                 validStr = validBoolToStr(valid)
    1164           0 :         } else {
    1165           0 :                 valid, validStr = validityStateToStr(i.PrevWithLimit(o.limit))
    1166           0 :         }
    1167           0 :         if valid {
    1168           0 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1169           0 :         } else {
    1170           0 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1171           0 :         }
    1172             : }
    1173             : 
    1174           1 : func (o *iterPrevOp) String() string       { return fmt.Sprintf("%s.Prev(%q)", o.iterID, o.limit) }
    1175           0 : func (o *iterPrevOp) receiver() objID      { return o.iterID }
    1176           0 : func (o *iterPrevOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1177             : 
    1178             : // newSnapshotOp models a DB.NewSnapshot operation.
    1179             : type newSnapshotOp struct {
    1180             :         snapID objID
     1181             :         // If nonempty, this snapshot must not be used to read any keys outside
     1182             :         // of the provided bounds. This allows some implementations to use
     1183             :         // 'eventually file-only snapshots' (EFOS), which require bounds.
    1184             :         bounds []pebble.KeyRange
    1185             : }
    1186             : 
    1187           0 : func (o *newSnapshotOp) run(t *test, h historyRecorder) {
     1188           0 :         // Deterministically choose between snapshot types using a Fibonacci hash; see https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
    1189           0 :         if len(o.bounds) > 0 && ((11400714819323198485*uint64(t.idx)*t.testOpts.seedEFOS)>>63) == 1 {
    1190           0 :                 s := t.db.NewEventuallyFileOnlySnapshot(o.bounds)
    1191           0 :                 t.setSnapshot(o.snapID, s)
    1192           0 :         } else {
    1193           0 :                 s := t.db.NewSnapshot()
    1194           0 :                 t.setSnapshot(o.snapID, s)
    1195           0 :         }
    1196           0 :         h.Recordf("%s", o)
    1197             : }
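
// Editorial sketch (not from ops.go): the constant above is the 64-bit
// Fibonacci hashing multiplier, approximately 2^64 divided by the golden
// ratio. Multiplying spreads the product idx*seed across all 64 bits, so bit
// 63 serves as a deterministic, seed-dependent coin flip (useEFOS is a
// hypothetical name):
const fibMul = 11400714819323198485 // ~2^64 / golden ratio

func useEFOS(idx, seed uint64) bool {
        // The top bit of the Fibonacci hash is roughly uniform as idx varies,
        // so about half the eligible snapshots become eventually file-only.
        return (fibMul*idx*seed)>>63 == 1
}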
    1198             : 
    1199           1 : func (o *newSnapshotOp) String() string {
    1200           1 :         var buf bytes.Buffer
    1201           1 :         fmt.Fprintf(&buf, "%s = db.NewSnapshot(", o.snapID)
    1202           1 :         for i := range o.bounds {
    1203           1 :                 if i > 0 {
    1204           1 :                         fmt.Fprint(&buf, ", ")
    1205           1 :                 }
    1206           1 :                 fmt.Fprintf(&buf, "%q, %q", o.bounds[i].Start, o.bounds[i].End)
    1207             :         }
    1208           1 :         fmt.Fprint(&buf, ")")
    1209           1 :         return buf.String()
    1210             : }
    1211           0 : func (o *newSnapshotOp) receiver() objID      { return dbObjID }
    1212           0 : func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }
    1213             : 
    1214             : type dbRatchetFormatMajorVersionOp struct {
    1215             :         vers pebble.FormatMajorVersion
    1216             : }
    1217             : 
    1218           0 : func (o *dbRatchetFormatMajorVersionOp) run(t *test, h historyRecorder) {
    1219           0 :         var err error
    1220           0 :         // NB: We no-op the operation if we're already at or above the provided
    1221           0 :         // format major version. Different runs start at different format major
    1222           0 :         // versions, making the presence of an error and the error message itself
    1223           0 :         // non-deterministic if we attempt to upgrade to an older version.
    1224           0 :         //
     1225           0 :         // Regardless, subsequent operations should behave identically, which is
     1226           0 :         // what we're really aiming to test by including this format major
     1227           0 :         // version ratchet operation.
    1228           0 :         if t.db.FormatMajorVersion() < o.vers {
    1229           0 :                 err = t.db.RatchetFormatMajorVersion(o.vers)
    1230           0 :         }
    1231           0 :         h.Recordf("%s // %v", o, err)
    1232             : }
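
// Editorial sketch (not from ops.go): the guard above as a standalone helper
// (ratchetIfNewer is a hypothetical name). Format ratcheting is one-way in
// Pebble, so requests at or below the current version are skipped rather than
// surfaced as nondeterministic errors:
func ratchetIfNewer(db *pebble.DB, v pebble.FormatMajorVersion) error {
        if db.FormatMajorVersion() >= v {
                return nil // already at or above v: deterministic no-op
        }
        return db.RatchetFormatMajorVersion(v)
}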
    1233             : 
    1234           1 : func (o *dbRatchetFormatMajorVersionOp) String() string {
    1235           1 :         return fmt.Sprintf("db.RatchetFormatMajorVersion(%s)", o.vers)
    1236           1 : }
    1237           0 : func (o *dbRatchetFormatMajorVersionOp) receiver() objID      { return dbObjID }
    1238           0 : func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
    1239             : 
    1240             : type dbRestartOp struct{}
    1241             : 
    1242           0 : func (o *dbRestartOp) run(t *test, h historyRecorder) {
    1243           0 :         if err := t.restartDB(); err != nil {
    1244           0 :                 h.Recordf("%s // %v", o, err)
    1245           0 :                 h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
    1246           0 :         } else {
    1247           0 :                 h.Recordf("%s", o)
    1248           0 :         }
    1249             : }
    1250             : 
    1251           1 : func (o *dbRestartOp) String() string       { return "db.Restart()" }
    1252           0 : func (o *dbRestartOp) receiver() objID      { return dbObjID }
    1253           0 : func (o *dbRestartOp) syncObjs() objIDSlice { return nil }
    1254             : 
    1255           1 : func formatOps(ops []op) string {
    1256           1 :         var buf strings.Builder
    1257           1 :         for _, op := range ops {
    1258           1 :                 fmt.Fprintf(&buf, "%s\n", op)
    1259           1 :         }
    1260           1 :         return buf.String()
    1261             : }
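
// For example, formatOps over a SeekGE op (nil limit) on iter1 followed by a
// First op on iter2 would produce output along the lines of:
//
//      iter1.SeekGE("a", "")
//      iter2.First()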

Generated by: LCOV version 1.14