LCOV - code coverage report
Current view: top level - pebble/metamorphic - ops.go (source / functions) Hit Total Coverage
Test: 2023-10-04 08:18Z b013ca78 - tests + meta.lcov Lines: 764 800 95.5 %
Date: 2023-10-04 08:19:51 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package metamorphic
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "crypto/rand"
      10             :         "encoding/binary"
      11             :         "fmt"
      12             :         "io"
      13             :         "path/filepath"
      14             :         "strings"
      15             : 
      16             :         "github.com/cockroachdb/errors"
      17             :         "github.com/cockroachdb/pebble"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/keyspan"
      20             :         "github.com/cockroachdb/pebble/internal/private"
      21             :         "github.com/cockroachdb/pebble/internal/testkeys"
      22             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      23             :         "github.com/cockroachdb/pebble/sstable"
      24             :         "github.com/cockroachdb/pebble/vfs"
      25             :         "github.com/cockroachdb/pebble/vfs/errorfs"
      26             : )
      27             : 
      28             : // op defines the interface for a single operation, such as creating a batch,
      29             : // or advancing an iterator.
      30             : type op interface {
      31             :         String() string
      32             :         run(t *test, h historyRecorder)
      33             : 
      34             :         // receiver returns the object ID of the object the operation is performed
      35             :         // on. Every operation has a receiver (eg, batch0.Set(...) has `batch0` as
      36             :         // its receiver). Receivers are used for synchronization when running with
      37             :         // concurrency.
      38             :         receiver() objID
      39             : 
      40             :         // syncObjs returns an additional set of object IDs—excluding the
      41             :         // receiver—that the operation must synchronize with. At execution time,
      42             :         // the operation will run serially with respect to all other operations
      43             :         // that return these objects from their own syncObjs or receiver methods.
      44             :         syncObjs() objIDSlice
      45             : }
      46             : 
      47             : // initOp performs test initialization
      48             : type initOp struct {
      49             :         batchSlots    uint32
      50             :         iterSlots     uint32
      51             :         snapshotSlots uint32
      52             : }
      53             : 
      54           2 : func (o *initOp) run(t *test, h historyRecorder) {
      55           2 :         t.batches = make([]*pebble.Batch, o.batchSlots)
      56           2 :         t.iters = make([]*retryableIter, o.iterSlots)
      57           2 :         t.snapshots = make([]readerCloser, o.snapshotSlots)
      58           2 :         h.Recordf("%s", o)
      59           2 : }
      60             : 
      61           2 : func (o *initOp) String() string {
      62           2 :         return fmt.Sprintf("Init(%d /* batches */, %d /* iters */, %d /* snapshots */)",
      63           2 :                 o.batchSlots, o.iterSlots, o.snapshotSlots)
      64           2 : }
      65             : 
      66           2 : func (o *initOp) receiver() objID      { return dbObjID }
      67           0 : func (o *initOp) syncObjs() objIDSlice { return nil }
      68             : 
      69             : // applyOp models a Writer.Apply operation.
      70             : type applyOp struct {
      71             :         writerID objID
      72             :         batchID  objID
      73             : }
      74             : 
      75           2 : func (o *applyOp) run(t *test, h historyRecorder) {
      76           2 :         b := t.getBatch(o.batchID)
      77           2 :         w := t.getWriter(o.writerID)
      78           2 :         var err error
      79           2 :         if o.writerID.tag() == dbTag && t.testOpts.asyncApplyToDB && t.writeOpts.Sync {
      80           1 :                 err = w.(*pebble.DB).ApplyNoSyncWait(b, t.writeOpts)
      81           1 :                 if err == nil {
      82           1 :                         err = b.SyncWait()
      83           1 :                 }
      84           2 :         } else {
      85           2 :                 err = w.Apply(b, t.writeOpts)
      86           2 :         }
      87           2 :         h.Recordf("%s // %v", o, err)
      88             :         // batch will be closed by a closeOp which is guaranteed to be generated
      89             : }
      90             : 
      91           2 : func (o *applyOp) String() string  { return fmt.Sprintf("%s.Apply(%s)", o.writerID, o.batchID) }
      92           2 : func (o *applyOp) receiver() objID { return o.writerID }
      93           2 : func (o *applyOp) syncObjs() objIDSlice {
      94           2 :         // Apply should not be concurrent with operations that are mutating the
      95           2 :         // batch.
      96           2 :         return []objID{o.batchID}
      97           2 : }
      98             : 
      99             : // checkpointOp models a DB.Checkpoint operation.
     100             : type checkpointOp struct {
     101             :         // If non-empty, the checkpoint is restricted to these spans.
     102             :         spans []pebble.CheckpointSpan
     103             : }
     104             : 
     105           2 : func (o *checkpointOp) run(t *test, h historyRecorder) {
     106           2 :         // TODO(josh): db.Checkpoint does not work with shared storage yet.
     107           2 :         // It would be better to filter out ahead of calling run on the op,
     108           2 :         // by setting the weight that generator.go uses to zero, or similar.
     109           2 :         // But IIUC the ops are shared for ALL the metamorphic test runs, so
     110           2 :         // not sure how to do that easily:
     111           2 :         // https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
     112           2 :         if t.testOpts.sharedStorageEnabled {
     113           1 :                 h.Recordf("%s // %v", o, nil)
     114           1 :                 return
     115           1 :         }
     116           2 :         var opts []pebble.CheckpointOption
     117           2 :         if len(o.spans) > 0 {
     118           2 :                 opts = append(opts, pebble.WithRestrictToSpans(o.spans))
     119           2 :         }
     120           2 :         err := withRetries(func() error {
     121           2 :                 return t.db.Checkpoint(o.dir(t.dir, h.op), opts...)
     122           2 :         })
     123           2 :         h.Recordf("%s // %v", o, err)
     124             : }
     125             : 
     126           2 : func (o *checkpointOp) dir(dataDir string, idx int) string {
     127           2 :         return filepath.Join(dataDir, "checkpoints", fmt.Sprintf("op-%06d", idx))
     128           2 : }
     129             : 
     130           2 : func (o *checkpointOp) String() string {
     131           2 :         var spanStr bytes.Buffer
     132           2 :         for i, span := range o.spans {
     133           2 :                 if i > 0 {
     134           2 :                         spanStr.WriteString(",")
     135           2 :                 }
     136           2 :                 fmt.Fprintf(&spanStr, "%q,%q", span.Start, span.End)
     137             :         }
     138           2 :         return fmt.Sprintf("db.Checkpoint(%s)", spanStr.String())
     139             : }
     140             : 
     141           2 : func (o *checkpointOp) receiver() objID      { return dbObjID }
     142           2 : func (o *checkpointOp) syncObjs() objIDSlice { return nil }
     143             : 
     144             : // closeOp models a {Batch,Iterator,Snapshot}.Close operation.
     145             : type closeOp struct {
     146             :         objID objID
     147             : }
     148             : 
     149           2 : func (o *closeOp) run(t *test, h historyRecorder) {
     150           2 :         c := t.getCloser(o.objID)
     151           2 :         if o.objID.tag() == dbTag && t.opts.DisableWAL {
     152           1 :                 // Special case: If WAL is disabled, do a flush right before DB Close. This
     153           1 :                 // allows us to reuse this run's data directory as initial state for
     154           1 :                 // future runs without losing any mutations.
     155           1 :                 _ = t.db.Flush()
     156           1 :         }
     157           2 :         t.clearObj(o.objID)
     158           2 :         err := c.Close()
     159           2 :         h.Recordf("%s // %v", o, err)
     160             : }
     161             : 
     162           2 : func (o *closeOp) String() string  { return fmt.Sprintf("%s.Close()", o.objID) }
     163           2 : func (o *closeOp) receiver() objID { return o.objID }
     164           2 : func (o *closeOp) syncObjs() objIDSlice {
     165           2 :         // Synchronize on the database so that we don't close the database before
     166           2 :         // all its iterators, snapshots and batches are closed.
     167           2 :         // TODO(jackson): It would be nice to relax this so that Close calls can
     168           2 :         // execute in parallel.
     169           2 :         if o.objID == dbObjID {
     170           2 :                 return nil
     171           2 :         }
     172           2 :         return []objID{dbObjID}
     173             : }
     174             : 
     175             : // compactOp models a DB.Compact operation.
     176             : type compactOp struct {
     177             :         start       []byte
     178             :         end         []byte
     179             :         parallelize bool
     180             : }
     181             : 
     182           2 : func (o *compactOp) run(t *test, h historyRecorder) {
     183           2 :         err := withRetries(func() error {
     184           2 :                 return t.db.Compact(o.start, o.end, o.parallelize)
     185           2 :         })
     186           2 :         h.Recordf("%s // %v", o, err)
     187             : }
     188             : 
     189           2 : func (o *compactOp) String() string {
     190           2 :         return fmt.Sprintf("db.Compact(%q, %q, %t /* parallelize */)", o.start, o.end, o.parallelize)
     191           2 : }
     192             : 
     193           2 : func (o *compactOp) receiver() objID      { return dbObjID }
     194           2 : func (o *compactOp) syncObjs() objIDSlice { return nil }
     195             : 
     196             : // deleteOp models a Write.Delete operation.
     197             : type deleteOp struct {
     198             :         writerID objID
     199             :         key      []byte
     200             : }
     201             : 
     202           2 : func (o *deleteOp) run(t *test, h historyRecorder) {
     203           2 :         w := t.getWriter(o.writerID)
     204           2 :         var err error
     205           2 :         if t.testOpts.deleteSized && t.isFMV(pebble.FormatDeleteSizedAndObsolete) {
     206           1 :                 // Call DeleteSized with a deterministic size derived from the index.
     207           1 :                 // The size does not need to be accurate for correctness.
     208           1 :                 err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
     209           2 :         } else {
     210           2 :                 err = w.Delete(o.key, t.writeOpts)
     211           2 :         }
     212           2 :         h.Recordf("%s // %v", o, err)
     213             : }
     214             : 
     215           1 : func hashSize(index int) uint32 {
     216           1 :         // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
     217           1 :         return uint32((11400714819323198485 * uint64(index)) % maxValueSize)
     218           1 : }
     219             : 
     220           2 : func (o *deleteOp) String() string {
     221           2 :         return fmt.Sprintf("%s.Delete(%q)", o.writerID, o.key)
     222           2 : }
     223           2 : func (o *deleteOp) receiver() objID      { return o.writerID }
     224           2 : func (o *deleteOp) syncObjs() objIDSlice { return nil }
     225             : 
     226             : // singleDeleteOp models a Write.SingleDelete operation.
     227             : type singleDeleteOp struct {
     228             :         writerID           objID
     229             :         key                []byte
     230             :         maybeReplaceDelete bool
     231             : }
     232             : 
     233           2 : func (o *singleDeleteOp) run(t *test, h historyRecorder) {
     234           2 :         w := t.getWriter(o.writerID)
     235           2 :         var err error
     236           2 :         if t.testOpts.replaceSingleDelete && o.maybeReplaceDelete {
     237           1 :                 err = w.Delete(o.key, t.writeOpts)
     238           2 :         } else {
     239           2 :                 err = w.SingleDelete(o.key, t.writeOpts)
     240           2 :         }
     241             :         // NOTE: even if the SINGLEDEL was replaced with a DELETE, we must still
     242             :         // write the former to the history log. The log line will indicate whether
     243             :         // or not the delete *could* have been replaced. The OPTIONS file should
     244             :         // also be consulted to determine what happened at runtime (i.e. by taking
     245             :         // the logical AND).
     246           2 :         h.Recordf("%s // %v", o, err)
     247             : }
     248             : 
     249           2 : func (o *singleDeleteOp) String() string {
     250           2 :         return fmt.Sprintf("%s.SingleDelete(%q, %v /* maybeReplaceDelete */)", o.writerID, o.key, o.maybeReplaceDelete)
     251           2 : }
     252             : 
     253           2 : func (o *singleDeleteOp) receiver() objID      { return o.writerID }
     254           2 : func (o *singleDeleteOp) syncObjs() objIDSlice { return nil }
     255             : 
     256             : // deleteRangeOp models a Write.DeleteRange operation.
     257             : type deleteRangeOp struct {
     258             :         writerID objID
     259             :         start    []byte
     260             :         end      []byte
     261             : }
     262             : 
     263           2 : func (o *deleteRangeOp) run(t *test, h historyRecorder) {
     264           2 :         w := t.getWriter(o.writerID)
     265           2 :         err := w.DeleteRange(o.start, o.end, t.writeOpts)
     266           2 :         h.Recordf("%s // %v", o, err)
     267           2 : }
     268             : 
     269           2 : func (o *deleteRangeOp) String() string {
     270           2 :         return fmt.Sprintf("%s.DeleteRange(%q, %q)", o.writerID, o.start, o.end)
     271           2 : }
     272             : 
     273           2 : func (o *deleteRangeOp) receiver() objID      { return o.writerID }
     274           2 : func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
     275             : 
     276             : // flushOp models a DB.Flush operation.
     277             : type flushOp struct {
     278             : }
     279             : 
     280           2 : func (o *flushOp) run(t *test, h historyRecorder) {
     281           2 :         err := t.db.Flush()
     282           2 :         h.Recordf("%s // %v", o, err)
     283           2 : }
     284             : 
     285           2 : func (o *flushOp) String() string       { return "db.Flush()" }
     286           2 : func (o *flushOp) receiver() objID      { return dbObjID }
     287           2 : func (o *flushOp) syncObjs() objIDSlice { return nil }
     288             : 
     289             : // mergeOp models a Write.Merge operation.
     290             : type mergeOp struct {
     291             :         writerID objID
     292             :         key      []byte
     293             :         value    []byte
     294             : }
     295             : 
     296           2 : func (o *mergeOp) run(t *test, h historyRecorder) {
     297           2 :         w := t.getWriter(o.writerID)
     298           2 :         err := w.Merge(o.key, o.value, t.writeOpts)
     299           2 :         h.Recordf("%s // %v", o, err)
     300           2 : }
     301             : 
     302           2 : func (o *mergeOp) String() string       { return fmt.Sprintf("%s.Merge(%q, %q)", o.writerID, o.key, o.value) }
     303           2 : func (o *mergeOp) receiver() objID      { return o.writerID }
     304           2 : func (o *mergeOp) syncObjs() objIDSlice { return nil }
     305             : 
     306             : // setOp models a Write.Set operation.
     307             : type setOp struct {
     308             :         writerID objID
     309             :         key      []byte
     310             :         value    []byte
     311             : }
     312             : 
     313           2 : func (o *setOp) run(t *test, h historyRecorder) {
     314           2 :         w := t.getWriter(o.writerID)
     315           2 :         err := w.Set(o.key, o.value, t.writeOpts)
     316           2 :         h.Recordf("%s // %v", o, err)
     317           2 : }
     318             : 
     319           2 : func (o *setOp) String() string       { return fmt.Sprintf("%s.Set(%q, %q)", o.writerID, o.key, o.value) }
     320           2 : func (o *setOp) receiver() objID      { return o.writerID }
     321           2 : func (o *setOp) syncObjs() objIDSlice { return nil }
     322             : 
     323             : // rangeKeyDeleteOp models a Write.RangeKeyDelete operation.
     324             : type rangeKeyDeleteOp struct {
     325             :         writerID objID
     326             :         start    []byte
     327             :         end      []byte
     328             : }
     329             : 
     330           2 : func (o *rangeKeyDeleteOp) run(t *test, h historyRecorder) {
     331           2 :         w := t.getWriter(o.writerID)
     332           2 :         err := w.RangeKeyDelete(o.start, o.end, t.writeOpts)
     333           2 :         h.Recordf("%s // %v", o, err)
     334           2 : }
     335             : 
     336           2 : func (o *rangeKeyDeleteOp) String() string {
     337           2 :         return fmt.Sprintf("%s.RangeKeyDelete(%q, %q)", o.writerID, o.start, o.end)
     338           2 : }
     339             : 
     340           2 : func (o *rangeKeyDeleteOp) receiver() objID      { return o.writerID }
     341           2 : func (o *rangeKeyDeleteOp) syncObjs() objIDSlice { return nil }
     342             : 
     343             : // rangeKeySetOp models a Write.RangeKeySet operation.
     344             : type rangeKeySetOp struct {
     345             :         writerID objID
     346             :         start    []byte
     347             :         end      []byte
     348             :         suffix   []byte
     349             :         value    []byte
     350             : }
     351             : 
     352           2 : func (o *rangeKeySetOp) run(t *test, h historyRecorder) {
     353           2 :         w := t.getWriter(o.writerID)
     354           2 :         err := w.RangeKeySet(o.start, o.end, o.suffix, o.value, t.writeOpts)
     355           2 :         h.Recordf("%s // %v", o, err)
     356           2 : }
     357             : 
     358           2 : func (o *rangeKeySetOp) String() string {
     359           2 :         return fmt.Sprintf("%s.RangeKeySet(%q, %q, %q, %q)",
     360           2 :                 o.writerID, o.start, o.end, o.suffix, o.value)
     361           2 : }
     362             : 
     363           2 : func (o *rangeKeySetOp) receiver() objID      { return o.writerID }
     364           2 : func (o *rangeKeySetOp) syncObjs() objIDSlice { return nil }
     365             : 
     366             : // rangeKeyUnsetOp models a Write.RangeKeyUnset operation.
     367             : type rangeKeyUnsetOp struct {
     368             :         writerID objID
     369             :         start    []byte
     370             :         end      []byte
     371             :         suffix   []byte
     372             : }
     373             : 
     374           2 : func (o *rangeKeyUnsetOp) run(t *test, h historyRecorder) {
     375           2 :         w := t.getWriter(o.writerID)
     376           2 :         err := w.RangeKeyUnset(o.start, o.end, o.suffix, t.writeOpts)
     377           2 :         h.Recordf("%s // %v", o, err)
     378           2 : }
     379             : 
     380           2 : func (o *rangeKeyUnsetOp) String() string {
     381           2 :         return fmt.Sprintf("%s.RangeKeyUnset(%q, %q, %q)",
     382           2 :                 o.writerID, o.start, o.end, o.suffix)
     383           2 : }
     384             : 
     385           2 : func (o *rangeKeyUnsetOp) receiver() objID      { return o.writerID }
     386           2 : func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }
     387             : 
     388             : // newBatchOp models a Write.NewBatch operation.
     389             : type newBatchOp struct {
     390             :         batchID objID
     391             : }
     392             : 
     393           2 : func (o *newBatchOp) run(t *test, h historyRecorder) {
     394           2 :         b := t.db.NewBatch()
     395           2 :         t.setBatch(o.batchID, b)
     396           2 :         h.Recordf("%s", o)
     397           2 : }
     398             : 
     399           2 : func (o *newBatchOp) String() string  { return fmt.Sprintf("%s = db.NewBatch()", o.batchID) }
     400           2 : func (o *newBatchOp) receiver() objID { return dbObjID }
     401           2 : func (o *newBatchOp) syncObjs() objIDSlice {
     402           2 :         // NewBatch should not be concurrent with operations that interact with that
     403           2 :         // same batch.
     404           2 :         return []objID{o.batchID}
     405           2 : }
     406             : 
     407             : // newIndexedBatchOp models a Write.NewIndexedBatch operation.
     408             : type newIndexedBatchOp struct {
     409             :         batchID objID
     410             : }
     411             : 
     412           2 : func (o *newIndexedBatchOp) run(t *test, h historyRecorder) {
     413           2 :         b := t.db.NewIndexedBatch()
     414           2 :         t.setBatch(o.batchID, b)
     415           2 :         h.Recordf("%s", o)
     416           2 : }
     417             : 
     418           2 : func (o *newIndexedBatchOp) String() string {
     419           2 :         return fmt.Sprintf("%s = db.NewIndexedBatch()", o.batchID)
     420           2 : }
     421           2 : func (o *newIndexedBatchOp) receiver() objID { return dbObjID }
     422           2 : func (o *newIndexedBatchOp) syncObjs() objIDSlice {
     423           2 :         // NewIndexedBatch should not be concurrent with operations that interact
     424           2 :         // with that same batch.
     425           2 :         return []objID{o.batchID}
     426           2 : }
     427             : 
     428             : // batchCommitOp models a Batch.Commit operation.
     429             : type batchCommitOp struct {
     430             :         batchID objID
     431             : }
     432             : 
     433           1 : func (o *batchCommitOp) run(t *test, h historyRecorder) {
     434           1 :         b := t.getBatch(o.batchID)
     435           1 :         err := b.Commit(t.writeOpts)
     436           1 :         h.Recordf("%s // %v", o, err)
     437           1 : }
     438             : 
     439           2 : func (o *batchCommitOp) String() string  { return fmt.Sprintf("%s.Commit()", o.batchID) }
     440           1 : func (o *batchCommitOp) receiver() objID { return o.batchID }
     441           1 : func (o *batchCommitOp) syncObjs() objIDSlice {
     442           1 :         // Synchronize on the database so that NewIters wait for the commit.
     443           1 :         return []objID{dbObjID}
     444           1 : }
     445             : 
     446             : // ingestOp models a DB.Ingest operation.
     447             : type ingestOp struct {
     448             :         batchIDs []objID
     449             : }
     450             : 
     451           2 : func (o *ingestOp) run(t *test, h historyRecorder) {
     452           2 :         // We can only use apply as an alternative for ingestion if we are ingesting
     453           2 :         // a single batch. If we are ingesting multiple batches, the batches may
     454           2 :         // overlap which would cause ingestion to fail but apply would succeed.
     455           2 :         if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 {
     456           1 :                 id := o.batchIDs[0]
     457           1 :                 b := t.getBatch(id)
     458           1 :                 iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
     459           1 :                 c, err := o.collapseBatch(t, iter, rangeDelIter, rangeKeyIter)
     460           1 :                 if err == nil {
     461           1 :                         w := t.getWriter(makeObjID(dbTag, 0))
     462           1 :                         err = w.Apply(c, t.writeOpts)
     463           1 :                 }
     464           1 :                 _ = b.Close()
     465           1 :                 _ = c.Close()
     466           1 :                 t.clearObj(id)
     467           1 :                 h.Recordf("%s // %v", o, err)
     468           1 :                 return
     469             :         }
     470             : 
     471           2 :         var paths []string
     472           2 :         var err error
     473           2 :         for i, id := range o.batchIDs {
     474           2 :                 b := t.getBatch(id)
     475           2 :                 t.clearObj(id)
     476           2 :                 path, err2 := o.build(t, h, b, i)
     477           2 :                 if err2 != nil {
     478           0 :                         h.Recordf("Build(%s) // %v", id, err2)
     479           0 :                 }
     480           2 :                 err = firstError(err, err2)
     481           2 :                 if err2 == nil {
     482           2 :                         paths = append(paths, path)
     483           2 :                 }
     484           2 :                 err = firstError(err, b.Close())
     485             :         }
     486             : 
     487           2 :         err = firstError(err, withRetries(func() error {
     488           2 :                 return t.db.Ingest(paths)
     489           2 :         }))
     490             : 
     491           2 :         h.Recordf("%s // %v", o, err)
     492             : }
     493             : 
     494           2 : func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
     495           2 :         rootFS := vfs.Root(t.opts.FS)
     496           2 :         path := rootFS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d", i))
     497           2 :         f, err := rootFS.Create(path)
     498           2 :         if err != nil {
     499           0 :                 return "", err
     500           0 :         }
     501             : 
     502           2 :         iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
     503           2 :         defer closeIters(iter, rangeDelIter, rangeKeyIter)
     504           2 : 
     505           2 :         equal := t.opts.Comparer.Equal
     506           2 :         tableFormat := t.db.FormatMajorVersion().MaxTableFormat()
     507           2 :         w := sstable.NewWriter(
     508           2 :                 objstorageprovider.NewFileWritable(f),
     509           2 :                 t.opts.MakeWriterOptions(0, tableFormat),
     510           2 :         )
     511           2 : 
     512           2 :         var lastUserKey []byte
     513           2 :         for key, value := iter.First(); key != nil; key, value = iter.Next() {
     514           2 :                 // Ignore duplicate keys.
     515           2 :                 if equal(lastUserKey, key.UserKey) {
     516           2 :                         continue
     517             :                 }
     518             :                 // NB: We don't have to copy the key or value since we're reading from a
     519             :                 // batch which doesn't do prefix compression.
     520           2 :                 lastUserKey = key.UserKey
     521           2 : 
     522           2 :                 key.SetSeqNum(base.SeqNumZero)
     523           2 :                 if err := w.Add(*key, value.InPlaceValue()); err != nil {
     524           0 :                         return "", err
     525           0 :                 }
     526             :         }
     527           2 :         if err := iter.Close(); err != nil {
     528           0 :                 return "", err
     529           0 :         }
     530           2 :         iter = nil
     531           2 : 
     532           2 :         if rangeDelIter != nil {
     533           2 :                 // NB: The range tombstones have already been fragmented by the Batch.
     534           2 :                 for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
     535           2 :                         // NB: We don't have to copy the key or value since we're reading from a
     536           2 :                         // batch which doesn't do prefix compression.
     537           2 :                         if err := w.DeleteRange(t.Start, t.End); err != nil {
     538           0 :                                 return "", err
     539           0 :                         }
     540             :                 }
     541           2 :                 if err := rangeDelIter.Close(); err != nil {
     542           0 :                         return "", err
     543           0 :                 }
     544           2 :                 rangeDelIter = nil
     545             :         }
     546             : 
     547           2 :         if err := w.Close(); err != nil {
     548           0 :                 return "", err
     549           0 :         }
     550           2 :         return path, nil
     551             : }
     552             : 
     553           2 : func (o *ingestOp) receiver() objID { return dbObjID }
     554           2 : func (o *ingestOp) syncObjs() objIDSlice {
     555           2 :         // Ingest should not be concurrent with mutating the batches that will be
     556           2 :         // ingested as sstables.
     557           2 :         return o.batchIDs
     558           2 : }
     559             : 
     560             : func closeIters(
     561             :         pointIter base.InternalIterator,
     562             :         rangeDelIter keyspan.FragmentIterator,
     563             :         rangeKeyIter keyspan.FragmentIterator,
     564           2 : ) {
     565           2 :         if pointIter != nil {
     566           2 :                 pointIter.Close()
     567           2 :         }
     568           2 :         if rangeDelIter != nil {
     569           2 :                 rangeDelIter.Close()
     570           2 :         }
     571           2 :         if rangeKeyIter != nil {
     572           2 :                 rangeKeyIter.Close()
     573           2 :         }
     574             : }
     575             : 
     576             : // collapseBatch collapses the mutations in a batch to be equivalent to an
     577             : // sstable ingesting those mutations. Duplicate updates to a key are collapsed
     578             : // so that only the latest update is performed. All range deletions are
     579             : // performed first in the batch to match the semantics of ingestion where a
     580             : // range deletion does not delete a point record contained in the sstable.
     581             : func (o *ingestOp) collapseBatch(
     582             :         t *test, pointIter base.InternalIterator, rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
     583           1 : ) (*pebble.Batch, error) {
     584           1 :         defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
     585           1 :         equal := t.opts.Comparer.Equal
     586           1 :         collapsed := t.db.NewBatch()
     587           1 : 
     588           1 :         if rangeDelIter != nil {
     589           1 :                 // NB: The range tombstones have already been fragmented by the Batch.
     590           1 :                 for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
     591           1 :                         // NB: We don't have to copy the key or value since we're reading from a
     592           1 :                         // batch which doesn't do prefix compression.
     593           1 :                         if err := collapsed.DeleteRange(t.Start, t.End, nil); err != nil {
     594           0 :                                 return nil, err
     595           0 :                         }
     596             :                 }
     597           1 :                 if err := rangeDelIter.Close(); err != nil {
     598           0 :                         return nil, err
     599           0 :                 }
     600           1 :                 rangeDelIter = nil
     601             :         }
     602             : 
     603           1 :         if pointIter != nil {
     604           1 :                 var lastUserKey []byte
     605           1 :                 for key, value := pointIter.First(); key != nil; key, value = pointIter.Next() {
     606           1 :                         // Ignore duplicate keys.
     607           1 :                         if equal(lastUserKey, key.UserKey) {
     608           1 :                                 continue
     609             :                         }
     610             :                         // NB: We don't have to copy the key or value since we're reading from a
     611             :                         // batch which doesn't do prefix compression.
     612           1 :                         lastUserKey = key.UserKey
     613           1 : 
     614           1 :                         var err error
     615           1 :                         switch key.Kind() {
     616           1 :                         case pebble.InternalKeyKindDelete:
     617           1 :                                 err = collapsed.Delete(key.UserKey, nil)
     618           1 :                         case pebble.InternalKeyKindDeleteSized:
     619           1 :                                 v, _ := binary.Uvarint(value.InPlaceValue())
     620           1 :                                 // Batch.DeleteSized takes just the length of the value being
     621           1 :                                 // deleted and adds the key's length to derive the overall entry
     622           1 :                                 // size of the value being deleted. This has already been done
     623           1 :                                 // to the key we're reading from the batch, so we must subtract
     624           1 :                                 // the key length from the encoded value before calling
     625           1 :                                 // collapsed.DeleteSized, which will again add the key length
     626           1 :                                 // before encoding.
     627           1 :                                 err = collapsed.DeleteSized(key.UserKey, uint32(v-uint64(len(key.UserKey))), nil)
     628           1 :                         case pebble.InternalKeyKindSingleDelete:
     629           1 :                                 err = collapsed.SingleDelete(key.UserKey, nil)
     630           1 :                         case pebble.InternalKeyKindSet:
     631           1 :                                 err = collapsed.Set(key.UserKey, value.InPlaceValue(), nil)
     632           1 :                         case pebble.InternalKeyKindMerge:
     633           1 :                                 err = collapsed.Merge(key.UserKey, value.InPlaceValue(), nil)
     634           0 :                         case pebble.InternalKeyKindLogData:
     635           0 :                                 err = collapsed.LogData(key.UserKey, nil)
     636           0 :                         default:
     637           0 :                                 err = errors.Errorf("unknown batch record kind: %d", key.Kind())
     638             :                         }
     639           1 :                         if err != nil {
     640           0 :                                 return nil, err
     641           0 :                         }
     642             :                 }
     643           1 :                 if err := pointIter.Close(); err != nil {
     644           0 :                         return nil, err
     645           0 :                 }
     646           1 :                 pointIter = nil
     647             :         }
     648             : 
     649           1 :         return collapsed, nil
     650             : }
     651             : 
     652           2 : func (o *ingestOp) String() string {
     653           2 :         var buf strings.Builder
     654           2 :         buf.WriteString("db.Ingest(")
     655           2 :         for i, id := range o.batchIDs {
     656           2 :                 if i > 0 {
     657           2 :                         buf.WriteString(", ")
     658           2 :                 }
     659           2 :                 buf.WriteString(id.String())
     660             :         }
     661           2 :         buf.WriteString(")")
     662           2 :         return buf.String()
     663             : }
     664             : 
     665             : // getOp models a Reader.Get operation.
     666             : type getOp struct {
     667             :         readerID objID
     668             :         key      []byte
     669             : }
     670             : 
     671           2 : func (o *getOp) run(t *test, h historyRecorder) {
     672           2 :         r := t.getReader(o.readerID)
     673           2 :         var val []byte
     674           2 :         var closer io.Closer
     675           2 :         err := withRetries(func() (err error) {
     676           2 :                 val, closer, err = r.Get(o.key)
     677           2 :                 return err
     678           2 :         })
     679           2 :         h.Recordf("%s // [%q] %v", o, val, err)
     680           2 :         if closer != nil {
     681           2 :                 closer.Close()
     682           2 :         }
     683             : }
     684             : 
     685           2 : func (o *getOp) String() string  { return fmt.Sprintf("%s.Get(%q)", o.readerID, o.key) }
     686           2 : func (o *getOp) receiver() objID { return o.readerID }
     687           2 : func (o *getOp) syncObjs() objIDSlice {
     688           2 :         if o.readerID == dbObjID {
     689           2 :                 return nil
     690           2 :         }
     691             :         // batch.Get reads through to the current database state.
     692           2 :         return []objID{dbObjID}
     693             : }
     694             : 
     695             : // newIterOp models a Reader.NewIter operation.
     696             : type newIterOp struct {
     697             :         readerID objID
     698             :         iterID   objID
     699             :         iterOpts
     700             : }
     701             : 
     702           2 : func (o *newIterOp) run(t *test, h historyRecorder) {
     703           2 :         r := t.getReader(o.readerID)
     704           2 :         opts := iterOptions(o.iterOpts)
     705           2 : 
     706           2 :         var i *pebble.Iterator
     707           2 :         for {
     708           2 :                 i, _ = r.NewIter(opts)
     709           2 :                 if err := i.Error(); !errors.Is(err, errorfs.ErrInjected) {
     710           2 :                         break
     711             :                 }
     712             :                 // close this iter and retry NewIter
     713           0 :                 _ = i.Close()
     714             :         }
     715           2 :         t.setIter(o.iterID, i)
     716           2 : 
     717           2 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
     718           2 :         // the user-provided bounds.
     719           2 :         if opts != nil {
     720           2 :                 rand.Read(opts.LowerBound[:])
     721           2 :                 rand.Read(opts.UpperBound[:])
     722           2 :         }
     723           2 :         h.Recordf("%s // %v", o, i.Error())
     724             : }
     725             : 
     726           2 : func (o *newIterOp) String() string {
     727           2 :         return fmt.Sprintf("%s = %s.NewIter(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
     728           2 :                 o.iterID, o.readerID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
     729           2 : }
     730             : 
     731           2 : func (o *newIterOp) receiver() objID { return o.readerID }
     732           2 : func (o *newIterOp) syncObjs() objIDSlice {
     733           2 :         // Prevent o.iterID ops from running before it exists.
     734           2 :         objs := []objID{o.iterID}
     735           2 :         // If reading through a batch, the new iterator will also observe database
     736           2 :         // state, and we must synchronize on the database state for a consistent
     737           2 :         // view.
     738           2 :         if o.readerID.tag() == batchTag {
     739           1 :                 objs = append(objs, dbObjID)
     740           1 :         }
     741           2 :         return objs
     742             : }
     743             : 
     744             : // newIterUsingCloneOp models a Iterator.Clone operation.
     745             : type newIterUsingCloneOp struct {
     746             :         existingIterID objID
     747             :         iterID         objID
     748             :         refreshBatch   bool
     749             :         iterOpts
     750             : 
     751             :         // derivedReaderID is the ID of the underlying reader that backs both the
     752             :         // existing iterator and the new iterator. The derivedReaderID is NOT
     753             :         // serialized by String and is derived from other operations during parse.
     754             :         derivedReaderID objID
     755             : }
     756             : 
     757           2 : func (o *newIterUsingCloneOp) run(t *test, h historyRecorder) {
     758           2 :         iter := t.getIter(o.existingIterID)
     759           2 :         cloneOpts := pebble.CloneOptions{
     760           2 :                 IterOptions:      iterOptions(o.iterOpts),
     761           2 :                 RefreshBatchView: o.refreshBatch,
     762           2 :         }
     763           2 :         i, err := iter.iter.Clone(cloneOpts)
     764           2 :         if err != nil {
     765           0 :                 panic(err)
     766             :         }
     767           2 :         t.setIter(o.iterID, i)
     768           2 :         h.Recordf("%s // %v", o, i.Error())
     769             : }
     770             : 
     771           2 : func (o *newIterUsingCloneOp) String() string {
     772           2 :         return fmt.Sprintf("%s = %s.Clone(%t, %q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
     773           2 :                 o.iterID, o.existingIterID, o.refreshBatch, o.lower, o.upper,
     774           2 :                 o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
     775           2 : }
     776             : 
     777           2 : func (o *newIterUsingCloneOp) receiver() objID { return o.existingIterID }
     778             : 
     779           2 : func (o *newIterUsingCloneOp) syncObjs() objIDSlice {
     780           2 :         objIDs := []objID{o.iterID}
     781           2 :         // If the underlying reader is a batch, we must synchronize with the batch.
     782           2 :         // If refreshBatch=true, synchronizing is necessary to observe all the
     783           2 :         // mutations up to until this op and no more. Even when refreshBatch=false,
     784           2 :         // we must synchronize because iterator construction may access state cached
     785           2 :         // on the indexed batch to avoid refragmenting range tombstones or range
     786           2 :         // keys.
     787           2 :         if o.derivedReaderID.tag() == batchTag {
     788           0 :                 objIDs = append(objIDs, o.derivedReaderID)
     789           0 :         }
     790           2 :         return objIDs
     791             : }
     792             : 
     793             : // iterSetBoundsOp models an Iterator.SetBounds operation.
     794             : type iterSetBoundsOp struct {
     795             :         iterID objID
     796             :         lower  []byte
     797             :         upper  []byte
     798             : }
     799             : 
     800           2 : func (o *iterSetBoundsOp) run(t *test, h historyRecorder) {
     801           2 :         i := t.getIter(o.iterID)
     802           2 :         var lower, upper []byte
     803           2 :         if o.lower != nil {
     804           2 :                 lower = append(lower, o.lower...)
     805           2 :         }
     806           2 :         if o.upper != nil {
     807           2 :                 upper = append(upper, o.upper...)
     808           2 :         }
     809           2 :         i.SetBounds(lower, upper)
     810           2 : 
     811           2 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
     812           2 :         // the user-provided bounds.
     813           2 :         rand.Read(lower[:])
     814           2 :         rand.Read(upper[:])
     815           2 : 
     816           2 :         h.Recordf("%s // %v", o, i.Error())
     817             : }
     818             : 
     819           2 : func (o *iterSetBoundsOp) String() string {
     820           2 :         return fmt.Sprintf("%s.SetBounds(%q, %q)", o.iterID, o.lower, o.upper)
     821           2 : }
     822             : 
     823           2 : func (o *iterSetBoundsOp) receiver() objID      { return o.iterID }
     824           2 : func (o *iterSetBoundsOp) syncObjs() objIDSlice { return nil }
     825             : 
     826             : // iterSetOptionsOp models an Iterator.SetOptions operation.
     827             : type iterSetOptionsOp struct {
     828             :         iterID objID
     829             :         iterOpts
     830             : 
     831             :         // derivedReaderID is the ID of the underlying reader that backs the
     832             :         // iterator. The derivedReaderID is NOT serialized by String and is derived
     833             :         // from other operations during parse.
     834             :         derivedReaderID objID
     835             : }
     836             : 
     837           2 : func (o *iterSetOptionsOp) run(t *test, h historyRecorder) {
     838           2 :         i := t.getIter(o.iterID)
     839           2 : 
     840           2 :         opts := iterOptions(o.iterOpts)
     841           2 :         if opts == nil {
     842           2 :                 opts = &pebble.IterOptions{}
     843           2 :         }
     844           2 :         i.SetOptions(opts)
     845           2 : 
     846           2 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
     847           2 :         // the user-provided bounds.
     848           2 :         rand.Read(opts.LowerBound[:])
     849           2 :         rand.Read(opts.UpperBound[:])
     850           2 : 
     851           2 :         h.Recordf("%s // %v", o, i.Error())
     852             : }
     853             : 
     854           2 : func (o *iterSetOptionsOp) String() string {
     855           2 :         return fmt.Sprintf("%s.SetOptions(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
     856           2 :                 o.iterID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
     857           2 : }
     858             : 
     859           2 : func iterOptions(o iterOpts) *pebble.IterOptions {
     860           2 :         if o.IsZero() {
     861           2 :                 return nil
     862           2 :         }
     863           2 :         var lower, upper []byte
     864           2 :         if o.lower != nil {
     865           2 :                 lower = append(lower, o.lower...)
     866           2 :         }
     867           2 :         if o.upper != nil {
     868           2 :                 upper = append(upper, o.upper...)
     869           2 :         }
     870           2 :         opts := &pebble.IterOptions{
     871           2 :                 LowerBound: lower,
     872           2 :                 UpperBound: upper,
     873           2 :                 KeyTypes:   pebble.IterKeyType(o.keyTypes),
     874           2 :                 RangeKeyMasking: pebble.RangeKeyMasking{
     875           2 :                         Suffix: o.maskSuffix,
     876           2 :                 },
     877           2 :                 UseL6Filters: o.useL6Filters,
     878           2 :         }
     879           2 :         if opts.RangeKeyMasking.Suffix != nil {
     880           2 :                 opts.RangeKeyMasking.Filter = func() pebble.BlockPropertyFilterMask {
     881           2 :                         return sstable.NewTestKeysMaskingFilter()
     882           2 :                 }
     883             :         }
     884           2 :         if o.filterMax > 0 {
     885           2 :                 opts.PointKeyFilters = []pebble.BlockPropertyFilter{
     886           2 :                         sstable.NewTestKeysBlockPropertyFilter(o.filterMin, o.filterMax),
     887           2 :                 }
     888           2 :                 // Enforce the timestamp bounds in SkipPoint, so that the iterator never
     889           2 :                 // returns a key outside the filterMin, filterMax bounds. This provides
     890           2 :                 // deterministic iteration.
     891           2 :                 opts.SkipPoint = func(k []byte) (skip bool) {
     892           2 :                         n := testkeys.Comparer.Split(k)
     893           2 :                         if n == len(k) {
     894           2 :                                 // No suffix, don't skip it.
     895           2 :                                 return false
     896           2 :                         }
     897           2 :                         v, err := testkeys.ParseSuffix(k[n:])
     898           2 :                         if err != nil {
     899           0 :                                 panic(err)
     900             :                         }
     901           2 :                         ts := uint64(v)
     902           2 :                         return ts < o.filterMin || ts >= o.filterMax
     903             :                 }
     904             :         }
     905           2 :         return opts
     906             : }
     907             : 
     908           2 : func (o *iterSetOptionsOp) receiver() objID { return o.iterID }
     909             : 
     910           2 : func (o *iterSetOptionsOp) syncObjs() objIDSlice {
     911           2 :         if o.derivedReaderID.tag() == batchTag {
     912           1 :                 // If the underlying reader is a batch, we must synchronize with the
     913           1 :                 // batch so that we observe all the mutations up until this operation
     914           1 :                 // and no more.
     915           1 :                 return []objID{o.derivedReaderID}
     916           1 :         }
     917           2 :         return nil
     918             : }
     919             : 
     920             : // iterSeekGEOp models an Iterator.SeekGE[WithLimit] operation.
     921             : type iterSeekGEOp struct {
     922             :         iterID objID
     923             :         key    []byte
     924             :         limit  []byte
     925             : 
     926             :         derivedReaderID objID
     927             : }
     928             : 
     929           2 : func iteratorPos(i *retryableIter) string {
     930           2 :         var buf bytes.Buffer
     931           2 :         fmt.Fprintf(&buf, "%q", i.Key())
     932           2 :         hasPoint, hasRange := i.HasPointAndRange()
     933           2 :         if hasPoint {
     934           2 :                 fmt.Fprintf(&buf, ",%q", i.Value())
     935           2 :         } else {
     936           2 :                 fmt.Fprint(&buf, ",<no point>")
     937           2 :         }
     938           2 :         if hasRange {
     939           2 :                 start, end := i.RangeBounds()
     940           2 :                 fmt.Fprintf(&buf, ",[%q,%q)=>{", start, end)
     941           2 :                 for i, rk := range i.RangeKeys() {
     942           2 :                         if i > 0 {
     943           2 :                                 fmt.Fprint(&buf, ",")
     944           2 :                         }
     945           2 :                         fmt.Fprintf(&buf, "%q=%q", rk.Suffix, rk.Value)
     946             :                 }
     947           2 :                 fmt.Fprint(&buf, "}")
     948           2 :         } else {
     949           2 :                 fmt.Fprint(&buf, ",<no range>")
     950           2 :         }
     951           2 :         if i.RangeKeyChanged() {
     952           2 :                 fmt.Fprint(&buf, "*")
     953           2 :         }
     954           2 :         return buf.String()
     955             : }
     956             : 
     957           2 : func validBoolToStr(valid bool) string {
     958           2 :         return fmt.Sprintf("%t", valid)
     959           2 : }
     960             : 
     961           2 : func validityStateToStr(validity pebble.IterValidityState) (bool, string) {
     962           2 :         // We can't distinguish between IterExhausted and IterAtLimit in a
     963           2 :         // deterministic manner.
     964           2 :         switch validity {
     965           2 :         case pebble.IterExhausted, pebble.IterAtLimit:
     966           2 :                 return false, "invalid"
     967           2 :         case pebble.IterValid:
     968           2 :                 return true, "valid"
     969           0 :         default:
     970           0 :                 panic("unknown validity")
     971             :         }
     972             : }
     973             : 
     974           2 : func (o *iterSeekGEOp) run(t *test, h historyRecorder) {
     975           2 :         i := t.getIter(o.iterID)
     976           2 :         var valid bool
     977           2 :         var validStr string
     978           2 :         if o.limit == nil {
     979           2 :                 valid = i.SeekGE(o.key)
     980           2 :                 validStr = validBoolToStr(valid)
     981           2 :         } else {
     982           2 :                 valid, validStr = validityStateToStr(i.SeekGEWithLimit(o.key, o.limit))
     983           2 :         }
     984           2 :         if valid {
     985           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
     986           2 :         } else {
     987           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
     988           2 :         }
     989             : }
     990             : 
     991           2 : func (o *iterSeekGEOp) String() string {
     992           2 :         return fmt.Sprintf("%s.SeekGE(%q, %q)", o.iterID, o.key, o.limit)
     993           2 : }
     994           2 : func (o *iterSeekGEOp) receiver() objID      { return o.iterID }
     995           2 : func (o *iterSeekGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
     996             : 
     997           2 : func onlyBatchIDs(ids ...objID) objIDSlice {
     998           2 :         var ret objIDSlice
     999           2 :         for _, id := range ids {
    1000           2 :                 if id.tag() == batchTag {
    1001           1 :                         ret = append(ret, id)
    1002           1 :                 }
    1003             :         }
    1004           2 :         return ret
    1005             : }
    1006             : 
    1007             : // iterSeekPrefixGEOp models an Iterator.SeekPrefixGE operation.
    1008             : type iterSeekPrefixGEOp struct {
    1009             :         iterID objID
    1010             :         key    []byte
    1011             : 
    1012             :         derivedReaderID objID
    1013             : }
    1014             : 
    1015           2 : func (o *iterSeekPrefixGEOp) run(t *test, h historyRecorder) {
    1016           2 :         i := t.getIter(o.iterID)
    1017           2 :         valid := i.SeekPrefixGE(o.key)
    1018           2 :         if valid {
    1019           2 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1020           2 :         } else {
    1021           2 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1022           2 :         }
    1023             : }
    1024             : 
    1025           2 : func (o *iterSeekPrefixGEOp) String() string {
    1026           2 :         return fmt.Sprintf("%s.SeekPrefixGE(%q)", o.iterID, o.key)
    1027           2 : }
    1028           2 : func (o *iterSeekPrefixGEOp) receiver() objID      { return o.iterID }
    1029           2 : func (o *iterSeekPrefixGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1030             : 
    1031             : // iterSeekLTOp models an Iterator.SeekLT[WithLimit] operation.
    1032             : type iterSeekLTOp struct {
    1033             :         iterID objID
    1034             :         key    []byte
    1035             :         limit  []byte
    1036             : 
    1037             :         derivedReaderID objID
    1038             : }
    1039             : 
    1040           2 : func (o *iterSeekLTOp) run(t *test, h historyRecorder) {
    1041           2 :         i := t.getIter(o.iterID)
    1042           2 :         var valid bool
    1043           2 :         var validStr string
    1044           2 :         if o.limit == nil {
    1045           2 :                 valid = i.SeekLT(o.key)
    1046           2 :                 validStr = validBoolToStr(valid)
    1047           2 :         } else {
    1048           2 :                 valid, validStr = validityStateToStr(i.SeekLTWithLimit(o.key, o.limit))
    1049           2 :         }
    1050           2 :         if valid {
    1051           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1052           2 :         } else {
    1053           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1054           2 :         }
    1055             : }
    1056             : 
    1057           2 : func (o *iterSeekLTOp) String() string {
    1058           2 :         return fmt.Sprintf("%s.SeekLT(%q, %q)", o.iterID, o.key, o.limit)
    1059           2 : }
    1060             : 
    1061           2 : func (o *iterSeekLTOp) receiver() objID      { return o.iterID }
    1062           2 : func (o *iterSeekLTOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1063             : 
    1064             : // iterFirstOp models an Iterator.First operation.
    1065             : type iterFirstOp struct {
    1066             :         iterID objID
    1067             : 
    1068             :         derivedReaderID objID
    1069             : }
    1070             : 
    1071           2 : func (o *iterFirstOp) run(t *test, h historyRecorder) {
    1072           2 :         i := t.getIter(o.iterID)
    1073           2 :         valid := i.First()
    1074           2 :         if valid {
    1075           2 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1076           2 :         } else {
    1077           2 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1078           2 :         }
    1079             : }
    1080             : 
    1081           2 : func (o *iterFirstOp) String() string       { return fmt.Sprintf("%s.First()", o.iterID) }
    1082           2 : func (o *iterFirstOp) receiver() objID      { return o.iterID }
    1083           2 : func (o *iterFirstOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1084             : 
    1085             : // iterLastOp models an Iterator.Last operation.
    1086             : type iterLastOp struct {
    1087             :         iterID objID
    1088             : 
    1089             :         derivedReaderID objID
    1090             : }
    1091             : 
    1092           2 : func (o *iterLastOp) run(t *test, h historyRecorder) {
    1093           2 :         i := t.getIter(o.iterID)
    1094           2 :         valid := i.Last()
    1095           2 :         if valid {
    1096           2 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1097           2 :         } else {
    1098           2 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1099           2 :         }
    1100             : }
    1101             : 
    1102           2 : func (o *iterLastOp) String() string       { return fmt.Sprintf("%s.Last()", o.iterID) }
    1103           2 : func (o *iterLastOp) receiver() objID      { return o.iterID }
    1104           2 : func (o *iterLastOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1105             : 
    1106             : // iterNextOp models an Iterator.Next[WithLimit] operation.
    1107             : type iterNextOp struct {
    1108             :         iterID objID
    1109             :         limit  []byte
    1110             : 
    1111             :         derivedReaderID objID
    1112             : }
    1113             : 
    1114           2 : func (o *iterNextOp) run(t *test, h historyRecorder) {
    1115           2 :         i := t.getIter(o.iterID)
    1116           2 :         var valid bool
    1117           2 :         var validStr string
    1118           2 :         if o.limit == nil {
    1119           2 :                 valid = i.Next()
    1120           2 :                 validStr = validBoolToStr(valid)
    1121           2 :         } else {
    1122           2 :                 valid, validStr = validityStateToStr(i.NextWithLimit(o.limit))
    1123           2 :         }
    1124           2 :         if valid {
    1125           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1126           2 :         } else {
    1127           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1128           2 :         }
    1129             : }
    1130             : 
    1131           2 : func (o *iterNextOp) String() string       { return fmt.Sprintf("%s.Next(%q)", o.iterID, o.limit) }
    1132           2 : func (o *iterNextOp) receiver() objID      { return o.iterID }
    1133           2 : func (o *iterNextOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1134             : 
    1135             : // iterNextPrefixOp models an Iterator.NextPrefix operation.
    1136             : type iterNextPrefixOp struct {
    1137             :         iterID objID
    1138             : 
    1139             :         derivedReaderID objID
    1140             : }
    1141             : 
    1142           2 : func (o *iterNextPrefixOp) run(t *test, h historyRecorder) {
    1143           2 :         i := t.getIter(o.iterID)
    1144           2 :         valid := i.NextPrefix()
    1145           2 :         validStr := validBoolToStr(valid)
    1146           2 :         if valid {
    1147           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1148           2 :         } else {
    1149           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1150           2 :         }
    1151             : }
    1152             : 
    1153           2 : func (o *iterNextPrefixOp) String() string       { return fmt.Sprintf("%s.NextPrefix()", o.iterID) }
    1154           2 : func (o *iterNextPrefixOp) receiver() objID      { return o.iterID }
    1155           2 : func (o *iterNextPrefixOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1156             : 
    1157             : // iterPrevOp models an Iterator.Prev[WithLimit] operation.
    1158             : type iterPrevOp struct {
    1159             :         iterID objID
    1160             :         limit  []byte
    1161             : 
    1162             :         derivedReaderID objID
    1163             : }
    1164             : 
    1165           2 : func (o *iterPrevOp) run(t *test, h historyRecorder) {
    1166           2 :         i := t.getIter(o.iterID)
    1167           2 :         var valid bool
    1168           2 :         var validStr string
    1169           2 :         if o.limit == nil {
    1170           2 :                 valid = i.Prev()
    1171           2 :                 validStr = validBoolToStr(valid)
    1172           2 :         } else {
    1173           2 :                 valid, validStr = validityStateToStr(i.PrevWithLimit(o.limit))
    1174           2 :         }
    1175           2 :         if valid {
    1176           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1177           2 :         } else {
    1178           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1179           2 :         }
    1180             : }
    1181             : 
    1182           2 : func (o *iterPrevOp) String() string       { return fmt.Sprintf("%s.Prev(%q)", o.iterID, o.limit) }
    1183           2 : func (o *iterPrevOp) receiver() objID      { return o.iterID }
    1184           2 : func (o *iterPrevOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1185             : 
    1186             : // newSnapshotOp models a DB.NewSnapshot operation.
    1187             : type newSnapshotOp struct {
    1188             :         snapID objID
    1189             :         // If nonempty, this snapshot must not be used to read any keys outside of
    1190             :         // the provided bounds. This allows some implementations to use 'Eventually
    1191             :         // file-only snapshots,' which require bounds.
    1192             :         bounds []pebble.KeyRange
    1193             : }
    1194             : 
    1195           2 : func (o *newSnapshotOp) run(t *test, h historyRecorder) {
    1196           2 :         // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
    1197           2 :         if len(o.bounds) > 0 && ((11400714819323198485*uint64(t.idx)*t.testOpts.seedEFOS)>>63) == 1 {
    1198           1 :                 s := t.db.NewEventuallyFileOnlySnapshot(o.bounds)
    1199           1 :                 t.setSnapshot(o.snapID, s)
    1200           2 :         } else {
    1201           2 :                 s := t.db.NewSnapshot()
    1202           2 :                 t.setSnapshot(o.snapID, s)
    1203           2 :         }
    1204           2 :         h.Recordf("%s", o)
    1205             : }
    1206             : 
    1207           2 : func (o *newSnapshotOp) String() string {
    1208           2 :         var buf bytes.Buffer
    1209           2 :         fmt.Fprintf(&buf, "%s = db.NewSnapshot(", o.snapID)
    1210           2 :         for i := range o.bounds {
    1211           2 :                 if i > 0 {
    1212           2 :                         fmt.Fprint(&buf, ", ")
    1213           2 :                 }
    1214           2 :                 fmt.Fprintf(&buf, "%q, %q", o.bounds[i].Start, o.bounds[i].End)
    1215             :         }
    1216           2 :         fmt.Fprint(&buf, ")")
    1217           2 :         return buf.String()
    1218             : }
    1219           2 : func (o *newSnapshotOp) receiver() objID      { return dbObjID }
    1220           2 : func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }
    1221             : 
    1222             : type dbRatchetFormatMajorVersionOp struct {
    1223             :         vers pebble.FormatMajorVersion
    1224             : }
    1225             : 
    1226           2 : func (o *dbRatchetFormatMajorVersionOp) run(t *test, h historyRecorder) {
    1227           2 :         var err error
    1228           2 :         // NB: We no-op the operation if we're already at or above the provided
    1229           2 :         // format major version. Different runs start at different format major
    1230           2 :         // versions, making the presence of an error and the error message itself
    1231           2 :         // non-deterministic if we attempt to upgrade to an older version.
    1232           2 :         //
    1233           2 :         //Regardless, subsequent operations should behave identically, which is what
    1234           2 :         //we're really aiming to test by including this format major version ratchet
    1235           2 :         //operation.
    1236           2 :         if t.db.FormatMajorVersion() < o.vers {
    1237           2 :                 err = t.db.RatchetFormatMajorVersion(o.vers)
    1238           2 :         }
    1239           2 :         h.Recordf("%s // %v", o, err)
    1240             : }
    1241             : 
    1242           2 : func (o *dbRatchetFormatMajorVersionOp) String() string {
    1243           2 :         return fmt.Sprintf("db.RatchetFormatMajorVersion(%s)", o.vers)
    1244           2 : }
    1245           2 : func (o *dbRatchetFormatMajorVersionOp) receiver() objID      { return dbObjID }
    1246           2 : func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
    1247             : 
    1248             : type dbRestartOp struct{}
    1249             : 
    1250           2 : func (o *dbRestartOp) run(t *test, h historyRecorder) {
    1251           2 :         if err := t.restartDB(); err != nil {
    1252           0 :                 h.Recordf("%s // %v", o, err)
    1253           0 :                 h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
    1254           2 :         } else {
    1255           2 :                 h.Recordf("%s", o)
    1256           2 :         }
    1257             : }
    1258             : 
    1259           2 : func (o *dbRestartOp) String() string       { return "db.Restart()" }
    1260           2 : func (o *dbRestartOp) receiver() objID      { return dbObjID }
    1261           2 : func (o *dbRestartOp) syncObjs() objIDSlice { return nil }
    1262             : 
    1263           1 : func formatOps(ops []op) string {
    1264           1 :         var buf strings.Builder
    1265           1 :         for _, op := range ops {
    1266           1 :                 fmt.Fprintf(&buf, "%s\n", op)
    1267           1 :         }
    1268           1 :         return buf.String()
    1269             : }

Generated by: LCOV version 1.14