LCOV - code coverage report
Current view: top level - pebble/metamorphic - ops.go (source / functions) Hit Total Coverage
Test: 2024-01-19 08:16Z d4e33557 - tests only.lcov Lines: 800 1273 62.8 %
Date: 2024-01-19 08:16:56 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package metamorphic
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "crypto/rand"
      11             :         "encoding/binary"
      12             :         "fmt"
      13             :         "io"
      14             :         "path"
      15             :         "path/filepath"
      16             :         "strings"
      17             : 
      18             :         "github.com/cockroachdb/errors"
      19             :         "github.com/cockroachdb/pebble"
      20             :         "github.com/cockroachdb/pebble/internal/base"
      21             :         "github.com/cockroachdb/pebble/internal/keyspan"
      22             :         "github.com/cockroachdb/pebble/internal/private"
      23             :         "github.com/cockroachdb/pebble/internal/rangekey"
      24             :         "github.com/cockroachdb/pebble/internal/testkeys"
      25             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      26             :         "github.com/cockroachdb/pebble/sstable"
      27             :         "github.com/cockroachdb/pebble/vfs/errorfs"
      28             : )
      29             : 
      30             : // Ops holds a sequence of operations to be executed by the metamorphic tests.
      31             : type Ops []op
      32             : 
      33             : // op defines the interface for a single operation, such as creating a batch,
      34             : // or advancing an iterator.
      35             : type op interface {
      36             :         String() string
      37             : 
      38             :         run(t *Test, h historyRecorder)
      39             : 
      40             :         // receiver returns the object ID of the object the operation is performed
      41             :         // on. Every operation has a receiver (eg, batch0.Set(...) has `batch0` as
      42             :         // its receiver). Receivers are used for synchronization when running with
      43             :         // concurrency.
      44             :         receiver() objID
      45             : 
      46             :         // syncObjs returns an additional set of object IDs—excluding the
      47             :         // receiver—that the operation must synchronize with. At execution time,
      48             :         // the operation will run serially with respect to all other operations
      49             :         // that return these objects from their own syncObjs or receiver methods.
      50             :         syncObjs() objIDSlice
      51             : 
      52             :         // keys returns all user keys used by the operation, as pointers to slices.
      53             :         // The caller can then modify these slices to rewrite the keys.
      54             :         //
      55             :         // Used for simplification of operations for easier investigations.
      56             :         keys() []*[]byte
      57             : 
      58             :         // diagramKeyRanges() returns key spans associated with this operation, to be
      59             :         // shown on an ASCII diagram of operations.
      60             :         diagramKeyRanges() []pebble.KeyRange
      61             : }
      62             : 
      63             : // initOp performs test initialization
      64             : type initOp struct {
      65             :         dbSlots       uint32
      66             :         batchSlots    uint32
      67             :         iterSlots     uint32
      68             :         snapshotSlots uint32
      69             : }
      70             : 
      71           1 : func (o *initOp) run(t *Test, h historyRecorder) {
      72           1 :         t.batches = make([]*pebble.Batch, o.batchSlots)
      73           1 :         t.iters = make([]*retryableIter, o.iterSlots)
      74           1 :         t.snapshots = make([]readerCloser, o.snapshotSlots)
      75           1 :         h.Recordf("%s", o)
      76           1 : }
      77             : 
      78           1 : func (o *initOp) String() string {
      79           1 :         return fmt.Sprintf("Init(%d /* dbs */, %d /* batches */, %d /* iters */, %d /* snapshots */)",
      80           1 :                 o.dbSlots, o.batchSlots, o.iterSlots, o.snapshotSlots)
      81           1 : }
      82             : 
      83           1 : func (o *initOp) receiver() objID { return makeObjID(dbTag, 1) }
      84           1 : func (o *initOp) syncObjs() objIDSlice {
      85           1 :         syncObjs := make([]objID, 0)
      86           1 :         // Add any additional DBs to syncObjs.
      87           1 :         for i := uint32(2); i < o.dbSlots+1; i++ {
      88           0 :                 syncObjs = append(syncObjs, makeObjID(dbTag, i))
      89           0 :         }
      90           1 :         return syncObjs
      91             : }
      92             : 
      93           0 : func (o *initOp) keys() []*[]byte                     { return nil }
      94           1 : func (o *initOp) diagramKeyRanges() []pebble.KeyRange { return nil }
      95             : 
      96             : // applyOp models a Writer.Apply operation.
      97             : type applyOp struct {
      98             :         writerID objID
      99             :         batchID  objID
     100             : }
     101             : 
     102           1 : func (o *applyOp) run(t *Test, h historyRecorder) {
     103           1 :         b := t.getBatch(o.batchID)
     104           1 :         w := t.getWriter(o.writerID)
     105           1 :         var err error
     106           1 :         if o.writerID.tag() == dbTag && t.testOpts.asyncApplyToDB && t.writeOpts.Sync {
     107           0 :                 err = w.(*pebble.DB).ApplyNoSyncWait(b, t.writeOpts)
     108           0 :                 if err == nil {
     109           0 :                         err = b.SyncWait()
     110           0 :                 }
     111           1 :         } else {
     112           1 :                 err = w.Apply(b, t.writeOpts)
     113           1 :         }
     114           1 :         h.Recordf("%s // %v", o, err)
     115             :         // batch will be closed by a closeOp which is guaranteed to be generated
     116             : }
     117             : 
     118           1 : func (o *applyOp) String() string  { return fmt.Sprintf("%s.Apply(%s)", o.writerID, o.batchID) }
     119           1 : func (o *applyOp) receiver() objID { return o.writerID }
     120           1 : func (o *applyOp) syncObjs() objIDSlice {
     121           1 :         // Apply should not be concurrent with operations that are mutating the
     122           1 :         // batch.
     123           1 :         return []objID{o.batchID}
     124           1 : }
     125             : 
     126           0 : func (o *applyOp) keys() []*[]byte                     { return nil }
     127           0 : func (o *applyOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     128             : 
     129             : // checkpointOp models a DB.Checkpoint operation.
     130             : type checkpointOp struct {
     131             :         dbID objID
     132             :         // If non-empty, the checkpoint is restricted to these spans.
     133             :         spans []pebble.CheckpointSpan
     134             : }
     135             : 
     136           1 : func (o *checkpointOp) run(t *Test, h historyRecorder) {
     137           1 :         // TODO(josh): db.Checkpoint does not work with shared storage yet.
     138           1 :         // It would be better to filter out ahead of calling run on the op,
     139           1 :         // by setting the weight that generator.go uses to zero, or similar.
     140           1 :         // But IIUC the ops are shared for ALL the metamorphic test runs, so
     141           1 :         // not sure how to do that easily:
     142           1 :         // https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
     143           1 :         if t.testOpts.sharedStorageEnabled {
     144           0 :                 h.Recordf("%s // %v", o, nil)
     145           0 :                 return
     146           0 :         }
     147           1 :         var opts []pebble.CheckpointOption
     148           1 :         if len(o.spans) > 0 {
     149           1 :                 opts = append(opts, pebble.WithRestrictToSpans(o.spans))
     150           1 :         }
     151           1 :         db := t.getDB(o.dbID)
     152           1 :         err := t.withRetries(func() error {
     153           1 :                 return db.Checkpoint(o.dir(t.dir, h.op), opts...)
     154           1 :         })
     155           1 :         h.Recordf("%s // %v", o, err)
     156             : }
     157             : 
     158           1 : func (o *checkpointOp) dir(dataDir string, idx int) string {
     159           1 :         return filepath.Join(dataDir, "checkpoints", fmt.Sprintf("op-%06d", idx))
     160           1 : }
     161             : 
     162           1 : func (o *checkpointOp) String() string {
     163           1 :         var spanStr bytes.Buffer
     164           1 :         for i, span := range o.spans {
     165           1 :                 if i > 0 {
     166           1 :                         spanStr.WriteString(",")
     167           1 :                 }
     168           1 :                 fmt.Fprintf(&spanStr, "%q,%q", span.Start, span.End)
     169             :         }
     170           1 :         return fmt.Sprintf("%s.Checkpoint(%s)", o.dbID, spanStr.String())
     171             : }
     172             : 
     173           1 : func (o *checkpointOp) receiver() objID      { return o.dbID }
     174           1 : func (o *checkpointOp) syncObjs() objIDSlice { return nil }
     175             : 
     176           0 : func (o *checkpointOp) keys() []*[]byte {
     177           0 :         var res []*[]byte
     178           0 :         for i := range o.spans {
     179           0 :                 res = append(res, &o.spans[i].Start, &o.spans[i].End)
     180           0 :         }
     181           0 :         return res
     182             : }
     183             : 
     184           0 : func (o *checkpointOp) diagramKeyRanges() []pebble.KeyRange {
     185           0 :         var res []pebble.KeyRange
     186           0 :         for i := range o.spans {
     187           0 :                 res = append(res, pebble.KeyRange{
     188           0 :                         Start: o.spans[i].Start,
     189           0 :                         End:   o.spans[i].End,
     190           0 :                 })
     191           0 :         }
     192           0 :         return res
     193             : }
     194             : 
     195             : // closeOp models a {Batch,Iterator,Snapshot}.Close operation.
     196             : type closeOp struct {
     197             :         objID objID
     198             : 
     199             :         // affectedObjects is the list of additional objects that are affected by this
     200             :         // operation, and which syncObjs() must return so that we don't perform the
     201             :         // close in parallel with other operations to affected objects.
     202             :         affectedObjects []objID
     203             : }
     204             : 
     205           1 : func (o *closeOp) run(t *Test, h historyRecorder) {
     206           1 :         c := t.getCloser(o.objID)
     207           1 :         if o.objID.tag() == dbTag && t.opts.DisableWAL {
     208           1 :                 // Special case: If WAL is disabled, do a flush right before DB Close. This
     209           1 :                 // allows us to reuse this run's data directory as initial state for
     210           1 :                 // future runs without losing any mutations.
     211           1 :                 _ = t.getDB(o.objID).Flush()
     212           1 :         }
     213           1 :         t.clearObj(o.objID)
     214           1 :         err := c.Close()
     215           1 :         h.Recordf("%s // %v", o, err)
     216             : }
     217             : 
     218           1 : func (o *closeOp) String() string  { return fmt.Sprintf("%s.Close()", o.objID) }
     219           1 : func (o *closeOp) receiver() objID { return o.objID }
     220           1 : func (o *closeOp) syncObjs() objIDSlice {
     221           1 :         return o.affectedObjects
     222           1 : }
     223             : 
     224           0 : func (o *closeOp) keys() []*[]byte                     { return nil }
     225           0 : func (o *closeOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     226             : 
     227             : // compactOp models a DB.Compact operation.
     228             : type compactOp struct {
     229             :         dbID        objID
     230             :         start       []byte
     231             :         end         []byte
     232             :         parallelize bool
     233             : }
     234             : 
     235           1 : func (o *compactOp) run(t *Test, h historyRecorder) {
     236           1 :         err := t.withRetries(func() error {
     237           1 :                 return t.getDB(o.dbID).Compact(o.start, o.end, o.parallelize)
     238           1 :         })
     239           1 :         h.Recordf("%s // %v", o, err)
     240             : }
     241             : 
     242           1 : func (o *compactOp) String() string {
     243           1 :         return fmt.Sprintf("%s.Compact(%q, %q, %t /* parallelize */)", o.dbID, o.start, o.end, o.parallelize)
     244           1 : }
     245             : 
     246           1 : func (o *compactOp) receiver() objID      { return o.dbID }
     247           1 : func (o *compactOp) syncObjs() objIDSlice { return nil }
     248             : 
     249           1 : func (o *compactOp) keys() []*[]byte {
     250           1 :         return []*[]byte{&o.start, &o.end}
     251           1 : }
     252             : 
     253           1 : func (o *compactOp) diagramKeyRanges() []pebble.KeyRange {
     254           1 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     255           1 : }
     256             : 
     257             : // deleteOp models a Write.Delete operation.
     258             : type deleteOp struct {
     259             :         writerID objID
     260             :         key      []byte
     261             : 
     262             :         derivedDBID objID
     263             : }
     264             : 
     265           1 : func (o *deleteOp) run(t *Test, h historyRecorder) {
     266           1 :         w := t.getWriter(o.writerID)
     267           1 :         var err error
     268           1 :         if t.testOpts.deleteSized && t.isFMV(o.derivedDBID, pebble.FormatDeleteSizedAndObsolete) {
     269           0 :                 // Call DeleteSized with a deterministic size derived from the index.
     270           0 :                 // The size does not need to be accurate for correctness.
     271           0 :                 err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
     272           1 :         } else {
     273           1 :                 err = w.Delete(o.key, t.writeOpts)
     274           1 :         }
     275           1 :         h.Recordf("%s // %v", o, err)
     276             : }
     277             : 
     278           0 : func hashSize(index int) uint32 {
     279           0 :         // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
     280           0 :         return uint32((11400714819323198485 * uint64(index)) % maxValueSize)
     281           0 : }
     282             : 
     283           1 : func (o *deleteOp) String() string {
     284           1 :         return fmt.Sprintf("%s.Delete(%q)", o.writerID, o.key)
     285           1 : }
     286           1 : func (o *deleteOp) receiver() objID      { return o.writerID }
     287           1 : func (o *deleteOp) syncObjs() objIDSlice { return nil }
     288             : 
     289           0 : func (o *deleteOp) keys() []*[]byte {
     290           0 :         return []*[]byte{&o.key}
     291           0 : }
     292             : 
     293           0 : func (o *deleteOp) diagramKeyRanges() []pebble.KeyRange {
     294           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
     295           0 : }
     296             : 
     297             : // singleDeleteOp models a Write.SingleDelete operation.
     298             : type singleDeleteOp struct {
     299             :         writerID           objID
     300             :         key                []byte
     301             :         maybeReplaceDelete bool
     302             : }
     303             : 
     304           1 : func (o *singleDeleteOp) run(t *Test, h historyRecorder) {
     305           1 :         w := t.getWriter(o.writerID)
     306           1 :         var err error
     307           1 :         if t.testOpts.replaceSingleDelete && o.maybeReplaceDelete {
     308           1 :                 err = w.Delete(o.key, t.writeOpts)
     309           1 :         } else {
     310           1 :                 err = w.SingleDelete(o.key, t.writeOpts)
     311           1 :         }
     312             :         // NOTE: even if the SINGLEDEL was replaced with a DELETE, we must still
     313             :         // write the former to the history log. The log line will indicate whether
     314             :         // or not the delete *could* have been replaced. The OPTIONS file should
     315             :         // also be consulted to determine what happened at runtime (i.e. by taking
     316             :         // the logical AND).
     317           1 :         h.Recordf("%s // %v", o, err)
     318             : }
     319             : 
     320           1 : func (o *singleDeleteOp) String() string {
     321           1 :         return fmt.Sprintf("%s.SingleDelete(%q, %v /* maybeReplaceDelete */)", o.writerID, o.key, o.maybeReplaceDelete)
     322           1 : }
     323             : 
     324           1 : func (o *singleDeleteOp) receiver() objID      { return o.writerID }
     325           1 : func (o *singleDeleteOp) syncObjs() objIDSlice { return nil }
     326             : 
     327           0 : func (o *singleDeleteOp) keys() []*[]byte {
     328           0 :         return []*[]byte{&o.key}
     329           0 : }
     330             : 
     331           0 : func (o *singleDeleteOp) diagramKeyRanges() []pebble.KeyRange {
     332           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
     333           0 : }
     334             : 
     335             : // deleteRangeOp models a Write.DeleteRange operation.
     336             : type deleteRangeOp struct {
     337             :         writerID objID
     338             :         start    []byte
     339             :         end      []byte
     340             : }
     341             : 
     342           1 : func (o *deleteRangeOp) run(t *Test, h historyRecorder) {
     343           1 :         w := t.getWriter(o.writerID)
     344           1 :         err := w.DeleteRange(o.start, o.end, t.writeOpts)
     345           1 :         h.Recordf("%s // %v", o, err)
     346           1 : }
     347             : 
     348           1 : func (o *deleteRangeOp) String() string {
     349           1 :         return fmt.Sprintf("%s.DeleteRange(%q, %q)", o.writerID, o.start, o.end)
     350           1 : }
     351             : 
     352           1 : func (o *deleteRangeOp) receiver() objID      { return o.writerID }
     353           1 : func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
     354             : 
     355           0 : func (o *deleteRangeOp) keys() []*[]byte {
     356           0 :         return []*[]byte{&o.start, &o.end}
     357           0 : }
     358             : 
     359           0 : func (o *deleteRangeOp) diagramKeyRanges() []pebble.KeyRange {
     360           0 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     361           0 : }
     362             : 
     363             : // flushOp models a DB.Flush operation.
     364             : type flushOp struct {
     365             :         db objID
     366             : }
     367             : 
     368           1 : func (o *flushOp) run(t *Test, h historyRecorder) {
     369           1 :         db := t.getDB(o.db)
     370           1 :         err := db.Flush()
     371           1 :         h.Recordf("%s // %v", o, err)
     372           1 : }
     373             : 
     374           1 : func (o *flushOp) String() string                      { return fmt.Sprintf("%s.Flush()", o.db) }
     375           1 : func (o *flushOp) receiver() objID                     { return o.db }
     376           1 : func (o *flushOp) syncObjs() objIDSlice                { return nil }
     377           0 : func (o *flushOp) keys() []*[]byte                     { return nil }
     378           0 : func (o *flushOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     379             : 
     380             : // mergeOp models a Write.Merge operation.
     381             : type mergeOp struct {
     382             :         writerID objID
     383             :         key      []byte
     384             :         value    []byte
     385             : }
     386             : 
     387           1 : func (o *mergeOp) run(t *Test, h historyRecorder) {
     388           1 :         w := t.getWriter(o.writerID)
     389           1 :         err := w.Merge(o.key, o.value, t.writeOpts)
     390           1 :         h.Recordf("%s // %v", o, err)
     391           1 : }
     392             : 
     393           1 : func (o *mergeOp) String() string       { return fmt.Sprintf("%s.Merge(%q, %q)", o.writerID, o.key, o.value) }
     394           1 : func (o *mergeOp) receiver() objID      { return o.writerID }
     395           1 : func (o *mergeOp) syncObjs() objIDSlice { return nil }
     396             : 
     397           0 : func (o *mergeOp) keys() []*[]byte {
     398           0 :         return []*[]byte{&o.key}
     399           0 : }
     400             : 
     401           0 : func (o *mergeOp) diagramKeyRanges() []pebble.KeyRange {
     402           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
     403           0 : }
     404             : 
     405             : // setOp models a Write.Set operation.
     406             : type setOp struct {
     407             :         writerID objID
     408             :         key      []byte
     409             :         value    []byte
     410             : }
     411             : 
     412           1 : func (o *setOp) run(t *Test, h historyRecorder) {
     413           1 :         w := t.getWriter(o.writerID)
     414           1 :         err := w.Set(o.key, o.value, t.writeOpts)
     415           1 :         h.Recordf("%s // %v", o, err)
     416           1 : }
     417             : 
     418           1 : func (o *setOp) String() string       { return fmt.Sprintf("%s.Set(%q, %q)", o.writerID, o.key, o.value) }
     419           1 : func (o *setOp) receiver() objID      { return o.writerID }
     420           1 : func (o *setOp) syncObjs() objIDSlice { return nil }
     421             : 
     422           0 : func (o *setOp) keys() []*[]byte {
     423           0 :         return []*[]byte{&o.key}
     424           0 : }
     425             : 
     426           0 : func (o *setOp) diagramKeyRanges() []pebble.KeyRange {
     427           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
     428           0 : }
     429             : 
     430             : // rangeKeyDeleteOp models a Write.RangeKeyDelete operation.
     431             : type rangeKeyDeleteOp struct {
     432             :         writerID objID
     433             :         start    []byte
     434             :         end      []byte
     435             : }
     436             : 
     437           1 : func (o *rangeKeyDeleteOp) run(t *Test, h historyRecorder) {
     438           1 :         w := t.getWriter(o.writerID)
     439           1 :         err := w.RangeKeyDelete(o.start, o.end, t.writeOpts)
     440           1 :         h.Recordf("%s // %v", o, err)
     441           1 : }
     442             : 
     443           1 : func (o *rangeKeyDeleteOp) String() string {
     444           1 :         return fmt.Sprintf("%s.RangeKeyDelete(%q, %q)", o.writerID, o.start, o.end)
     445           1 : }
     446             : 
     447           1 : func (o *rangeKeyDeleteOp) receiver() objID      { return o.writerID }
     448           1 : func (o *rangeKeyDeleteOp) syncObjs() objIDSlice { return nil }
     449             : 
     450           0 : func (o *rangeKeyDeleteOp) keys() []*[]byte {
     451           0 :         return []*[]byte{&o.start, &o.end}
     452           0 : }
     453             : 
     454           1 : func (o *rangeKeyDeleteOp) diagramKeyRanges() []pebble.KeyRange {
     455           1 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     456           1 : }
     457             : 
     458             : // rangeKeySetOp models a Write.RangeKeySet operation.
     459             : type rangeKeySetOp struct {
     460             :         writerID objID
     461             :         start    []byte
     462             :         end      []byte
     463             :         suffix   []byte
     464             :         value    []byte
     465             : }
     466             : 
     467           1 : func (o *rangeKeySetOp) run(t *Test, h historyRecorder) {
     468           1 :         w := t.getWriter(o.writerID)
     469           1 :         err := w.RangeKeySet(o.start, o.end, o.suffix, o.value, t.writeOpts)
     470           1 :         h.Recordf("%s // %v", o, err)
     471           1 : }
     472             : 
     473           1 : func (o *rangeKeySetOp) String() string {
     474           1 :         return fmt.Sprintf("%s.RangeKeySet(%q, %q, %q, %q)",
     475           1 :                 o.writerID, o.start, o.end, o.suffix, o.value)
     476           1 : }
     477             : 
     478           1 : func (o *rangeKeySetOp) receiver() objID      { return o.writerID }
     479           1 : func (o *rangeKeySetOp) syncObjs() objIDSlice { return nil }
     480             : 
     481           1 : func (o *rangeKeySetOp) keys() []*[]byte {
     482           1 :         return []*[]byte{&o.start, &o.end}
     483           1 : }
     484             : 
     485           1 : func (o *rangeKeySetOp) diagramKeyRanges() []pebble.KeyRange {
     486           1 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     487           1 : }
     488             : 
     489             : // rangeKeyUnsetOp models a Write.RangeKeyUnset operation.
     490             : type rangeKeyUnsetOp struct {
     491             :         writerID objID
     492             :         start    []byte
     493             :         end      []byte
     494             :         suffix   []byte
     495             : }
     496             : 
     497           1 : func (o *rangeKeyUnsetOp) run(t *Test, h historyRecorder) {
     498           1 :         w := t.getWriter(o.writerID)
     499           1 :         err := w.RangeKeyUnset(o.start, o.end, o.suffix, t.writeOpts)
     500           1 :         h.Recordf("%s // %v", o, err)
     501           1 : }
     502             : 
     503           1 : func (o *rangeKeyUnsetOp) String() string {
     504           1 :         return fmt.Sprintf("%s.RangeKeyUnset(%q, %q, %q)",
     505           1 :                 o.writerID, o.start, o.end, o.suffix)
     506           1 : }
     507             : 
     508           1 : func (o *rangeKeyUnsetOp) receiver() objID      { return o.writerID }
     509           1 : func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }
     510             : 
     511           0 : func (o *rangeKeyUnsetOp) keys() []*[]byte {
     512           0 :         return []*[]byte{&o.start, &o.end}
     513           0 : }
     514             : 
     515           0 : func (o *rangeKeyUnsetOp) diagramKeyRanges() []pebble.KeyRange {
     516           0 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     517           0 : }
     518             : 
     519             : // newBatchOp models a Write.NewBatch operation.
     520             : type newBatchOp struct {
     521             :         dbID    objID
     522             :         batchID objID
     523             : }
     524             : 
     525           1 : func (o *newBatchOp) run(t *Test, h historyRecorder) {
     526           1 :         b := t.getDB(o.dbID).NewBatch()
     527           1 :         t.setBatch(o.batchID, b)
     528           1 :         h.Recordf("%s", o)
     529           1 : }
     530             : 
     531           1 : func (o *newBatchOp) String() string  { return fmt.Sprintf("%s = %s.NewBatch()", o.batchID, o.dbID) }
     532           1 : func (o *newBatchOp) receiver() objID { return o.dbID }
     533           1 : func (o *newBatchOp) syncObjs() objIDSlice {
     534           1 :         // NewBatch should not be concurrent with operations that interact with that
     535           1 :         // same batch.
     536           1 :         return []objID{o.batchID}
     537           1 : }
     538             : 
     539           0 : func (o *newBatchOp) keys() []*[]byte                     { return nil }
     540           0 : func (o *newBatchOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     541             : 
     542             : // newIndexedBatchOp models a Write.NewIndexedBatch operation.
     543             : type newIndexedBatchOp struct {
     544             :         dbID    objID
     545             :         batchID objID
     546             : }
     547             : 
     548           1 : func (o *newIndexedBatchOp) run(t *Test, h historyRecorder) {
     549           1 :         b := t.getDB(o.dbID).NewIndexedBatch()
     550           1 :         t.setBatch(o.batchID, b)
     551           1 :         h.Recordf("%s", o)
     552           1 : }
     553             : 
     554           1 : func (o *newIndexedBatchOp) String() string {
     555           1 :         return fmt.Sprintf("%s = %s.NewIndexedBatch()", o.batchID, o.dbID)
     556           1 : }
     557           1 : func (o *newIndexedBatchOp) receiver() objID { return o.dbID }
     558           1 : func (o *newIndexedBatchOp) syncObjs() objIDSlice {
     559           1 :         // NewIndexedBatch should not be concurrent with operations that interact
     560           1 :         // with that same batch.
     561           1 :         return []objID{o.batchID}
     562           1 : }
     563             : 
     564           0 : func (o *newIndexedBatchOp) keys() []*[]byte                     { return nil }
     565           0 : func (o *newIndexedBatchOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     566             : 
     567             : // batchCommitOp models a Batch.Commit operation.
     568             : type batchCommitOp struct {
     569             :         dbID    objID
     570             :         batchID objID
     571             : }
     572             : 
     573           1 : func (o *batchCommitOp) run(t *Test, h historyRecorder) {
     574           1 :         b := t.getBatch(o.batchID)
     575           1 :         err := b.Commit(t.writeOpts)
     576           1 :         h.Recordf("%s // %v", o, err)
     577           1 : }
     578             : 
     579           1 : func (o *batchCommitOp) String() string  { return fmt.Sprintf("%s.Commit()", o.batchID) }
     580           1 : func (o *batchCommitOp) receiver() objID { return o.batchID }
     581           1 : func (o *batchCommitOp) syncObjs() objIDSlice {
     582           1 :         // Synchronize on the database so that NewIters wait for the commit.
     583           1 :         return []objID{o.dbID}
     584           1 : }
     585             : 
     586           0 : func (o *batchCommitOp) keys() []*[]byte                     { return nil }
     587           0 : func (o *batchCommitOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     588             : 
     589             : // ingestOp models a DB.Ingest operation.
     590             : type ingestOp struct {
     591             :         dbID     objID
     592             :         batchIDs []objID
     593             : 
     594             :         derivedDBIDs []objID
     595             : }
     596             : 
     597           1 : func (o *ingestOp) run(t *Test, h historyRecorder) {
     598           1 :         // We can only use apply as an alternative for ingestion if we are ingesting
     599           1 :         // a single batch. If we are ingesting multiple batches, the batches may
     600           1 :         // overlap which would cause ingestion to fail but apply would succeed.
     601           1 :         if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 && o.derivedDBIDs[0] == o.dbID {
     602           0 :                 id := o.batchIDs[0]
     603           0 :                 b := t.getBatch(id)
     604           0 :                 iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
     605           0 :                 db := t.getDB(o.dbID)
     606           0 :                 c, err := o.collapseBatch(t, db, iter, rangeDelIter, rangeKeyIter, b)
     607           0 :                 if err == nil {
     608           0 :                         err = db.Apply(c, t.writeOpts)
     609           0 :                 }
     610           0 :                 _ = b.Close()
     611           0 :                 _ = c.Close()
     612           0 :                 t.clearObj(id)
     613           0 :                 h.Recordf("%s // %v", o, err)
     614           0 :                 return
     615             :         }
     616             : 
     617           1 :         var paths []string
     618           1 :         var err error
     619           1 :         for i, id := range o.batchIDs {
     620           1 :                 b := t.getBatch(id)
     621           1 :                 t.clearObj(id)
     622           1 :                 path, err2 := o.build(t, h, b, i)
     623           1 :                 if err2 != nil {
     624           0 :                         h.Recordf("Build(%s) // %v", id, err2)
     625           0 :                 }
     626           1 :                 err = firstError(err, err2)
     627           1 :                 if err2 == nil {
     628           1 :                         paths = append(paths, path)
     629           1 :                 }
     630           1 :                 err = firstError(err, b.Close())
     631             :         }
     632             : 
     633           1 :         err = firstError(err, t.withRetries(func() error {
     634           1 :                 return t.getDB(o.dbID).Ingest(paths)
     635           1 :         }))
     636             : 
     637           1 :         h.Recordf("%s // %v", o, err)
     638             : }
     639             : 
     640             : func buildForIngest(
     641             :         t *Test, dbID objID, h historyRecorder, b *pebble.Batch, i int,
     642           1 : ) (string, *sstable.WriterMetadata, error) {
     643           1 :         path := t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
     644           1 :         f, err := t.opts.FS.Create(path)
     645           1 :         if err != nil {
     646           0 :                 return "", nil, err
     647           0 :         }
     648           1 :         db := t.getDB(dbID)
     649           1 : 
     650           1 :         iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
     651           1 :         defer closeIters(iter, rangeDelIter, rangeKeyIter)
     652           1 : 
     653           1 :         equal := t.opts.Comparer.Equal
     654           1 :         tableFormat := db.FormatMajorVersion().MaxTableFormat()
     655           1 :         wOpts := t.opts.MakeWriterOptions(0, tableFormat)
     656           1 :         if t.testOpts.disableValueBlocksForIngestSSTables {
     657           1 :                 wOpts.DisableValueBlocks = true
     658           1 :         }
     659           1 :         w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), wOpts)
     660           1 : 
     661           1 :         var lastUserKey []byte
     662           1 :         for key, value := iter.First(); key != nil; key, value = iter.Next() {
     663           1 :                 // Ignore duplicate keys.
     664           1 :                 if equal(lastUserKey, key.UserKey) {
     665           1 :                         continue
     666             :                 }
     667             :                 // NB: We don't have to copy the key or value since we're reading from a
     668             :                 // batch which doesn't do prefix compression.
     669           1 :                 lastUserKey = key.UserKey
     670           1 : 
     671           1 :                 key.SetSeqNum(base.SeqNumZero)
     672           1 :                 // It's possible that we wrote the key on a batch from a db that supported
     673           1 :                 // DeleteSized, but are now ingesting into a db that does not. Detect
     674           1 :                 // this case and translate the key to an InternalKeyKindDelete.
     675           1 :                 if key.Kind() == pebble.InternalKeyKindDeleteSized && !t.isFMV(dbID, pebble.FormatDeleteSizedAndObsolete) {
     676           0 :                         value = pebble.LazyValue{}
     677           0 :                         key.SetKind(pebble.InternalKeyKindDelete)
     678           0 :                 }
     679           1 :                 if err := w.Add(*key, value.InPlaceValue()); err != nil {
     680           0 :                         return "", nil, err
     681           0 :                 }
     682             :         }
     683           1 :         if err := iter.Close(); err != nil {
     684           0 :                 return "", nil, err
     685           0 :         }
     686           1 :         iter = nil
     687           1 : 
     688           1 :         if rangeDelIter != nil {
     689           1 :                 // NB: The range tombstones have already been fragmented by the Batch.
     690           1 :                 t, err := rangeDelIter.First()
     691           1 :                 for ; t != nil; t, err = rangeDelIter.Next() {
     692           1 :                         // NB: We don't have to copy the key or value since we're reading from a
     693           1 :                         // batch which doesn't do prefix compression.
     694           1 :                         if err := w.DeleteRange(t.Start, t.End); err != nil {
     695           0 :                                 return "", nil, err
     696           0 :                         }
     697             :                 }
     698           1 :                 if err != nil {
     699           0 :                         return "", nil, err
     700           0 :                 }
     701           1 :                 if err := rangeDelIter.Close(); err != nil {
     702           0 :                         return "", nil, err
     703           0 :                 }
     704           1 :                 rangeDelIter = nil
     705             :         }
     706             : 
     707           1 :         if rangeKeyIter != nil {
     708           1 :                 span, err := rangeKeyIter.First()
     709           1 :                 for ; span != nil; span, err = rangeKeyIter.Next() {
     710           1 :                         // Coalesce the keys of this span and then zero the sequence
     711           1 :                         // numbers. This is necessary in order to make the range keys within
     712           1 :                         // the ingested sstable internally consistent at the sequence number
     713           1 :                         // it's ingested at. The individual keys within a batch are
     714           1 :                         // committed at unique sequence numbers, whereas all the keys of an
     715           1 :                         // ingested sstable are given the same sequence number. A span
     716           1 :                         // contaning keys that both set and unset the same suffix at the
     717           1 :                         // same sequence number is nonsensical, so we "coalesce" or collapse
     718           1 :                         // the keys.
     719           1 :                         collapsed := keyspan.Span{
     720           1 :                                 Start: span.Start,
     721           1 :                                 End:   span.End,
     722           1 :                                 Keys:  make([]keyspan.Key, 0, len(span.Keys)),
     723           1 :                         }
     724           1 :                         err = rangekey.Coalesce(t.opts.Comparer.Compare, equal, span.Keys, &collapsed.Keys)
     725           1 :                         if err != nil {
     726           0 :                                 return "", nil, err
     727           0 :                         }
     728           1 :                         for i := range collapsed.Keys {
     729           1 :                                 collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
     730           1 :                         }
     731           1 :                         keyspan.SortKeysByTrailer(&collapsed.Keys)
     732           1 :                         if err := rangekey.Encode(&collapsed, w.AddRangeKey); err != nil {
     733           0 :                                 return "", nil, err
     734           0 :                         }
     735             :                 }
     736           1 :                 if err != nil {
     737           0 :                         return "", nil, err
     738           0 :                 }
     739           1 :                 if err := rangeKeyIter.Close(); err != nil {
     740           0 :                         return "", nil, err
     741           0 :                 }
     742           1 :                 rangeKeyIter = nil
     743             :         }
     744             : 
     745           1 :         if err := w.Close(); err != nil {
     746           0 :                 return "", nil, err
     747           0 :         }
     748           1 :         meta, err := w.Metadata()
     749           1 :         return path, meta, err
     750             : }
     751             : 
     752           1 : func (o *ingestOp) build(t *Test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
     753           1 :         path, _, err := buildForIngest(t, o.dbID, h, b, i)
     754           1 :         return path, err
     755           1 : }
     756             : 
     757           1 : func (o *ingestOp) receiver() objID { return o.dbID }
     758           1 : func (o *ingestOp) syncObjs() objIDSlice {
     759           1 :         // Ingest should not be concurrent with mutating the batches that will be
     760           1 :         // ingested as sstables.
     761           1 :         objs := make([]objID, 0, len(o.batchIDs)+1)
     762           1 :         objs = append(objs, o.batchIDs...)
     763           1 :         addedDBs := make(map[objID]struct{})
     764           1 :         for i := range o.derivedDBIDs {
     765           1 :                 _, ok := addedDBs[o.derivedDBIDs[i]]
     766           1 :                 if !ok && o.derivedDBIDs[i] != o.dbID {
     767           0 :                         objs = append(objs, o.derivedDBIDs[i])
     768           0 :                         addedDBs[o.derivedDBIDs[i]] = struct{}{}
     769           0 :                 }
     770             :         }
     771           1 :         return objs
     772             : }
     773             : 
     774             : func closeIters(
     775             :         pointIter base.InternalIterator,
     776             :         rangeDelIter keyspan.FragmentIterator,
     777             :         rangeKeyIter keyspan.FragmentIterator,
     778           1 : ) {
     779           1 :         if pointIter != nil {
     780           1 :                 pointIter.Close()
     781           1 :         }
     782           1 :         if rangeDelIter != nil {
     783           1 :                 rangeDelIter.Close()
     784           1 :         }
     785           1 :         if rangeKeyIter != nil {
     786           1 :                 rangeKeyIter.Close()
     787           1 :         }
     788             : }
     789             : 
     790             : // collapseBatch collapses the mutations in a batch to be equivalent to an
     791             : // sstable ingesting those mutations. Duplicate updates to a key are collapsed
     792             : // so that only the latest update is performed. All range deletions are
     793             : // performed first in the batch to match the semantics of ingestion where a
     794             : // range deletion does not delete a point record contained in the sstable.
     795             : func (o *ingestOp) collapseBatch(
     796             :         t *Test,
     797             :         db *pebble.DB,
     798             :         pointIter base.InternalIterator,
     799             :         rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
     800             :         b *pebble.Batch,
     801           0 : ) (*pebble.Batch, error) {
     802           0 :         defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
     803           0 :         equal := t.opts.Comparer.Equal
     804           0 :         collapsed := db.NewBatch()
     805           0 : 
     806           0 :         if rangeDelIter != nil {
     807           0 :                 // NB: The range tombstones have already been fragmented by the Batch.
     808           0 :                 t, err := rangeDelIter.First()
     809           0 :                 for ; t != nil; t, err = rangeDelIter.Next() {
     810           0 :                         // NB: We don't have to copy the key or value since we're reading from a
     811           0 :                         // batch which doesn't do prefix compression.
     812           0 :                         if err := collapsed.DeleteRange(t.Start, t.End, nil); err != nil {
     813           0 :                                 return nil, err
     814           0 :                         }
     815             :                 }
     816           0 :                 if err != nil {
     817           0 :                         return nil, err
     818           0 :                 }
     819           0 :                 if err := rangeDelIter.Close(); err != nil {
     820           0 :                         return nil, err
     821           0 :                 }
     822           0 :                 rangeDelIter = nil
     823             :         }
     824             : 
     825           0 :         if pointIter != nil {
     826           0 :                 var lastUserKey []byte
     827           0 :                 for key, value := pointIter.First(); key != nil; key, value = pointIter.Next() {
     828           0 :                         // Ignore duplicate keys.
     829           0 :                         //
     830           0 :                         // Note: this is necessary due to MERGE keys, otherwise it would be
     831           0 :                         // fine to include all the keys in the batch and let the normal
     832           0 :                         // sequence number precedence determine which of the keys "wins".
     833           0 :                         // But the code to build the ingested sstable will only keep the
     834           0 :                         // most recent internal key and will not merge across internal keys.
     835           0 :                         if equal(lastUserKey, key.UserKey) {
     836           0 :                                 continue
     837             :                         }
     838             :                         // NB: We don't have to copy the key or value since we're reading from a
     839             :                         // batch which doesn't do prefix compression.
     840           0 :                         lastUserKey = key.UserKey
     841           0 : 
     842           0 :                         var err error
     843           0 :                         switch key.Kind() {
     844           0 :                         case pebble.InternalKeyKindDelete:
     845           0 :                                 err = collapsed.Delete(key.UserKey, nil)
     846           0 :                         case pebble.InternalKeyKindDeleteSized:
     847           0 :                                 v, _ := binary.Uvarint(value.InPlaceValue())
     848           0 :                                 // Batch.DeleteSized takes just the length of the value being
     849           0 :                                 // deleted and adds the key's length to derive the overall entry
     850           0 :                                 // size of the value being deleted. This has already been done
     851           0 :                                 // to the key we're reading from the batch, so we must subtract
     852           0 :                                 // the key length from the encoded value before calling
     853           0 :                                 // collapsed.DeleteSized, which will again add the key length
     854           0 :                                 // before encoding.
     855           0 :                                 err = collapsed.DeleteSized(key.UserKey, uint32(v-uint64(len(key.UserKey))), nil)
     856           0 :                         case pebble.InternalKeyKindSingleDelete:
     857           0 :                                 err = collapsed.SingleDelete(key.UserKey, nil)
     858           0 :                         case pebble.InternalKeyKindSet:
     859           0 :                                 err = collapsed.Set(key.UserKey, value.InPlaceValue(), nil)
     860           0 :                         case pebble.InternalKeyKindMerge:
     861           0 :                                 err = collapsed.Merge(key.UserKey, value.InPlaceValue(), nil)
     862           0 :                         case pebble.InternalKeyKindLogData:
     863           0 :                                 err = collapsed.LogData(key.UserKey, nil)
     864           0 :                         default:
     865           0 :                                 err = errors.Errorf("unknown batch record kind: %d", key.Kind())
     866             :                         }
     867           0 :                         if err != nil {
     868           0 :                                 return nil, err
     869           0 :                         }
     870             :                 }
     871           0 :                 if err := pointIter.Close(); err != nil {
     872           0 :                         return nil, err
     873           0 :                 }
     874           0 :                 pointIter = nil
     875             :         }
     876             : 
     877             :         // There's no equivalent of a MERGE operator for range keys, so there's no
     878             :         // need to collapse the range keys here. Rather than reading the range keys
     879             :         // from `rangeKeyIter`, which will already be fragmented, read the range
     880             :         // keys from the batch and copy them verbatim. This marginally improves our
     881             :         // test coverage over the alternative approach of pre-fragmenting and
     882             :         // pre-coalescing before writing to the batch.
     883             :         //
     884             :         // The `rangeKeyIter` is used only to determine if there are any range keys
     885             :         // in the batch at all, and only because we already have it handy from
     886             :         // private.BatchSort.
     887           0 :         if rangeKeyIter != nil {
     888           0 :                 for r := b.Reader(); ; {
     889           0 :                         kind, key, value, ok, err := r.Next()
     890           0 :                         if !ok {
     891           0 :                                 if err != nil {
     892           0 :                                         return nil, err
     893           0 :                                 }
     894           0 :                                 break
     895           0 :                         } else if !rangekey.IsRangeKey(kind) {
     896           0 :                                 continue
     897             :                         }
     898           0 :                         ik := base.MakeInternalKey(key, 0, kind)
     899           0 :                         if err := collapsed.AddInternalKey(&ik, value, nil); err != nil {
     900           0 :                                 return nil, err
     901           0 :                         }
     902             :                 }
     903           0 :                 if err := rangeKeyIter.Close(); err != nil {
     904           0 :                         return nil, err
     905           0 :                 }
     906           0 :                 rangeKeyIter = nil
     907             :         }
     908             : 
     909           0 :         return collapsed, nil
     910             : }
     911             : 
     912           1 : func (o *ingestOp) String() string {
     913           1 :         var buf strings.Builder
     914           1 :         buf.WriteString(o.dbID.String())
     915           1 :         buf.WriteString(".Ingest(")
     916           1 :         for i, id := range o.batchIDs {
     917           1 :                 if i > 0 {
     918           1 :                         buf.WriteString(", ")
     919           1 :                 }
     920           1 :                 buf.WriteString(id.String())
     921             :         }
     922           1 :         buf.WriteString(")")
     923           1 :         return buf.String()
     924             : }
     925             : 
     926           0 : func (o *ingestOp) keys() []*[]byte                     { return nil }
     927           0 : func (o *ingestOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     928             : 
     929             : type ingestAndExciseOp struct {
     930             :         dbID                   objID
     931             :         batchID                objID
     932             :         derivedDBID            objID
     933             :         exciseStart, exciseEnd []byte
     934             : }
     935             : 
     936           0 : func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) {
     937           0 :         var err error
     938           0 :         b := t.getBatch(o.batchID)
     939           0 :         t.clearObj(o.batchID)
     940           0 :         if t.testOpts.Opts.Comparer.Compare(o.exciseEnd, o.exciseStart) <= 0 {
     941           0 :                 panic("non-well-formed excise span")
     942             :         }
     943           0 :         if b.Empty() {
     944           0 :                 // No-op.
     945           0 :                 h.Recordf("%s // %v", o, err)
     946           0 :                 return
     947           0 :         }
     948           0 :         path, writerMeta, err2 := o.build(t, h, b, 0 /* i */)
     949           0 :         if err2 != nil {
     950           0 :                 h.Recordf("Build(%s) // %v", o.batchID, err2)
     951           0 :                 return
     952           0 :         }
     953           0 :         err = firstError(err, err2)
     954           0 :         err = firstError(err, b.Close())
     955           0 : 
     956           0 :         if writerMeta.Properties.NumEntries == 0 && writerMeta.Properties.NumRangeKeys() == 0 {
     957           0 :                 // No-op.
     958           0 :                 h.Recordf("%s // %v", o, err)
     959           0 :                 return
     960           0 :         }
     961           0 :         db := t.getDB(o.dbID)
     962           0 :         if !t.testOpts.useExcise {
     963           0 :                 // Do a rangedel and rangekeydel before the ingestion. This mimics the
     964           0 :                 // behaviour of an excise.
     965           0 :                 err = firstError(err, db.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts))
     966           0 :                 err = firstError(err, db.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts))
     967           0 :         }
     968             : 
     969           0 :         if t.testOpts.useExcise {
     970           0 :                 err = firstError(err, t.withRetries(func() error {
     971           0 :                         _, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil /* sharedSSTs */, pebble.KeyRange{
     972           0 :                                 Start: o.exciseStart,
     973           0 :                                 End:   o.exciseEnd,
     974           0 :                         })
     975           0 :                         return err
     976           0 :                 }))
     977           0 :         } else {
     978           0 :                 err = firstError(err, t.withRetries(func() error {
     979           0 :                         return t.getDB(o.dbID).Ingest([]string{path})
     980           0 :                 }))
     981             :         }
     982             : 
     983           0 :         h.Recordf("%s // %v", o, err)
     984             : }
     985             : 
     986             : func (o *ingestAndExciseOp) build(
     987             :         t *Test, h historyRecorder, b *pebble.Batch, i int,
     988           0 : ) (string, *sstable.WriterMetadata, error) {
     989           0 :         return buildForIngest(t, o.dbID, h, b, i)
     990           0 : }
     991             : 
     992           0 : func (o *ingestAndExciseOp) receiver() objID { return o.dbID }
     993           0 : func (o *ingestAndExciseOp) syncObjs() objIDSlice {
     994           0 :         // Ingest should not be concurrent with mutating the batches that will be
     995           0 :         // ingested as sstables.
     996           0 :         objs := []objID{o.batchID}
     997           0 :         if o.derivedDBID != o.dbID {
     998           0 :                 objs = append(objs, o.derivedDBID)
     999           0 :         }
    1000           0 :         return objs
    1001             : }
    1002             : 
    1003           1 : func (o *ingestAndExciseOp) String() string {
    1004           1 :         return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q)", o.dbID, o.batchID, o.exciseStart, o.exciseEnd)
    1005           1 : }
    1006             : 
    1007           0 : func (o *ingestAndExciseOp) keys() []*[]byte {
    1008           0 :         return []*[]byte{&o.exciseStart, &o.exciseEnd}
    1009           0 : }
    1010             : 
    1011           0 : func (o *ingestAndExciseOp) diagramKeyRanges() []pebble.KeyRange {
    1012           0 :         return []pebble.KeyRange{{Start: o.exciseStart, End: o.exciseEnd}}
    1013           0 : }
    1014             : 
    1015             : // getOp models a Reader.Get operation.
    1016             : type getOp struct {
    1017             :         readerID    objID
    1018             :         key         []byte
    1019             :         derivedDBID objID
    1020             : }
    1021             : 
    1022           1 : func (o *getOp) run(t *Test, h historyRecorder) {
    1023           1 :         r := t.getReader(o.readerID)
    1024           1 :         var val []byte
    1025           1 :         var closer io.Closer
    1026           1 :         err := t.withRetries(func() (err error) {
    1027           1 :                 val, closer, err = r.Get(o.key)
    1028           1 :                 return err
    1029           1 :         })
    1030           1 :         h.Recordf("%s // [%q] %v", o, val, err)
    1031           1 :         if closer != nil {
    1032           1 :                 closer.Close()
    1033           1 :         }
    1034             : }
    1035             : 
    1036           1 : func (o *getOp) String() string  { return fmt.Sprintf("%s.Get(%q)", o.readerID, o.key) }
    1037           1 : func (o *getOp) receiver() objID { return o.readerID }
    1038           1 : func (o *getOp) syncObjs() objIDSlice {
    1039           1 :         if o.readerID.tag() == dbTag {
    1040           1 :                 return nil
    1041           1 :         }
    1042             :         // batch.Get reads through to the current database state.
    1043           1 :         if o.derivedDBID != 0 {
    1044           1 :                 return []objID{o.derivedDBID}
    1045           1 :         }
    1046           0 :         return nil
    1047             : }
    1048             : 
    1049           0 : func (o *getOp) keys() []*[]byte {
    1050           0 :         return []*[]byte{&o.key}
    1051           0 : }
    1052             : 
    1053           0 : func (o *getOp) diagramKeyRanges() []pebble.KeyRange {
    1054           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
    1055           0 : }
    1056             : 
    1057             : // newIterOp models a Reader.NewIter operation.
    1058             : type newIterOp struct {
    1059             :         readerID objID
    1060             :         iterID   objID
    1061             :         iterOpts
    1062             :         derivedDBID objID
    1063             : }
    1064             : 
    1065             : // Enable this to enable debug logging of range key iterator operations.
    1066             : const debugIterators = false
    1067             : 
    1068           1 : func (o *newIterOp) run(t *Test, h historyRecorder) {
    1069           1 :         r := t.getReader(o.readerID)
    1070           1 :         opts := iterOptions(o.iterOpts)
    1071           1 :         if debugIterators {
    1072           0 :                 opts.DebugRangeKeyStack = true
    1073           0 :         }
    1074             : 
    1075           1 :         var i *pebble.Iterator
    1076           1 :         for {
    1077           1 :                 i, _ = r.NewIter(opts)
    1078           1 :                 if err := i.Error(); !errors.Is(err, errorfs.ErrInjected) {
    1079           1 :                         break
    1080             :                 }
    1081             :                 // close this iter and retry NewIter
    1082           0 :                 _ = i.Close()
    1083             :         }
    1084           1 :         t.setIter(o.iterID, i)
    1085           1 : 
    1086           1 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
    1087           1 :         // the user-provided bounds.
    1088           1 :         if opts != nil {
    1089           1 :                 rand.Read(opts.LowerBound[:])
    1090           1 :                 rand.Read(opts.UpperBound[:])
    1091           1 :         }
    1092           1 :         h.Recordf("%s // %v", o, i.Error())
    1093             : }
    1094             : 
    1095           1 : func (o *newIterOp) String() string {
    1096           1 :         return fmt.Sprintf("%s = %s.NewIter(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
    1097           1 :                 o.iterID, o.readerID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
    1098           1 : }
    1099             : 
    1100           1 : func (o *newIterOp) receiver() objID { return o.readerID }
    1101           1 : func (o *newIterOp) syncObjs() objIDSlice {
    1102           1 :         // Prevent o.iterID ops from running before it exists.
    1103           1 :         objs := []objID{o.iterID}
    1104           1 :         // If reading through a batch or snapshot, the new iterator will also observe database
    1105           1 :         // state, and we must synchronize on the database state for a consistent
    1106           1 :         // view.
    1107           1 :         if o.readerID.tag() == batchTag || o.readerID.tag() == snapTag {
    1108           1 :                 objs = append(objs, o.derivedDBID)
    1109           1 :         }
    1110           1 :         return objs
    1111             : }
    1112             : 
    1113           0 : func (o *newIterOp) keys() []*[]byte                     { return nil }
    1114           1 : func (o *newIterOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1115             : 
    1116             : // newIterUsingCloneOp models a Iterator.Clone operation.
    1117             : type newIterUsingCloneOp struct {
    1118             :         existingIterID objID
    1119             :         iterID         objID
    1120             :         refreshBatch   bool
    1121             :         iterOpts
    1122             : 
    1123             :         // derivedReaderID is the ID of the underlying reader that backs both the
    1124             :         // existing iterator and the new iterator. The derivedReaderID is NOT
    1125             :         // serialized by String and is derived from other operations during parse.
    1126             :         derivedReaderID objID
    1127             : }
    1128             : 
    1129           1 : func (o *newIterUsingCloneOp) run(t *Test, h historyRecorder) {
    1130           1 :         iter := t.getIter(o.existingIterID)
    1131           1 :         cloneOpts := pebble.CloneOptions{
    1132           1 :                 IterOptions:      iterOptions(o.iterOpts),
    1133           1 :                 RefreshBatchView: o.refreshBatch,
    1134           1 :         }
    1135           1 :         i, err := iter.iter.Clone(cloneOpts)
    1136           1 :         if err != nil {
    1137           0 :                 panic(err)
    1138             :         }
    1139           1 :         t.setIter(o.iterID, i)
    1140           1 :         h.Recordf("%s // %v", o, i.Error())
    1141             : }
    1142             : 
    1143           1 : func (o *newIterUsingCloneOp) String() string {
    1144           1 :         return fmt.Sprintf("%s = %s.Clone(%t, %q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
    1145           1 :                 o.iterID, o.existingIterID, o.refreshBatch, o.lower, o.upper,
    1146           1 :                 o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
    1147           1 : }
    1148             : 
    1149           1 : func (o *newIterUsingCloneOp) receiver() objID { return o.existingIterID }
    1150             : 
    1151           1 : func (o *newIterUsingCloneOp) syncObjs() objIDSlice {
    1152           1 :         objIDs := []objID{o.iterID}
    1153           1 :         // If the underlying reader is a batch, we must synchronize with the batch.
    1154           1 :         // If refreshBatch=true, synchronizing is necessary to observe all the
    1155           1 :         // mutations up to until this op and no more. Even when refreshBatch=false,
    1156           1 :         // we must synchronize because iterator construction may access state cached
    1157           1 :         // on the indexed batch to avoid refragmenting range tombstones or range
    1158           1 :         // keys.
    1159           1 :         if o.derivedReaderID.tag() == batchTag {
    1160           0 :                 objIDs = append(objIDs, o.derivedReaderID)
    1161           0 :         }
    1162           1 :         return objIDs
    1163             : }
    1164             : 
    1165           0 : func (o *newIterUsingCloneOp) keys() []*[]byte                     { return nil }
    1166           0 : func (o *newIterUsingCloneOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1167             : 
    1168             : // iterSetBoundsOp models an Iterator.SetBounds operation.
    1169             : type iterSetBoundsOp struct {
    1170             :         iterID objID
    1171             :         lower  []byte
    1172             :         upper  []byte
    1173             : }
    1174             : 
    1175           1 : func (o *iterSetBoundsOp) run(t *Test, h historyRecorder) {
    1176           1 :         i := t.getIter(o.iterID)
    1177           1 :         var lower, upper []byte
    1178           1 :         if o.lower != nil {
    1179           1 :                 lower = append(lower, o.lower...)
    1180           1 :         }
    1181           1 :         if o.upper != nil {
    1182           1 :                 upper = append(upper, o.upper...)
    1183           1 :         }
    1184           1 :         i.SetBounds(lower, upper)
    1185           1 : 
    1186           1 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
    1187           1 :         // the user-provided bounds.
    1188           1 :         rand.Read(lower[:])
    1189           1 :         rand.Read(upper[:])
    1190           1 : 
    1191           1 :         h.Recordf("%s // %v", o, i.Error())
    1192             : }
    1193             : 
    1194           1 : func (o *iterSetBoundsOp) String() string {
    1195           1 :         return fmt.Sprintf("%s.SetBounds(%q, %q)", o.iterID, o.lower, o.upper)
    1196           1 : }
    1197             : 
    1198           1 : func (o *iterSetBoundsOp) receiver() objID      { return o.iterID }
    1199           1 : func (o *iterSetBoundsOp) syncObjs() objIDSlice { return nil }
    1200             : 
    1201           0 : func (o *iterSetBoundsOp) keys() []*[]byte {
    1202           0 :         return []*[]byte{&o.lower, &o.upper}
    1203           0 : }
    1204             : 
    1205           0 : func (o *iterSetBoundsOp) diagramKeyRanges() []pebble.KeyRange {
    1206           0 :         return []pebble.KeyRange{{Start: o.lower, End: o.upper}}
    1207           0 : }
    1208             : 
    1209             : // iterSetOptionsOp models an Iterator.SetOptions operation.
    1210             : type iterSetOptionsOp struct {
    1211             :         iterID objID
    1212             :         iterOpts
    1213             : 
    1214             :         // derivedReaderID is the ID of the underlying reader that backs the
    1215             :         // iterator. The derivedReaderID is NOT serialized by String and is derived
    1216             :         // from other operations during parse.
    1217             :         derivedReaderID objID
    1218             : }
    1219             : 
    1220           1 : func (o *iterSetOptionsOp) run(t *Test, h historyRecorder) {
    1221           1 :         i := t.getIter(o.iterID)
    1222           1 : 
    1223           1 :         opts := iterOptions(o.iterOpts)
    1224           1 :         if opts == nil {
    1225           1 :                 opts = &pebble.IterOptions{}
    1226           1 :         }
    1227           1 :         i.SetOptions(opts)
    1228           1 : 
    1229           1 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
    1230           1 :         // the user-provided bounds.
    1231           1 :         rand.Read(opts.LowerBound[:])
    1232           1 :         rand.Read(opts.UpperBound[:])
    1233           1 : 
    1234           1 :         h.Recordf("%s // %v", o, i.Error())
    1235             : }
    1236             : 
    1237           1 : func (o *iterSetOptionsOp) String() string {
    1238           1 :         return fmt.Sprintf("%s.SetOptions(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
    1239           1 :                 o.iterID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
    1240           1 : }
    1241             : 
    1242           1 : func iterOptions(o iterOpts) *pebble.IterOptions {
    1243           1 :         if o.IsZero() {
    1244           1 :                 return nil
    1245           1 :         }
    1246           1 :         var lower, upper []byte
    1247           1 :         if o.lower != nil {
    1248           1 :                 lower = append(lower, o.lower...)
    1249           1 :         }
    1250           1 :         if o.upper != nil {
    1251           1 :                 upper = append(upper, o.upper...)
    1252           1 :         }
    1253           1 :         opts := &pebble.IterOptions{
    1254           1 :                 LowerBound: lower,
    1255           1 :                 UpperBound: upper,
    1256           1 :                 KeyTypes:   pebble.IterKeyType(o.keyTypes),
    1257           1 :                 RangeKeyMasking: pebble.RangeKeyMasking{
    1258           1 :                         Suffix: o.maskSuffix,
    1259           1 :                 },
    1260           1 :                 UseL6Filters: o.useL6Filters,
    1261           1 :         }
    1262           1 :         if opts.RangeKeyMasking.Suffix != nil {
    1263           1 :                 opts.RangeKeyMasking.Filter = func() pebble.BlockPropertyFilterMask {
    1264           1 :                         return sstable.NewTestKeysMaskingFilter()
    1265           1 :                 }
    1266             :         }
    1267           1 :         if o.filterMax > 0 {
    1268           1 :                 opts.PointKeyFilters = []pebble.BlockPropertyFilter{
    1269           1 :                         sstable.NewTestKeysBlockPropertyFilter(o.filterMin, o.filterMax),
    1270           1 :                 }
    1271           1 :                 // Enforce the timestamp bounds in SkipPoint, so that the iterator never
    1272           1 :                 // returns a key outside the filterMin, filterMax bounds. This provides
    1273           1 :                 // deterministic iteration.
    1274           1 :                 opts.SkipPoint = func(k []byte) (skip bool) {
    1275           1 :                         n := testkeys.Comparer.Split(k)
    1276           1 :                         if n == len(k) {
    1277           1 :                                 // No suffix, don't skip it.
    1278           1 :                                 return false
    1279           1 :                         }
    1280           1 :                         v, err := testkeys.ParseSuffix(k[n:])
    1281           1 :                         if err != nil {
    1282           0 :                                 panic(err)
    1283             :                         }
    1284           1 :                         ts := uint64(v)
    1285           1 :                         return ts < o.filterMin || ts >= o.filterMax
    1286             :                 }
    1287             :         }
    1288           1 :         return opts
    1289             : }
    1290             : 
    1291           1 : func (o *iterSetOptionsOp) receiver() objID { return o.iterID }
    1292             : 
    1293           1 : func (o *iterSetOptionsOp) syncObjs() objIDSlice {
    1294           1 :         if o.derivedReaderID.tag() == batchTag {
    1295           0 :                 // If the underlying reader is a batch, we must synchronize with the
    1296           0 :                 // batch so that we observe all the mutations up until this operation
    1297           0 :                 // and no more.
    1298           0 :                 return []objID{o.derivedReaderID}
    1299           0 :         }
    1300           1 :         return nil
    1301             : }
    1302             : 
    1303           0 : func (o *iterSetOptionsOp) keys() []*[]byte                     { return nil }
    1304           0 : func (o *iterSetOptionsOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1305             : 
    1306             : // iterSeekGEOp models an Iterator.SeekGE[WithLimit] operation.
    1307             : type iterSeekGEOp struct {
    1308             :         iterID objID
    1309             :         key    []byte
    1310             :         limit  []byte
    1311             : 
    1312             :         derivedReaderID objID
    1313             : }
    1314             : 
    1315           1 : func iteratorPos(i *retryableIter) string {
    1316           1 :         var buf bytes.Buffer
    1317           1 :         fmt.Fprintf(&buf, "%q", i.Key())
    1318           1 :         hasPoint, hasRange := i.HasPointAndRange()
    1319           1 :         if hasPoint {
    1320           1 :                 fmt.Fprintf(&buf, ",%q", i.Value())
    1321           1 :         } else {
    1322           1 :                 fmt.Fprint(&buf, ",<no point>")
    1323           1 :         }
    1324           1 :         if hasRange {
    1325           1 :                 start, end := i.RangeBounds()
    1326           1 :                 fmt.Fprintf(&buf, ",[%q,%q)=>{", start, end)
    1327           1 :                 for i, rk := range i.RangeKeys() {
    1328           1 :                         if i > 0 {
    1329           1 :                                 fmt.Fprint(&buf, ",")
    1330           1 :                         }
    1331           1 :                         fmt.Fprintf(&buf, "%q=%q", rk.Suffix, rk.Value)
    1332             :                 }
    1333           1 :                 fmt.Fprint(&buf, "}")
    1334           1 :         } else {
    1335           1 :                 fmt.Fprint(&buf, ",<no range>")
    1336           1 :         }
    1337           1 :         if i.RangeKeyChanged() {
    1338           1 :                 fmt.Fprint(&buf, "*")
    1339           1 :         }
    1340           1 :         return buf.String()
    1341             : }
    1342             : 
    1343           1 : func validBoolToStr(valid bool) string {
    1344           1 :         return fmt.Sprintf("%t", valid)
    1345           1 : }
    1346             : 
    1347           1 : func validityStateToStr(validity pebble.IterValidityState) (bool, string) {
    1348           1 :         // We can't distinguish between IterExhausted and IterAtLimit in a
    1349           1 :         // deterministic manner.
    1350           1 :         switch validity {
    1351           1 :         case pebble.IterExhausted, pebble.IterAtLimit:
    1352           1 :                 return false, "invalid"
    1353           1 :         case pebble.IterValid:
    1354           1 :                 return true, "valid"
    1355           0 :         default:
    1356           0 :                 panic("unknown validity")
    1357             :         }
    1358             : }
    1359             : 
    1360           1 : func (o *iterSeekGEOp) run(t *Test, h historyRecorder) {
    1361           1 :         i := t.getIter(o.iterID)
    1362           1 :         var valid bool
    1363           1 :         var validStr string
    1364           1 :         if o.limit == nil {
    1365           1 :                 valid = i.SeekGE(o.key)
    1366           1 :                 validStr = validBoolToStr(valid)
    1367           1 :         } else {
    1368           1 :                 valid, validStr = validityStateToStr(i.SeekGEWithLimit(o.key, o.limit))
    1369           1 :         }
    1370           1 :         if valid {
    1371           1 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1372           1 :         } else {
    1373           1 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1374           1 :         }
    1375             : }
    1376             : 
    1377           1 : func (o *iterSeekGEOp) String() string {
    1378           1 :         return fmt.Sprintf("%s.SeekGE(%q, %q)", o.iterID, o.key, o.limit)
    1379           1 : }
    1380           1 : func (o *iterSeekGEOp) receiver() objID      { return o.iterID }
    1381           1 : func (o *iterSeekGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1382             : 
    1383           0 : func (o *iterSeekGEOp) keys() []*[]byte {
    1384           0 :         return []*[]byte{&o.key}
    1385           0 : }
    1386             : 
    1387           1 : func (o *iterSeekGEOp) diagramKeyRanges() []pebble.KeyRange {
    1388           1 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
    1389           1 : }
    1390             : 
    1391           1 : func onlyBatchIDs(ids ...objID) objIDSlice {
    1392           1 :         var ret objIDSlice
    1393           1 :         for _, id := range ids {
    1394           1 :                 if id.tag() == batchTag {
    1395           0 :                         ret = append(ret, id)
    1396           0 :                 }
    1397             :         }
    1398           1 :         return ret
    1399             : }
    1400             : 
    1401             : // iterSeekPrefixGEOp models an Iterator.SeekPrefixGE operation.
    1402             : type iterSeekPrefixGEOp struct {
    1403             :         iterID objID
    1404             :         key    []byte
    1405             : 
    1406             :         derivedReaderID objID
    1407             : }
    1408             : 
    1409           1 : func (o *iterSeekPrefixGEOp) run(t *Test, h historyRecorder) {
    1410           1 :         i := t.getIter(o.iterID)
    1411           1 :         valid := i.SeekPrefixGE(o.key)
    1412           1 :         if valid {
    1413           1 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1414           1 :         } else {
    1415           1 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1416           1 :         }
    1417             : }
    1418             : 
    1419           1 : func (o *iterSeekPrefixGEOp) String() string {
    1420           1 :         return fmt.Sprintf("%s.SeekPrefixGE(%q)", o.iterID, o.key)
    1421           1 : }
    1422           1 : func (o *iterSeekPrefixGEOp) receiver() objID      { return o.iterID }
    1423           1 : func (o *iterSeekPrefixGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1424             : 
    1425           0 : func (o *iterSeekPrefixGEOp) keys() []*[]byte {
    1426           0 :         return []*[]byte{&o.key}
    1427           0 : }
    1428             : 
    1429           0 : func (o *iterSeekPrefixGEOp) diagramKeyRanges() []pebble.KeyRange {
    1430           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
    1431           0 : }
    1432             : 
    1433             : // iterSeekLTOp models an Iterator.SeekLT[WithLimit] operation.
    1434             : type iterSeekLTOp struct {
    1435             :         iterID objID
    1436             :         key    []byte
    1437             :         limit  []byte
    1438             : 
    1439             :         derivedReaderID objID
    1440             : }
    1441             : 
    1442           1 : func (o *iterSeekLTOp) run(t *Test, h historyRecorder) {
    1443           1 :         i := t.getIter(o.iterID)
    1444           1 :         var valid bool
    1445           1 :         var validStr string
    1446           1 :         if o.limit == nil {
    1447           1 :                 valid = i.SeekLT(o.key)
    1448           1 :                 validStr = validBoolToStr(valid)
    1449           1 :         } else {
    1450           1 :                 valid, validStr = validityStateToStr(i.SeekLTWithLimit(o.key, o.limit))
    1451           1 :         }
    1452           1 :         if valid {
    1453           1 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1454           1 :         } else {
    1455           1 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1456           1 :         }
    1457             : }
    1458             : 
    1459           1 : func (o *iterSeekLTOp) String() string {
    1460           1 :         return fmt.Sprintf("%s.SeekLT(%q, %q)", o.iterID, o.key, o.limit)
    1461           1 : }
    1462             : 
    1463           1 : func (o *iterSeekLTOp) receiver() objID      { return o.iterID }
    1464           1 : func (o *iterSeekLTOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1465             : 
    1466           0 : func (o *iterSeekLTOp) keys() []*[]byte {
    1467           0 :         return []*[]byte{&o.key}
    1468           0 : }
    1469             : 
    1470           0 : func (o *iterSeekLTOp) diagramKeyRanges() []pebble.KeyRange {
    1471           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
    1472           0 : }
    1473             : 
    1474             : // iterFirstOp models an Iterator.First operation.
    1475             : type iterFirstOp struct {
    1476             :         iterID objID
    1477             : 
    1478             :         derivedReaderID objID
    1479             : }
    1480             : 
    1481           1 : func (o *iterFirstOp) run(t *Test, h historyRecorder) {
    1482           1 :         i := t.getIter(o.iterID)
    1483           1 :         valid := i.First()
    1484           1 :         if valid {
    1485           1 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1486           1 :         } else {
    1487           1 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1488           1 :         }
    1489             : }
    1490             : 
    1491           1 : func (o *iterFirstOp) String() string       { return fmt.Sprintf("%s.First()", o.iterID) }
    1492           1 : func (o *iterFirstOp) receiver() objID      { return o.iterID }
    1493           1 : func (o *iterFirstOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1494             : 
    1495           0 : func (o *iterFirstOp) keys() []*[]byte                     { return nil }
    1496           0 : func (o *iterFirstOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1497             : 
    1498             : // iterLastOp models an Iterator.Last operation.
    1499             : type iterLastOp struct {
    1500             :         iterID objID
    1501             : 
    1502             :         derivedReaderID objID
    1503             : }
    1504             : 
    1505           1 : func (o *iterLastOp) run(t *Test, h historyRecorder) {
    1506           1 :         i := t.getIter(o.iterID)
    1507           1 :         valid := i.Last()
    1508           1 :         if valid {
    1509           1 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1510           1 :         } else {
    1511           1 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1512           1 :         }
    1513             : }
    1514             : 
    1515           1 : func (o *iterLastOp) String() string       { return fmt.Sprintf("%s.Last()", o.iterID) }
    1516           1 : func (o *iterLastOp) receiver() objID      { return o.iterID }
    1517           1 : func (o *iterLastOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1518             : 
    1519           0 : func (o *iterLastOp) keys() []*[]byte                     { return nil }
    1520           0 : func (o *iterLastOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1521             : 
    1522             : // iterNextOp models an Iterator.Next[WithLimit] operation.
    1523             : type iterNextOp struct {
    1524             :         iterID objID
    1525             :         limit  []byte
    1526             : 
    1527             :         derivedReaderID objID
    1528             : }
    1529             : 
    1530           1 : func (o *iterNextOp) run(t *Test, h historyRecorder) {
    1531           1 :         i := t.getIter(o.iterID)
    1532           1 :         var valid bool
    1533           1 :         var validStr string
    1534           1 :         if o.limit == nil {
    1535           1 :                 valid = i.Next()
    1536           1 :                 validStr = validBoolToStr(valid)
    1537           1 :         } else {
    1538           1 :                 valid, validStr = validityStateToStr(i.NextWithLimit(o.limit))
    1539           1 :         }
    1540           1 :         if valid {
    1541           1 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1542           1 :         } else {
    1543           1 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1544           1 :         }
    1545             : }
    1546             : 
    1547           1 : func (o *iterNextOp) String() string       { return fmt.Sprintf("%s.Next(%q)", o.iterID, o.limit) }
    1548           1 : func (o *iterNextOp) receiver() objID      { return o.iterID }
    1549           1 : func (o *iterNextOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1550             : 
    1551           0 : func (o *iterNextOp) keys() []*[]byte                     { return nil }
    1552           0 : func (o *iterNextOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1553             : 
    1554             : // iterNextPrefixOp models an Iterator.NextPrefix operation.
    1555             : type iterNextPrefixOp struct {
    1556             :         iterID objID
    1557             : 
    1558             :         derivedReaderID objID
    1559             : }
    1560             : 
    1561           1 : func (o *iterNextPrefixOp) run(t *Test, h historyRecorder) {
    1562           1 :         i := t.getIter(o.iterID)
    1563           1 :         valid := i.NextPrefix()
    1564           1 :         validStr := validBoolToStr(valid)
    1565           1 :         if valid {
    1566           1 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1567           1 :         } else {
    1568           1 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1569           1 :         }
    1570             : }
    1571             : 
    1572           1 : func (o *iterNextPrefixOp) String() string       { return fmt.Sprintf("%s.NextPrefix()", o.iterID) }
    1573           1 : func (o *iterNextPrefixOp) receiver() objID      { return o.iterID }
    1574           1 : func (o *iterNextPrefixOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1575             : 
    1576           0 : func (o *iterNextPrefixOp) keys() []*[]byte                     { return nil }
    1577           0 : func (o *iterNextPrefixOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1578             : 
    1579             : // iterCanSingleDelOp models a call to CanDeterministicallySingleDelete with an
    1580             : // Iterator.
    1581             : type iterCanSingleDelOp struct {
    1582             :         iterID objID
    1583             : 
    1584             :         derivedReaderID objID
    1585             : }
    1586             : 
    1587           1 : func (o *iterCanSingleDelOp) run(t *Test, h historyRecorder) {
    1588           1 :         // TODO(jackson): When we perform error injection, we'll need to rethink
    1589           1 :         // this.
    1590           1 :         _, err := pebble.CanDeterministicallySingleDelete(t.getIter(o.iterID).iter)
    1591           1 :         // The return value of CanDeterministicallySingleDelete is dependent on
    1592           1 :         // internal LSM state and non-deterministic, so we don't record it.
    1593           1 :         // Including the operation within the metamorphic test at all helps ensure
    1594           1 :         // that it does not change the result of any other Iterator operation that
    1595           1 :         // should be deterministic, regardless of its own outcome.
    1596           1 :         //
    1597           1 :         // We still record the value of the error because it's deterministic, at
    1598           1 :         // least for now. The possible error cases are:
    1599           1 :         //  - The iterator was already in an error state when the operation ran.
    1600           1 :         //  - The operation is deterministically invalid (like using an InternalNext
    1601           1 :         //    to change directions.)
    1602           1 :         h.Recordf("%s // %v", o, err)
    1603           1 : }
    1604             : 
    1605           1 : func (o *iterCanSingleDelOp) String() string       { return fmt.Sprintf("%s.InternalNext()", o.iterID) }
    1606           1 : func (o *iterCanSingleDelOp) receiver() objID      { return o.iterID }
    1607           1 : func (o *iterCanSingleDelOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1608             : 
    1609           0 : func (o *iterCanSingleDelOp) keys() []*[]byte                     { return nil }
    1610           0 : func (o *iterCanSingleDelOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1611             : 
    1612             : // iterPrevOp models an Iterator.Prev[WithLimit] operation.
    1613             : type iterPrevOp struct {
    1614             :         iterID objID
    1615             :         limit  []byte
    1616             : 
    1617             :         derivedReaderID objID
    1618             : }
    1619             : 
    1620           1 : func (o *iterPrevOp) run(t *Test, h historyRecorder) {
    1621           1 :         i := t.getIter(o.iterID)
    1622           1 :         var valid bool
    1623           1 :         var validStr string
    1624           1 :         if o.limit == nil {
    1625           1 :                 valid = i.Prev()
    1626           1 :                 validStr = validBoolToStr(valid)
    1627           1 :         } else {
    1628           1 :                 valid, validStr = validityStateToStr(i.PrevWithLimit(o.limit))
    1629           1 :         }
    1630           1 :         if valid {
    1631           1 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1632           1 :         } else {
    1633           1 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1634           1 :         }
    1635             : }
    1636             : 
    1637           1 : func (o *iterPrevOp) String() string       { return fmt.Sprintf("%s.Prev(%q)", o.iterID, o.limit) }
    1638           1 : func (o *iterPrevOp) receiver() objID      { return o.iterID }
    1639           1 : func (o *iterPrevOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1640             : 
    1641           0 : func (o *iterPrevOp) keys() []*[]byte                     { return nil }
    1642           0 : func (o *iterPrevOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1643             : 
    1644             : // newSnapshotOp models a DB.NewSnapshot operation.
    1645             : type newSnapshotOp struct {
    1646             :         dbID   objID
    1647             :         snapID objID
    1648             :         // If nonempty, this snapshot must not be used to read any keys outside of
    1649             :         // the provided bounds. This allows some implementations to use 'Eventually
    1650             :         // file-only snapshots,' which require bounds.
    1651             :         bounds []pebble.KeyRange
    1652             : }
    1653             : 
    1654           1 : func (o *newSnapshotOp) run(t *Test, h historyRecorder) {
    1655           1 :         bounds := o.bounds
    1656           1 :         if len(bounds) == 0 {
    1657           0 :                 panic("bounds unexpectedly unset for newSnapshotOp")
    1658             :         }
    1659             :         // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
    1660           1 :         createEfos := ((11400714819323198485 * uint64(t.idx) * t.testOpts.seedEFOS) >> 63) == 1
    1661           1 :         // If either of these options is true, an EFOS _must_ be created, regardless
    1662           1 :         // of what the fibonacci hash returned.
    1663           1 :         excisePossible := t.testOpts.useSharedReplicate || t.testOpts.useExcise
    1664           1 :         if createEfos || excisePossible {
    1665           1 :                 s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(bounds)
    1666           1 :                 t.setSnapshot(o.snapID, s)
    1667           1 :                 // If the EFOS isn't guaranteed to always create iterators, we must force
    1668           1 :                 // a flush on this DB so it transitions this EFOS into a file-only snapshot.
    1669           1 :                 if excisePossible && !t.testOpts.efosAlwaysCreatesIters {
    1670           0 :                         err := t.getDB(o.dbID).Flush()
    1671           0 :                         if err != nil {
    1672           0 :                                 h.Recordf("%s // %v", o, err)
    1673           0 :                         }
    1674             :                 }
    1675           1 :         } else {
    1676           1 :                 s := t.getDB(o.dbID).NewSnapshot()
    1677           1 :                 t.setSnapshot(o.snapID, s)
    1678           1 :         }
    1679           1 :         h.Recordf("%s", o)
    1680             : }
    1681             : 
    1682           1 : func (o *newSnapshotOp) String() string {
    1683           1 :         var buf bytes.Buffer
    1684           1 :         fmt.Fprintf(&buf, "%s = %s.NewSnapshot(", o.snapID, o.dbID)
    1685           1 :         for i := range o.bounds {
    1686           1 :                 if i > 0 {
    1687           1 :                         fmt.Fprint(&buf, ", ")
    1688           1 :                 }
    1689           1 :                 fmt.Fprintf(&buf, "%q, %q", o.bounds[i].Start, o.bounds[i].End)
    1690             :         }
    1691           1 :         fmt.Fprint(&buf, ")")
    1692           1 :         return buf.String()
    1693             : }
    1694           1 : func (o *newSnapshotOp) receiver() objID      { return o.dbID }
    1695           1 : func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }
    1696             : 
    1697           1 : func (o *newSnapshotOp) keys() []*[]byte {
    1698           1 :         var res []*[]byte
    1699           1 :         for i := range o.bounds {
    1700           1 :                 res = append(res, &o.bounds[i].Start, &o.bounds[i].End)
    1701           1 :         }
    1702           1 :         return res
    1703             : }
    1704             : 
    1705           1 : func (o *newSnapshotOp) diagramKeyRanges() []pebble.KeyRange {
    1706           1 :         return o.bounds
    1707           1 : }
    1708             : 
    1709             : type dbRatchetFormatMajorVersionOp struct {
    1710             :         dbID objID
    1711             :         vers pebble.FormatMajorVersion
    1712             : }
    1713             : 
    1714           1 : func (o *dbRatchetFormatMajorVersionOp) run(t *Test, h historyRecorder) {
    1715           1 :         var err error
    1716           1 :         // NB: We no-op the operation if we're already at or above the provided
    1717           1 :         // format major version. Different runs start at different format major
    1718           1 :         // versions, making the presence of an error and the error message itself
    1719           1 :         // non-deterministic if we attempt to upgrade to an older version.
    1720           1 :         //
    1721           1 :         //Regardless, subsequent operations should behave identically, which is what
    1722           1 :         //we're really aiming to test by including this format major version ratchet
    1723           1 :         //operation.
    1724           1 :         if t.getDB(o.dbID).FormatMajorVersion() < o.vers {
    1725           1 :                 err = t.getDB(o.dbID).RatchetFormatMajorVersion(o.vers)
    1726           1 :         }
    1727           1 :         h.Recordf("%s // %v", o, err)
    1728             : }
    1729             : 
    1730           1 : func (o *dbRatchetFormatMajorVersionOp) String() string {
    1731           1 :         return fmt.Sprintf("%s.RatchetFormatMajorVersion(%s)", o.dbID, o.vers)
    1732           1 : }
    1733           1 : func (o *dbRatchetFormatMajorVersionOp) receiver() objID      { return o.dbID }
    1734           1 : func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
    1735             : 
    1736           0 : func (o *dbRatchetFormatMajorVersionOp) keys() []*[]byte                     { return nil }
    1737           0 : func (o *dbRatchetFormatMajorVersionOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1738             : 
    1739             : type dbRestartOp struct {
    1740             :         dbID objID
    1741             : 
    1742             :         // affectedObjects is the list of additional objects that are affected by this
    1743             :         // operation, and which syncObjs() must return so that we don't perform the
    1744             :         // restart in parallel with other operations to affected objects.
    1745             :         affectedObjects []objID
    1746             : }
    1747             : 
    1748           1 : func (o *dbRestartOp) run(t *Test, h historyRecorder) {
    1749           1 :         if err := t.restartDB(o.dbID); err != nil {
    1750           0 :                 h.Recordf("%s // %v", o, err)
    1751           0 :                 h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
    1752           1 :         } else {
    1753           1 :                 h.Recordf("%s", o)
    1754           1 :         }
    1755             : }
    1756             : 
    1757           1 : func (o *dbRestartOp) String() string       { return fmt.Sprintf("%s.Restart()", o.dbID) }
    1758           1 : func (o *dbRestartOp) receiver() objID      { return o.dbID }
    1759           1 : func (o *dbRestartOp) syncObjs() objIDSlice { return o.affectedObjects }
    1760             : 
    1761           0 : func (o *dbRestartOp) keys() []*[]byte                     { return nil }
    1762           0 : func (o *dbRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1763             : 
    1764           1 : func formatOps(ops []op) string {
    1765           1 :         var buf strings.Builder
    1766           1 :         for _, op := range ops {
    1767           1 :                 fmt.Fprintf(&buf, "%s\n", op)
    1768           1 :         }
    1769           1 :         return buf.String()
    1770             : }
    1771             : 
    1772             : // replicateOp models an operation that could copy keys from one db to
    1773             : // another through either an IngestAndExcise, or an Ingest.
    1774             : type replicateOp struct {
    1775             :         source, dest objID
    1776             :         start, end   []byte
    1777             : }
    1778             : 
    1779             : func (r *replicateOp) runSharedReplicate(
    1780             :         t *Test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
    1781           0 : ) {
    1782           0 :         var sharedSSTs []pebble.SharedSSTMeta
    1783           0 :         var err error
    1784           0 :         err = source.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, r.start, r.end,
    1785           0 :                 func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
    1786           0 :                         val, _, err := value.Value(nil)
    1787           0 :                         if err != nil {
    1788           0 :                                 panic(err)
    1789             :                         }
    1790           0 :                         return w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
    1791             :                 },
    1792           0 :                 func(start, end []byte, seqNum uint64) error {
    1793           0 :                         return w.DeleteRange(start, end)
    1794           0 :                 },
    1795           0 :                 func(start, end []byte, keys []keyspan.Key) error {
    1796           0 :                         s := keyspan.Span{
    1797           0 :                                 Start: start,
    1798           0 :                                 End:   end,
    1799           0 :                                 Keys:  keys,
    1800           0 :                         }
    1801           0 :                         return rangekey.Encode(&s, w.AddRangeKey)
    1802           0 :                 },
    1803           0 :                 func(sst *pebble.SharedSSTMeta) error {
    1804           0 :                         sharedSSTs = append(sharedSSTs, *sst)
    1805           0 :                         return nil
    1806           0 :                 },
    1807             :         )
    1808           0 :         if err != nil {
    1809           0 :                 h.Recordf("%s // %v", r, err)
    1810           0 :                 return
    1811           0 :         }
    1812             : 
    1813           0 :         err = w.Close()
    1814           0 :         if err != nil {
    1815           0 :                 h.Recordf("%s // %v", r, err)
    1816           0 :                 return
    1817           0 :         }
    1818           0 :         meta, err := w.Metadata()
    1819           0 :         if err != nil {
    1820           0 :                 h.Recordf("%s // %v", r, err)
    1821           0 :                 return
    1822           0 :         }
    1823           0 :         if len(sharedSSTs) == 0 && meta.Properties.NumEntries == 0 && meta.Properties.NumRangeKeys() == 0 {
    1824           0 :                 // IngestAndExcise below will be a no-op. We should do a
    1825           0 :                 // DeleteRange+RangeKeyDel to mimic the behaviour of the non-shared-replicate
    1826           0 :                 // case.
    1827           0 :                 //
    1828           0 :                 // TODO(bilal): Remove this when we support excises with no matching ingests.
    1829           0 :                 if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
    1830           0 :                         h.Recordf("%s // %v", r, err)
    1831           0 :                         return
    1832           0 :                 }
    1833           0 :                 err := dest.DeleteRange(r.start, r.end, t.writeOpts)
    1834           0 :                 h.Recordf("%s // %v", r, err)
    1835           0 :                 return
    1836             :         }
    1837             : 
    1838           0 :         _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, pebble.KeyRange{Start: r.start, End: r.end})
    1839           0 :         h.Recordf("%s // %v", r, err)
    1840             : }
    1841             : 
    1842           0 : func (r *replicateOp) run(t *Test, h historyRecorder) {
    1843           0 :         // Shared replication only works if shared storage is enabled.
    1844           0 :         useSharedIngest := t.testOpts.useSharedReplicate
    1845           0 :         if !t.testOpts.sharedStorageEnabled {
    1846           0 :                 useSharedIngest = false
    1847           0 :         }
    1848             : 
    1849           0 :         source := t.getDB(r.source)
    1850           0 :         dest := t.getDB(r.dest)
    1851           0 :         sstPath := path.Join(t.tmpDir, fmt.Sprintf("ext-replicate%d.sst", t.idx))
    1852           0 :         f, err := t.opts.FS.Create(sstPath)
    1853           0 :         if err != nil {
    1854           0 :                 h.Recordf("%s // %v", r, err)
    1855           0 :                 return
    1856           0 :         }
    1857           0 :         w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), t.opts.MakeWriterOptions(0, dest.FormatMajorVersion().MaxTableFormat()))
    1858           0 : 
    1859           0 :         if useSharedIngest {
    1860           0 :                 r.runSharedReplicate(t, h, source, dest, w, sstPath)
    1861           0 :                 return
    1862           0 :         }
    1863             : 
    1864             :         // First, do a RangeKeyDelete and DeleteRange on the whole span.
    1865           0 :         if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
    1866           0 :                 h.Recordf("%s // %v", r, err)
    1867           0 :                 return
    1868           0 :         }
    1869           0 :         if err := dest.DeleteRange(r.start, r.end, t.writeOpts); err != nil {
    1870           0 :                 h.Recordf("%s // %v", r, err)
    1871           0 :                 return
    1872           0 :         }
    1873           0 :         iter, err := source.NewIter(&pebble.IterOptions{
    1874           0 :                 LowerBound: r.start,
    1875           0 :                 UpperBound: r.end,
    1876           0 :                 KeyTypes:   pebble.IterKeyTypePointsAndRanges,
    1877           0 :         })
    1878           0 :         if err != nil {
    1879           0 :                 panic(err)
    1880             :         }
    1881           0 :         defer iter.Close()
    1882           0 : 
    1883           0 :         for ok := iter.SeekGE(r.start); ok && iter.Error() == nil; ok = iter.Next() {
    1884           0 :                 hasPoint, hasRange := iter.HasPointAndRange()
    1885           0 :                 if hasPoint {
    1886           0 :                         val, err := iter.ValueAndErr()
    1887           0 :                         if err != nil {
    1888           0 :                                 panic(err)
    1889             :                         }
    1890           0 :                         if err := w.Set(iter.Key(), val); err != nil {
    1891           0 :                                 panic(err)
    1892             :                         }
    1893             :                 }
    1894           0 :                 if hasRange && iter.RangeKeyChanged() {
    1895           0 :                         rangeKeys := iter.RangeKeys()
    1896           0 :                         rkStart, rkEnd := iter.RangeBounds()
    1897           0 : 
    1898           0 :                         span := &keyspan.Span{Start: rkStart, End: rkEnd, Keys: make([]keyspan.Key, len(rangeKeys))}
    1899           0 :                         for i := range rangeKeys {
    1900           0 :                                 span.Keys[i] = keyspan.Key{
    1901           0 :                                         Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
    1902           0 :                                         Suffix:  rangeKeys[i].Suffix,
    1903           0 :                                         Value:   rangeKeys[i].Value,
    1904           0 :                                 }
    1905           0 :                         }
    1906           0 :                         keyspan.SortKeysByTrailer(&span.Keys)
    1907           0 :                         if err := rangekey.Encode(span, w.AddRangeKey); err != nil {
    1908           0 :                                 panic(err)
    1909             :                         }
    1910             :                 }
    1911             :         }
    1912           0 :         if err := iter.Error(); err != nil {
    1913           0 :                 h.Recordf("%s // %v", r, err)
    1914           0 :                 return
    1915           0 :         }
    1916           0 :         if err := w.Close(); err != nil {
    1917           0 :                 panic(err)
    1918             :         }
    1919             : 
    1920           0 :         err = dest.Ingest([]string{sstPath})
    1921           0 :         h.Recordf("%s // %v", r, err)
    1922             : }
    1923             : 
    1924           1 : func (r *replicateOp) String() string {
    1925           1 :         return fmt.Sprintf("%s.Replicate(%s, %q, %q)", r.source, r.dest, r.start, r.end)
    1926           1 : }
    1927             : 
    1928           0 : func (r *replicateOp) receiver() objID      { return r.source }
    1929           0 : func (r *replicateOp) syncObjs() objIDSlice { return objIDSlice{r.dest} }
    1930             : 
    1931           0 : func (r *replicateOp) keys() []*[]byte {
    1932           0 :         return []*[]byte{&r.start, &r.end}
    1933           0 : }
    1934             : 
    1935           1 : func (r *replicateOp) diagramKeyRanges() []pebble.KeyRange {
    1936           1 :         return []pebble.KeyRange{{Start: r.start, End: r.end}}
    1937           1 : }

Generated by: LCOV version 1.14