LCOV - code coverage report
Current view: top level - pebble/metamorphic - ops.go (source / functions) Hit Total Coverage
Test: 2025-01-06 08:17Z fca2fd50 - tests only.lcov Lines: 924 1459 63.3 %
Date: 2025-01-06 08:19:00 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package metamorphic
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "crypto/rand"
      11             :         "encoding/binary"
      12             :         "fmt"
      13             :         "io"
      14             :         "path"
      15             :         "path/filepath"
      16             :         "slices"
      17             :         "strings"
      18             : 
      19             :         "github.com/cockroachdb/errors"
      20             :         "github.com/cockroachdb/pebble"
      21             :         "github.com/cockroachdb/pebble/internal/base"
      22             :         "github.com/cockroachdb/pebble/internal/keyspan"
      23             :         "github.com/cockroachdb/pebble/internal/private"
      24             :         "github.com/cockroachdb/pebble/internal/rangekey"
      25             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      26             :         "github.com/cockroachdb/pebble/sstable"
      27             :         "github.com/cockroachdb/pebble/vfs"
      28             :         "github.com/cockroachdb/pebble/vfs/errorfs"
      29             : )
      30             : 
      31             : // Ops holds a sequence of operations to be executed by the metamorphic tests.
      32             : type Ops []op
      33             : 
      34             : // op defines the interface for a single operation, such as creating a batch,
      35             : // or advancing an iterator.
      36             : type op interface {
      37             :         formattedString(KeyFormat) string
      38             : 
      39             :         run(t *Test, h historyRecorder)
      40             : 
      41             :         // receiver returns the object ID of the object the operation is performed
      42             :         // on. Every operation has a receiver (eg, batch0.Set(...) has `batch0` as
      43             :         // its receiver). Receivers are used for synchronization when running with
      44             :         // concurrency.
      45             :         receiver() objID
      46             : 
      47             :         // syncObjs returns an additional set of object IDs—excluding the
      48             :         // receiver—that the operation must synchronize with. At execution time,
      49             :         // the operation will run serially with respect to all other operations
      50             :         // that return these objects from their own syncObjs or receiver methods.
      51             :         syncObjs() objIDSlice
      52             : 
      53             :         // rewriteKeys invokes fn for every user key used by the operation. The key
      54             :         // is updated to be the result of fn(key).
      55             :         //
      56             :         // Used for simplification of operations for easier investigations.
      57             :         rewriteKeys(fn func(UserKey) UserKey)
      58             : 
      59             :         // diagramKeyRanges() returns key spans associated with this operation, to be
      60             :         // shown on an ASCII diagram of operations.
      61             :         diagramKeyRanges() []pebble.KeyRange
      62             : }
      63             : 
      64             : // A UserKey is a user key used by the metamorphic test. The format is
      65             : // determined by the KeyFormat used by the test.
      66             : type UserKey []byte
      67             : 
      68             : // A UserKeySuffix is the suffix of a user key used by the metamorphic test. The
      69             : // format is determined by the KeyFormat used by the test.
      70             : type UserKeySuffix []byte
      71             : 
      72             : // initOp performs test initialization
      73             : type initOp struct {
      74             :         dbSlots          uint32
      75             :         batchSlots       uint32
      76             :         iterSlots        uint32
      77             :         snapshotSlots    uint32
      78             :         externalObjSlots uint32
      79             : }
      80             : 
      81           1 : func (o *initOp) run(t *Test, h historyRecorder) {
      82           1 :         t.batches = make([]*pebble.Batch, o.batchSlots)
      83           1 :         t.iters = make([]*retryableIter, o.iterSlots)
      84           1 :         t.snapshots = make([]readerCloser, o.snapshotSlots)
      85           1 :         t.externalObjs = make([]externalObjMeta, o.externalObjSlots)
      86           1 :         h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
      87           1 : }
      88             : 
      89           1 : func (o *initOp) formattedString(kf KeyFormat) string {
      90           1 :         return fmt.Sprintf("Init(%d /* dbs */, %d /* batches */, %d /* iters */, %d /* snapshots */, %d /* externalObjs */)",
      91           1 :                 o.dbSlots, o.batchSlots, o.iterSlots, o.snapshotSlots, o.externalObjSlots)
      92           1 : }
      93             : 
      94           1 : func (o *initOp) receiver() objID { return makeObjID(dbTag, 1) }
      95           1 : func (o *initOp) syncObjs() objIDSlice {
      96           1 :         syncObjs := make([]objID, 0)
      97           1 :         // Add any additional DBs to syncObjs.
      98           1 :         for i := uint32(2); i < o.dbSlots+1; i++ {
      99           0 :                 syncObjs = append(syncObjs, makeObjID(dbTag, i))
     100           0 :         }
     101           1 :         return syncObjs
     102             : }
     103             : 
     104           0 : func (o *initOp) rewriteKeys(func(UserKey) UserKey)   {}
     105           1 : func (o *initOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     106             : 
     107             : // applyOp models a Writer.Apply operation.
     108             : type applyOp struct {
     109             :         writerID objID
     110             :         batchID  objID
     111             : }
     112             : 
     113           1 : func (o *applyOp) run(t *Test, h historyRecorder) {
     114           1 :         b := t.getBatch(o.batchID)
     115           1 :         w := t.getWriter(o.writerID)
     116           1 :         var err error
     117           1 :         if o.writerID.tag() == dbTag && t.testOpts.asyncApplyToDB && t.writeOpts.Sync {
     118           0 :                 err = w.(*pebble.DB).ApplyNoSyncWait(b, t.writeOpts)
     119           0 :                 if err == nil {
     120           0 :                         err = b.SyncWait()
     121           0 :                 }
     122           1 :         } else {
     123           1 :                 err = w.Apply(b, t.writeOpts)
     124           1 :         }
     125           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     126             :         // batch will be closed by a closeOp which is guaranteed to be generated
     127             : }
     128             : 
     129           1 : func (o *applyOp) formattedString(KeyFormat) string {
     130           1 :         return fmt.Sprintf("%s.Apply(%s)", o.writerID, o.batchID)
     131           1 : }
     132           1 : func (o *applyOp) receiver() objID { return o.writerID }
     133           1 : func (o *applyOp) syncObjs() objIDSlice {
     134           1 :         // Apply should not be concurrent with operations that are mutating the
     135           1 :         // batch.
     136           1 :         return []objID{o.batchID}
     137           1 : }
     138             : 
     139           0 : func (o *applyOp) rewriteKeys(func(UserKey) UserKey)   {}
     140           0 : func (o *applyOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     141             : 
     142             : // checkpointOp models a DB.Checkpoint operation.
     143             : type checkpointOp struct {
     144             :         dbID objID
     145             :         // If non-empty, the checkpoint is restricted to these spans.
     146             :         spans []pebble.CheckpointSpan
     147             : }
     148             : 
     149           1 : func (o *checkpointOp) run(t *Test, h historyRecorder) {
     150           1 :         // TODO(josh): db.Checkpoint does not work with shared storage yet.
     151           1 :         // It would be better to filter out ahead of calling run on the op,
     152           1 :         // by setting the weight that generator.go uses to zero, or similar.
     153           1 :         // But IIUC the ops are shared for ALL the metamorphic test runs, so
     154           1 :         // not sure how to do that easily:
     155           1 :         // https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
     156           1 :         if t.testOpts.sharedStorageEnabled || t.testOpts.externalStorageEnabled {
     157           0 :                 h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), nil)
     158           0 :                 return
     159           0 :         }
     160           1 :         var opts []pebble.CheckpointOption
     161           1 :         if len(o.spans) > 0 {
     162           1 :                 opts = append(opts, pebble.WithRestrictToSpans(o.spans))
     163           1 :         }
     164           1 :         db := t.getDB(o.dbID)
     165           1 :         err := t.withRetries(func() error {
     166           1 :                 return db.Checkpoint(o.dir(t.dir, h.op), opts...)
     167           1 :         })
     168           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     169             : }
     170             : 
     171           1 : func (o *checkpointOp) dir(dataDir string, idx int) string {
     172           1 :         return filepath.Join(dataDir, "checkpoints", fmt.Sprintf("op-%06d", idx))
     173           1 : }
     174             : 
     175           1 : func (o *checkpointOp) formattedString(kf KeyFormat) string {
     176           1 :         var spanStr bytes.Buffer
     177           1 :         for i, span := range o.spans {
     178           1 :                 if i > 0 {
     179           1 :                         spanStr.WriteString(",")
     180           1 :                 }
     181           1 :                 fmt.Fprintf(&spanStr, "%q,%q", kf.FormatKey(span.Start), kf.FormatKey(span.End))
     182             :         }
     183           1 :         return fmt.Sprintf("%s.Checkpoint(%s)", o.dbID, spanStr.String())
     184             : }
     185             : 
     186           1 : func (o *checkpointOp) receiver() objID      { return o.dbID }
     187           1 : func (o *checkpointOp) syncObjs() objIDSlice { return nil }
     188           0 : func (o *checkpointOp) rewriteKeys(fn func(UserKey) UserKey) {
     189           0 :         for i := range o.spans {
     190           0 :                 o.spans[i].Start = fn(o.spans[i].Start)
     191           0 :                 o.spans[i].End = fn(o.spans[i].End)
     192           0 :         }
     193             : }
     194             : 
     195           0 : func (o *checkpointOp) diagramKeyRanges() []pebble.KeyRange {
     196           0 :         var res []pebble.KeyRange
     197           0 :         for i := range o.spans {
     198           0 :                 res = append(res, pebble.KeyRange{
     199           0 :                         Start: o.spans[i].Start,
     200           0 :                         End:   o.spans[i].End,
     201           0 :                 })
     202           0 :         }
     203           0 :         return res
     204             : }
     205             : 
     206             : // downloadOp models a DB.Download operation.
     207             : type downloadOp struct {
     208             :         dbID  objID
     209             :         spans []pebble.DownloadSpan
     210             : }
     211             : 
     212           1 : func (o *downloadOp) run(t *Test, h historyRecorder) {
     213           1 :         if t.testOpts.disableDownloads {
     214           0 :                 h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), nil)
     215           0 :                 return
     216           0 :         }
     217           1 :         db := t.getDB(o.dbID)
     218           1 :         err := t.withRetries(func() error {
     219           1 :                 return db.Download(context.Background(), o.spans)
     220           1 :         })
     221           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     222             : }
     223             : 
     224           1 : func (o *downloadOp) formattedString(kf KeyFormat) string {
     225           1 :         var spanStr bytes.Buffer
     226           1 :         for i, span := range o.spans {
     227           1 :                 if i > 0 {
     228           1 :                         spanStr.WriteString(", ")
     229           1 :                 }
     230           1 :                 fmt.Fprintf(&spanStr, "%q /* start */, %q /* end */, %v /* viaBackingFileDownload */",
     231           1 :                         kf.FormatKey(span.StartKey),
     232           1 :                         kf.FormatKey(span.EndKey),
     233           1 :                         span.ViaBackingFileDownload)
     234             :         }
     235           1 :         return fmt.Sprintf("%s.Download(%s)", o.dbID, spanStr.String())
     236             : }
     237             : 
     238           1 : func (o *downloadOp) receiver() objID     { return o.dbID }
     239           1 : func (o downloadOp) syncObjs() objIDSlice { return nil }
     240             : 
     241           0 : func (o *downloadOp) rewriteKeys(fn func(UserKey) UserKey) {
     242           0 :         for i := range o.spans {
     243           0 :                 o.spans[i].StartKey = fn(o.spans[i].StartKey)
     244           0 :                 o.spans[i].EndKey = fn(o.spans[i].EndKey)
     245           0 :         }
     246             : }
     247             : 
     248           0 : func (o *downloadOp) diagramKeyRanges() []pebble.KeyRange {
     249           0 :         var res []pebble.KeyRange
     250           0 :         for i := range o.spans {
     251           0 :                 res = append(res, pebble.KeyRange{
     252           0 :                         Start: o.spans[i].StartKey,
     253           0 :                         End:   o.spans[i].EndKey,
     254           0 :                 })
     255           0 :         }
     256           0 :         return res
     257             : }
     258             : 
     259             : // closeOp models a {Batch,Iterator,Snapshot}.Close operation.
     260             : type closeOp struct {
     261             :         objID objID
     262             : 
     263             :         // affectedObjects is the list of additional objects that are affected by this
     264             :         // operation, and which syncObjs() must return so that we don't perform the
     265             :         // close in parallel with other operations to affected objects.
     266             :         affectedObjects []objID
     267             : }
     268             : 
     269           1 : func (o *closeOp) run(t *Test, h historyRecorder) {
     270           1 :         c := t.getCloser(o.objID)
     271           1 :         if o.objID.tag() == dbTag && t.opts.DisableWAL {
     272           1 :                 // Special case: If WAL is disabled, do a flush right before DB Close. This
     273           1 :                 // allows us to reuse this run's data directory as initial state for
     274           1 :                 // future runs without losing any mutations.
     275           1 :                 _ = t.getDB(o.objID).Flush()
     276           1 :         }
     277           1 :         t.clearObj(o.objID)
     278           1 :         err := c.Close()
     279           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     280             : }
     281             : 
     282           1 : func (o *closeOp) formattedString(KeyFormat) string { return fmt.Sprintf("%s.Close()", o.objID) }
     283           1 : func (o *closeOp) receiver() objID                  { return o.objID }
     284           1 : func (o *closeOp) syncObjs() objIDSlice {
     285           1 :         return o.affectedObjects
     286           1 : }
     287             : 
     288           0 : func (o *closeOp) rewriteKeys(func(UserKey) UserKey)   {}
     289           0 : func (o *closeOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     290             : 
     291             : // compactOp models a DB.Compact operation.
     292             : type compactOp struct {
     293             :         dbID        objID
     294             :         start       UserKey
     295             :         end         UserKey
     296             :         parallelize bool
     297             : }
     298             : 
     299           1 : func (o *compactOp) run(t *Test, h historyRecorder) {
     300           1 :         err := t.withRetries(func() error {
     301           1 :                 return t.getDB(o.dbID).Compact(o.start, o.end, o.parallelize)
     302           1 :         })
     303           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     304             : }
     305             : 
     306           1 : func (o *compactOp) formattedString(kf KeyFormat) string {
     307           1 :         return fmt.Sprintf("%s.Compact(%q, %q, %t /* parallelize */)",
     308           1 :                 o.dbID, kf.FormatKey(o.start), kf.FormatKey(o.end), o.parallelize)
     309           1 : }
     310             : 
     311           1 : func (o *compactOp) receiver() objID      { return o.dbID }
     312           1 : func (o *compactOp) syncObjs() objIDSlice { return nil }
     313             : 
     314           1 : func (o *compactOp) rewriteKeys(fn func(UserKey) UserKey) {
     315           1 :         o.start = fn(o.start)
     316           1 :         o.end = fn(o.end)
     317           1 : }
     318             : 
     319           1 : func (o *compactOp) diagramKeyRanges() []pebble.KeyRange {
     320           1 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     321           1 : }
     322             : 
     323             : // deleteOp models a Write.Delete operation.
     324             : type deleteOp struct {
     325             :         writerID objID
     326             :         key      UserKey
     327             : 
     328             :         derivedDBID objID
     329             : }
     330             : 
     331           1 : func (o *deleteOp) run(t *Test, h historyRecorder) {
     332           1 :         w := t.getWriter(o.writerID)
     333           1 :         var err error
     334           1 :         if t.testOpts.deleteSized && t.isFMV(o.derivedDBID, pebble.FormatDeleteSizedAndObsolete) {
     335           1 :                 // Call DeleteSized with a deterministic size derived from the index.
     336           1 :                 // The size does not need to be accurate for correctness.
     337           1 :                 err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
     338           1 :         } else {
     339           1 :                 err = w.Delete(o.key, t.writeOpts)
     340           1 :         }
     341           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     342             : }
     343             : 
     344           1 : func hashSize(index int) uint32 {
     345           1 :         // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
     346           1 :         return uint32((11400714819323198485 * uint64(index)) % maxValueSize)
     347           1 : }
     348             : 
     349           1 : func (o *deleteOp) formattedString(kf KeyFormat) string {
     350           1 :         return fmt.Sprintf("%s.Delete(%q)", o.writerID, kf.FormatKey(o.key))
     351           1 : }
     352           1 : func (o *deleteOp) receiver() objID      { return o.writerID }
     353           1 : func (o *deleteOp) syncObjs() objIDSlice { return nil }
     354             : 
     355           0 : func (o *deleteOp) rewriteKeys(fn func(UserKey) UserKey) {
     356           0 :         o.key = fn(o.key)
     357           0 : }
     358             : 
     359           0 : func (o *deleteOp) diagramKeyRanges() []pebble.KeyRange {
     360           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
     361           0 : }
     362             : 
     363             : // singleDeleteOp models a Write.SingleDelete operation.
     364             : type singleDeleteOp struct {
     365             :         writerID           objID
     366             :         key                UserKey
     367             :         maybeReplaceDelete bool
     368             : }
     369             : 
     370           1 : func (o *singleDeleteOp) run(t *Test, h historyRecorder) {
     371           1 :         w := t.getWriter(o.writerID)
     372           1 :         var err error
     373           1 :         if t.testOpts.replaceSingleDelete && o.maybeReplaceDelete {
     374           1 :                 err = w.Delete(o.key, t.writeOpts)
     375           1 :         } else {
     376           1 :                 err = w.SingleDelete(o.key, t.writeOpts)
     377           1 :         }
     378             :         // NOTE: even if the SINGLEDEL was replaced with a DELETE, we must still
     379             :         // write the former to the history log. The log line will indicate whether
     380             :         // or not the delete *could* have been replaced. The OPTIONS file should
     381             :         // also be consulted to determine what happened at runtime (i.e. by taking
     382             :         // the logical AND).
     383           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     384             : }
     385             : 
     386           1 : func (o *singleDeleteOp) formattedString(kf KeyFormat) string {
     387           1 :         return fmt.Sprintf("%s.SingleDelete(%q, %v /* maybeReplaceDelete */)",
     388           1 :                 o.writerID, kf.FormatKey(o.key), o.maybeReplaceDelete)
     389           1 : }
     390             : 
     391           1 : func (o *singleDeleteOp) receiver() objID      { return o.writerID }
     392           1 : func (o *singleDeleteOp) syncObjs() objIDSlice { return nil }
     393             : 
     394           0 : func (o *singleDeleteOp) rewriteKeys(fn func(UserKey) UserKey) {
     395           0 :         o.key = fn(o.key)
     396           0 : }
     397             : 
     398           0 : func (o *singleDeleteOp) diagramKeyRanges() []pebble.KeyRange {
     399           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
     400           0 : }
     401             : 
     402             : // deleteRangeOp models a Write.DeleteRange operation.
     403             : type deleteRangeOp struct {
     404             :         writerID objID
     405             :         start    UserKey
     406             :         end      UserKey
     407             : }
     408             : 
     409           1 : func (o *deleteRangeOp) run(t *Test, h historyRecorder) {
     410           1 :         w := t.getWriter(o.writerID)
     411           1 :         err := w.DeleteRange(o.start, o.end, t.writeOpts)
     412           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     413           1 : }
     414             : 
     415           1 : func (o *deleteRangeOp) formattedString(kf KeyFormat) string {
     416           1 :         return fmt.Sprintf("%s.DeleteRange(%q, %q)",
     417           1 :                 o.writerID, kf.FormatKey(o.start), kf.FormatKey(o.end))
     418           1 : }
     419             : 
     420           1 : func (o *deleteRangeOp) receiver() objID      { return o.writerID }
     421           1 : func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
     422             : 
     423           0 : func (o *deleteRangeOp) rewriteKeys(fn func(UserKey) UserKey) {
     424           0 :         o.start = fn(o.start)
     425           0 :         o.end = fn(o.end)
     426           0 : }
     427             : 
     428           0 : func (o *deleteRangeOp) diagramKeyRanges() []pebble.KeyRange {
     429           0 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     430           0 : }
     431             : 
     432             : // flushOp models a DB.Flush operation.
     433             : type flushOp struct {
     434             :         db objID
     435             : }
     436             : 
     437           1 : func (o *flushOp) run(t *Test, h historyRecorder) {
     438           1 :         db := t.getDB(o.db)
     439           1 :         err := db.Flush()
     440           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     441           1 : }
     442             : 
     443           1 : func (o *flushOp) formattedString(KeyFormat) string    { return fmt.Sprintf("%s.Flush()", o.db) }
     444           1 : func (o *flushOp) receiver() objID                     { return o.db }
     445           1 : func (o *flushOp) syncObjs() objIDSlice                { return nil }
     446           0 : func (o *flushOp) rewriteKeys(func(UserKey) UserKey)   {}
     447           0 : func (o *flushOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     448             : 
     449             : // mergeOp models a Write.Merge operation.
     450             : type mergeOp struct {
     451             :         writerID objID
     452             :         key      UserKey
     453             :         value    []byte
     454             : }
     455             : 
     456           1 : func (o *mergeOp) run(t *Test, h historyRecorder) {
     457           1 :         w := t.getWriter(o.writerID)
     458           1 :         err := w.Merge(o.key, o.value, t.writeOpts)
     459           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     460           1 : }
     461             : 
     462           1 : func (o *mergeOp) formattedString(kf KeyFormat) string {
     463           1 :         return fmt.Sprintf("%s.Merge(%q, %q)", o.writerID, kf.FormatKey(o.key), o.value)
     464           1 : }
     465           1 : func (o *mergeOp) receiver() objID      { return o.writerID }
     466           1 : func (o *mergeOp) syncObjs() objIDSlice { return nil }
     467             : 
     468           0 : func (o *mergeOp) rewriteKeys(fn func(UserKey) UserKey) {
     469           0 :         o.key = fn(o.key)
     470           0 : }
     471             : 
     472           0 : func (o *mergeOp) diagramKeyRanges() []pebble.KeyRange {
     473           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
     474           0 : }
     475             : 
     476             : // setOp models a Write.Set operation.
     477             : type setOp struct {
     478             :         writerID objID
     479             :         key      UserKey
     480             :         value    []byte
     481             : }
     482             : 
     483           1 : func (o *setOp) run(t *Test, h historyRecorder) {
     484           1 :         w := t.getWriter(o.writerID)
     485           1 :         err := w.Set(o.key, o.value, t.writeOpts)
     486           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     487           1 : }
     488             : 
     489           1 : func (o *setOp) formattedString(kf KeyFormat) string {
     490           1 :         return fmt.Sprintf("%s.Set(%q, %q)", o.writerID, kf.FormatKey(o.key), o.value)
     491           1 : }
     492           1 : func (o *setOp) receiver() objID      { return o.writerID }
     493           1 : func (o *setOp) syncObjs() objIDSlice { return nil }
     494             : 
     495           0 : func (o *setOp) rewriteKeys(fn func(UserKey) UserKey) {
     496           0 :         o.key = fn(o.key)
     497           0 : }
     498             : 
     499           0 : func (o *setOp) diagramKeyRanges() []pebble.KeyRange {
     500           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
     501           0 : }
     502             : 
     503             : // rangeKeyDeleteOp models a Write.RangeKeyDelete operation.
     504             : type rangeKeyDeleteOp struct {
     505             :         writerID objID
     506             :         start    []byte
     507             :         end      []byte
     508             : }
     509             : 
     510           1 : func (o *rangeKeyDeleteOp) run(t *Test, h historyRecorder) {
     511           1 :         w := t.getWriter(o.writerID)
     512           1 :         err := w.RangeKeyDelete(o.start, o.end, t.writeOpts)
     513           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     514           1 : }
     515             : 
     516           1 : func (o *rangeKeyDeleteOp) formattedString(kf KeyFormat) string {
     517           1 :         return fmt.Sprintf("%s.RangeKeyDelete(%q, %q)",
     518           1 :                 o.writerID, kf.FormatKey(o.start), kf.FormatKey(o.end))
     519           1 : }
     520             : 
     521           1 : func (o *rangeKeyDeleteOp) receiver() objID      { return o.writerID }
     522           1 : func (o *rangeKeyDeleteOp) syncObjs() objIDSlice { return nil }
     523             : 
     524           0 : func (o *rangeKeyDeleteOp) rewriteKeys(fn func(UserKey) UserKey) {
     525           0 :         o.start = fn(o.start)
     526           0 :         o.end = fn(o.end)
     527           0 : }
     528             : 
     529           1 : func (o *rangeKeyDeleteOp) diagramKeyRanges() []pebble.KeyRange {
     530           1 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     531           1 : }
     532             : 
     533             : // rangeKeySetOp models a Write.RangeKeySet operation.
     534             : type rangeKeySetOp struct {
     535             :         writerID objID
     536             :         start    []byte
     537             :         end      []byte
     538             :         suffix   []byte
     539             :         value    []byte
     540             : }
     541             : 
     542           1 : func (o *rangeKeySetOp) run(t *Test, h historyRecorder) {
     543           1 :         w := t.getWriter(o.writerID)
     544           1 :         err := w.RangeKeySet(o.start, o.end, o.suffix, o.value, t.writeOpts)
     545           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     546           1 : }
     547             : 
     548           1 : func (o *rangeKeySetOp) formattedString(kf KeyFormat) string {
     549           1 :         return fmt.Sprintf("%s.RangeKeySet(%q, %q, %q, %q)",
     550           1 :                 o.writerID, kf.FormatKey(o.start), kf.FormatKey(o.end),
     551           1 :                 kf.FormatKeySuffix(o.suffix), o.value)
     552           1 : }
     553             : 
     554           1 : func (o *rangeKeySetOp) receiver() objID      { return o.writerID }
     555           1 : func (o *rangeKeySetOp) syncObjs() objIDSlice { return nil }
     556             : 
     557           1 : func (o *rangeKeySetOp) rewriteKeys(fn func(UserKey) UserKey) {
     558           1 :         o.start = fn(o.start)
     559           1 :         o.end = fn(o.end)
     560           1 : }
     561             : 
     562           1 : func (o *rangeKeySetOp) diagramKeyRanges() []pebble.KeyRange {
     563           1 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     564           1 : }
     565             : 
     566             : // rangeKeyUnsetOp models a Write.RangeKeyUnset operation.
     567             : type rangeKeyUnsetOp struct {
     568             :         writerID objID
     569             :         start    UserKey
     570             :         end      UserKey
     571             :         suffix   UserKeySuffix
     572             : }
     573             : 
     574           1 : func (o *rangeKeyUnsetOp) run(t *Test, h historyRecorder) {
     575           1 :         w := t.getWriter(o.writerID)
     576           1 :         err := w.RangeKeyUnset(o.start, o.end, o.suffix, t.writeOpts)
     577           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     578           1 : }
     579             : 
     580           1 : func (o *rangeKeyUnsetOp) formattedString(kf KeyFormat) string {
     581           1 :         return fmt.Sprintf("%s.RangeKeyUnset(%q, %q, %q)",
     582           1 :                 o.writerID, kf.FormatKey(o.start), kf.FormatKey(o.end),
     583           1 :                 kf.FormatKeySuffix(o.suffix))
     584           1 : }
     585             : 
     586           1 : func (o *rangeKeyUnsetOp) receiver() objID      { return o.writerID }
     587           1 : func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }
     588             : 
     589           0 : func (o *rangeKeyUnsetOp) rewriteKeys(fn func(UserKey) UserKey) {
     590           0 :         o.start = fn(o.start)
     591           0 :         o.end = fn(o.end)
     592           0 : }
     593             : 
     594           0 : func (o *rangeKeyUnsetOp) diagramKeyRanges() []pebble.KeyRange {
     595           0 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     596           0 : }
     597             : 
     598             : // logDataOp models a Writer.LogData operation.
     599             : type logDataOp struct {
     600             :         writerID objID
     601             :         data     []byte
     602             : }
     603             : 
     604           1 : func (o *logDataOp) run(t *Test, h historyRecorder) {
     605           1 :         w := t.getWriter(o.writerID)
     606           1 :         err := w.LogData(o.data, t.writeOpts)
     607           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     608           1 : }
     609             : 
     610           1 : func (o *logDataOp) formattedString(KeyFormat) string {
     611           1 :         return fmt.Sprintf("%s.LogData(%q)", o.writerID, o.data)
     612           1 : }
     613             : 
     614           1 : func (o *logDataOp) receiver() objID                     { return o.writerID }
     615           1 : func (o *logDataOp) syncObjs() objIDSlice                { return nil }
     616           0 : func (o *logDataOp) rewriteKeys(func(UserKey) UserKey)   {}
     617           0 : func (o *logDataOp) diagramKeyRanges() []pebble.KeyRange { return []pebble.KeyRange{} }
     618             : 
     619             : // newBatchOp models a Write.NewBatch operation.
     620             : type newBatchOp struct {
     621             :         dbID    objID
     622             :         batchID objID
     623             : }
     624             : 
     625           1 : func (o *newBatchOp) run(t *Test, h historyRecorder) {
     626           1 :         b := t.getDB(o.dbID).NewBatch()
     627           1 :         t.setBatch(o.batchID, b)
     628           1 :         h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
     629           1 : }
     630             : 
     631           1 : func (o *newBatchOp) formattedString(KeyFormat) string {
     632           1 :         return fmt.Sprintf("%s = %s.NewBatch()", o.batchID, o.dbID)
     633           1 : }
     634           1 : func (o *newBatchOp) receiver() objID { return o.dbID }
     635           1 : func (o *newBatchOp) syncObjs() objIDSlice {
     636           1 :         // NewBatch should not be concurrent with operations that interact with that
     637           1 :         // same batch.
     638           1 :         return []objID{o.batchID}
     639           1 : }
     640             : 
     641           0 : func (o *newBatchOp) rewriteKeys(fn func(UserKey) UserKey) {}
     642           0 : func (o *newBatchOp) diagramKeyRanges() []pebble.KeyRange  { return nil }
     643             : 
     644             : // newIndexedBatchOp models a Write.NewIndexedBatch operation.
     645             : type newIndexedBatchOp struct {
     646             :         dbID    objID
     647             :         batchID objID
     648             : }
     649             : 
     650           1 : func (o *newIndexedBatchOp) run(t *Test, h historyRecorder) {
     651           1 :         b := t.getDB(o.dbID).NewIndexedBatch()
     652           1 :         t.setBatch(o.batchID, b)
     653           1 :         h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
     654           1 : }
     655             : 
     656           1 : func (o *newIndexedBatchOp) formattedString(KeyFormat) string {
     657           1 :         return fmt.Sprintf("%s = %s.NewIndexedBatch()", o.batchID, o.dbID)
     658           1 : }
     659           1 : func (o *newIndexedBatchOp) receiver() objID { return o.dbID }
     660           1 : func (o *newIndexedBatchOp) syncObjs() objIDSlice {
     661           1 :         // NewIndexedBatch should not be concurrent with operations that interact
     662           1 :         // with that same batch.
     663           1 :         return []objID{o.batchID}
     664           1 : }
     665             : 
     666           0 : func (o *newIndexedBatchOp) rewriteKeys(func(UserKey) UserKey)   {}
     667           0 : func (o *newIndexedBatchOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     668             : 
     669             : // batchCommitOp models a Batch.Commit operation.
     670             : type batchCommitOp struct {
     671             :         dbID    objID
     672             :         batchID objID
     673             : }
     674             : 
     675           1 : func (o *batchCommitOp) run(t *Test, h historyRecorder) {
     676           1 :         b := t.getBatch(o.batchID)
     677           1 :         err := b.Commit(t.writeOpts)
     678           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     679           1 : }
     680             : 
     681           1 : func (o *batchCommitOp) formattedString(KeyFormat) string {
     682           1 :         return fmt.Sprintf("%s.Commit()", o.batchID)
     683           1 : }
     684           1 : func (o *batchCommitOp) receiver() objID { return o.batchID }
     685           1 : func (o *batchCommitOp) syncObjs() objIDSlice {
     686           1 :         // Synchronize on the database so that NewIters wait for the commit.
     687           1 :         return []objID{o.dbID}
     688           1 : }
     689             : 
     690           0 : func (o *batchCommitOp) rewriteKeys(func(UserKey) UserKey)   {}
     691           0 : func (o *batchCommitOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     692             : 
     693             : // ingestOp models a DB.Ingest operation.
     694             : type ingestOp struct {
     695             :         dbID     objID
     696             :         batchIDs []objID
     697             : 
     698             :         derivedDBIDs []objID
     699             : }
     700             : 
     701           1 : func (o *ingestOp) run(t *Test, h historyRecorder) {
     702           1 :         // We can only use apply as an alternative for ingestion if we are ingesting
     703           1 :         // a single batch. If we are ingesting multiple batches, the batches may
     704           1 :         // overlap which would cause ingestion to fail but apply would succeed.
     705           1 :         if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 && o.derivedDBIDs[0] == o.dbID {
     706           0 :                 id := o.batchIDs[0]
     707           0 :                 b := t.getBatch(id)
     708           0 :                 iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
     709           0 :                 db := t.getDB(o.dbID)
     710           0 :                 c, err := o.collapseBatch(t, db, iter, rangeDelIter, rangeKeyIter, b)
     711           0 :                 if err == nil {
     712           0 :                         err = db.Apply(c, t.writeOpts)
     713           0 :                 }
     714           0 :                 _ = b.Close()
     715           0 :                 _ = c.Close()
     716           0 :                 t.clearObj(id)
     717           0 :                 h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     718           0 :                 return
     719             :         }
     720             : 
     721           1 :         var paths []string
     722           1 :         var err error
     723           1 :         for i, id := range o.batchIDs {
     724           1 :                 b := t.getBatch(id)
     725           1 :                 t.clearObj(id)
     726           1 :                 path, _, err2 := buildForIngest(t, o.dbID, b, i)
     727           1 :                 if err2 != nil {
     728           0 :                         h.Recordf("Build(%s) // %v", id, err2)
     729           0 :                 }
     730           1 :                 err = firstError(err, err2)
     731           1 :                 if err2 == nil {
     732           1 :                         paths = append(paths, path)
     733           1 :                 }
     734           1 :                 err = firstError(err, b.Close())
     735             :         }
     736             : 
     737           1 :         err = firstError(err, t.withRetries(func() error {
     738           1 :                 return t.getDB(o.dbID).Ingest(context.Background(), paths)
     739           1 :         }))
     740             : 
     741           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     742             : }
     743             : 
     744           1 : func (o *ingestOp) receiver() objID { return o.dbID }
     745           1 : func (o *ingestOp) syncObjs() objIDSlice {
     746           1 :         // Ingest should not be concurrent with mutating the batches that will be
     747           1 :         // ingested as sstables.
     748           1 :         objs := make([]objID, 0, len(o.batchIDs)+1)
     749           1 :         objs = append(objs, o.batchIDs...)
     750           1 :         addedDBs := make(map[objID]struct{})
     751           1 :         for i := range o.derivedDBIDs {
     752           1 :                 _, ok := addedDBs[o.derivedDBIDs[i]]
     753           1 :                 if !ok && o.derivedDBIDs[i] != o.dbID {
     754           0 :                         objs = append(objs, o.derivedDBIDs[i])
     755           0 :                         addedDBs[o.derivedDBIDs[i]] = struct{}{}
     756           0 :                 }
     757             :         }
     758           1 :         return objs
     759             : }
     760             : 
     761             : func closeIters(
     762             :         pointIter base.InternalIterator,
     763             :         rangeDelIter keyspan.FragmentIterator,
     764             :         rangeKeyIter keyspan.FragmentIterator,
     765           0 : ) {
     766           0 :         if pointIter != nil {
     767           0 :                 pointIter.Close()
     768           0 :         }
     769           0 :         if rangeDelIter != nil {
     770           0 :                 rangeDelIter.Close()
     771           0 :         }
     772           0 :         if rangeKeyIter != nil {
     773           0 :                 rangeKeyIter.Close()
     774           0 :         }
     775             : }
     776             : 
     777             : // collapseBatch collapses the mutations in a batch to be equivalent to an
     778             : // sstable ingesting those mutations. Duplicate updates to a key are collapsed
     779             : // so that only the latest update is performed. All range deletions are
     780             : // performed first in the batch to match the semantics of ingestion where a
     781             : // range deletion does not delete a point record contained in the sstable.
     782             : func (o *ingestOp) collapseBatch(
     783             :         t *Test,
     784             :         db *pebble.DB,
     785             :         pointIter base.InternalIterator,
     786             :         rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
     787             :         b *pebble.Batch,
     788           0 : ) (*pebble.Batch, error) {
     789           0 :         defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
     790           0 :         equal := t.opts.Comparer.Equal
     791           0 :         collapsed := db.NewBatch()
     792           0 : 
     793           0 :         if rangeDelIter != nil {
     794           0 :                 // NB: The range tombstones have already been fragmented by the Batch.
     795           0 :                 t, err := rangeDelIter.First()
     796           0 :                 for ; t != nil; t, err = rangeDelIter.Next() {
     797           0 :                         // NB: We don't have to copy the key or value since we're reading from a
     798           0 :                         // batch which doesn't do prefix compression.
     799           0 :                         if err := collapsed.DeleteRange(t.Start, t.End, nil); err != nil {
     800           0 :                                 return nil, err
     801           0 :                         }
     802             :                 }
     803           0 :                 if err != nil {
     804           0 :                         return nil, err
     805           0 :                 }
     806           0 :                 rangeDelIter.Close()
     807           0 :                 rangeDelIter = nil
     808             :         }
     809             : 
     810           0 :         if pointIter != nil {
     811           0 :                 var lastUserKey []byte
     812           0 :                 for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
     813           0 :                         // Ignore duplicate keys.
     814           0 :                         //
     815           0 :                         // Note: this is necessary due to MERGE keys, otherwise it would be
     816           0 :                         // fine to include all the keys in the batch and let the normal
     817           0 :                         // sequence number precedence determine which of the keys "wins".
     818           0 :                         // But the code to build the ingested sstable will only keep the
     819           0 :                         // most recent internal key and will not merge across internal keys.
     820           0 :                         if equal(lastUserKey, kv.K.UserKey) {
     821           0 :                                 continue
     822             :                         }
     823             :                         // NB: We don't have to copy the key or value since we're reading from a
     824             :                         // batch which doesn't do prefix compression.
     825           0 :                         lastUserKey = kv.K.UserKey
     826           0 : 
     827           0 :                         var err error
     828           0 :                         switch kv.Kind() {
     829           0 :                         case pebble.InternalKeyKindDelete:
     830           0 :                                 err = collapsed.Delete(kv.K.UserKey, nil)
     831           0 :                         case pebble.InternalKeyKindDeleteSized:
     832           0 :                                 v, _ := binary.Uvarint(kv.InPlaceValue())
     833           0 :                                 // Batch.DeleteSized takes just the length of the value being
     834           0 :                                 // deleted and adds the key's length to derive the overall entry
     835           0 :                                 // size of the value being deleted. This has already been done
     836           0 :                                 // to the key we're reading from the batch, so we must subtract
     837           0 :                                 // the key length from the encoded value before calling
     838           0 :                                 // collapsed.DeleteSized, which will again add the key length
     839           0 :                                 // before encoding.
     840           0 :                                 err = collapsed.DeleteSized(kv.K.UserKey, uint32(v-uint64(len(kv.K.UserKey))), nil)
     841           0 :                         case pebble.InternalKeyKindSingleDelete:
     842           0 :                                 err = collapsed.SingleDelete(kv.K.UserKey, nil)
     843           0 :                         case pebble.InternalKeyKindSet:
     844           0 :                                 err = collapsed.Set(kv.K.UserKey, kv.InPlaceValue(), nil)
     845           0 :                         case pebble.InternalKeyKindMerge:
     846           0 :                                 err = collapsed.Merge(kv.K.UserKey, kv.InPlaceValue(), nil)
     847           0 :                         case pebble.InternalKeyKindLogData:
     848           0 :                                 err = collapsed.LogData(kv.K.UserKey, nil)
     849           0 :                         default:
     850           0 :                                 err = errors.Errorf("unknown batch record kind: %d", kv.Kind())
     851             :                         }
     852           0 :                         if err != nil {
     853           0 :                                 return nil, err
     854           0 :                         }
     855             :                 }
     856           0 :                 if err := pointIter.Close(); err != nil {
     857           0 :                         return nil, err
     858           0 :                 }
     859           0 :                 pointIter = nil
     860             :         }
     861             : 
     862             :         // There's no equivalent of a MERGE operator for range keys, so there's no
     863             :         // need to collapse the range keys here. Rather than reading the range keys
     864             :         // from `rangeKeyIter`, which will already be fragmented, read the range
     865             :         // keys from the batch and copy them verbatim. This marginally improves our
     866             :         // test coverage over the alternative approach of pre-fragmenting and
     867             :         // pre-coalescing before writing to the batch.
     868             :         //
     869             :         // The `rangeKeyIter` is used only to determine if there are any range keys
     870             :         // in the batch at all, and only because we already have it handy from
     871             :         // private.BatchSort.
     872           0 :         if rangeKeyIter != nil {
     873           0 :                 for r := b.Reader(); ; {
     874           0 :                         kind, key, value, ok, err := r.Next()
     875           0 :                         if !ok {
     876           0 :                                 if err != nil {
     877           0 :                                         return nil, err
     878           0 :                                 }
     879           0 :                                 break
     880           0 :                         } else if !rangekey.IsRangeKey(kind) {
     881           0 :                                 continue
     882             :                         }
     883           0 :                         ik := base.MakeInternalKey(key, 0, kind)
     884           0 :                         if err := collapsed.AddInternalKey(&ik, value, nil); err != nil {
     885           0 :                                 return nil, err
     886           0 :                         }
     887             :                 }
     888           0 :                 rangeKeyIter.Close()
     889           0 :                 rangeKeyIter = nil
     890             :         }
     891             : 
     892           0 :         return collapsed, nil
     893             : }
     894             : 
     895           1 : func (o *ingestOp) formattedString(KeyFormat) string {
     896           1 :         var buf strings.Builder
     897           1 :         buf.WriteString(o.dbID.String())
     898           1 :         buf.WriteString(".Ingest(")
     899           1 :         for i, id := range o.batchIDs {
     900           1 :                 if i > 0 {
     901           1 :                         buf.WriteString(", ")
     902           1 :                 }
     903           1 :                 buf.WriteString(id.String())
     904             :         }
     905           1 :         buf.WriteString(")")
     906           1 :         return buf.String()
     907             : }
     908             : 
     909           0 : func (o *ingestOp) rewriteKeys(func(UserKey) UserKey)   {}
     910           0 : func (o *ingestOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     911             : 
     912             : type ingestAndExciseOp struct {
     913             :         dbID                   objID
     914             :         batchID                objID
     915             :         derivedDBID            objID
     916             :         exciseStart, exciseEnd UserKey
     917             : }
     918             : 
     919           1 : func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) {
     920           1 :         var err error
     921           1 :         b := t.getBatch(o.batchID)
     922           1 :         t.clearObj(o.batchID)
     923           1 :         if t.testOpts.Opts.Comparer.Compare(o.exciseEnd, o.exciseStart) <= 0 {
     924           0 :                 panic("non-well-formed excise span")
     925             :         }
     926           1 :         db := t.getDB(o.dbID)
     927           1 :         if b.Empty() {
     928           1 :                 h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat),
     929           1 :                         o.simulateExcise(db, t))
     930           1 :                 return
     931           1 :         }
     932             : 
     933           1 :         path, writerMeta, err2 := buildForIngest(t, o.dbID, b, 0 /* i */)
     934           1 :         if err2 != nil {
     935           0 :                 h.Recordf("Build(%s) // %v", o.batchID, err2)
     936           0 :                 return
     937           0 :         }
     938           1 :         err = firstError(err, b.Close())
     939           1 : 
     940           1 :         if writerMeta.Properties.NumEntries == 0 && writerMeta.Properties.NumRangeKeys() == 0 {
     941           0 :                 h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), o.simulateExcise(db, t))
     942           0 :                 return
     943           0 :         }
     944             : 
     945           1 :         if t.testOpts.useExcise {
     946           1 :                 err = firstError(err, t.withRetries(func() error {
     947           1 :                         _, err := db.IngestAndExcise(context.Background(), []string{path}, nil /* shared */, nil /* external */, pebble.KeyRange{
     948           1 :                                 Start: o.exciseStart,
     949           1 :                                 End:   o.exciseEnd,
     950           1 :                         })
     951           1 :                         return err
     952           1 :                 }))
     953           1 :         } else {
     954           1 :                 err = firstError(err, o.simulateExcise(db, t))
     955           1 :                 err = firstError(err, t.withRetries(func() error {
     956           1 :                         return db.Ingest(context.Background(), []string{path})
     957           1 :                 }))
     958             :         }
     959             : 
     960           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
     961             : }
     962             : 
     963           1 : func (o *ingestAndExciseOp) simulateExcise(db *pebble.DB, t *Test) error {
     964           1 :         // Simulate the excise using a DeleteRange and RangeKeyDelete.
     965           1 :         return errors.CombineErrors(
     966           1 :                 db.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts),
     967           1 :                 db.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts),
     968           1 :         )
     969           1 : }
     970             : 
     971           1 : func (o *ingestAndExciseOp) receiver() objID { return o.dbID }
     972           1 : func (o *ingestAndExciseOp) syncObjs() objIDSlice {
     973           1 :         // Ingest should not be concurrent with mutating the batches that will be
     974           1 :         // ingested as sstables.
     975           1 :         objs := []objID{o.batchID}
     976           1 :         if o.derivedDBID != o.dbID {
     977           0 :                 objs = append(objs, o.derivedDBID)
     978           0 :         }
     979           1 :         return objs
     980             : }
     981             : 
     982           1 : func (o *ingestAndExciseOp) formattedString(kf KeyFormat) string {
     983           1 :         return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q)",
     984           1 :                 o.dbID, o.batchID, kf.FormatKey(o.exciseStart), kf.FormatKey(o.exciseEnd))
     985           1 : }
     986             : 
     987           0 : func (o *ingestAndExciseOp) rewriteKeys(fn func(UserKey) UserKey) {
     988           0 :         o.exciseStart = fn(o.exciseStart)
     989           0 :         o.exciseEnd = fn(o.exciseEnd)
     990           0 : }
     991             : 
     992           0 : func (o *ingestAndExciseOp) diagramKeyRanges() []pebble.KeyRange {
     993           0 :         return []pebble.KeyRange{{Start: o.exciseStart, End: o.exciseEnd}}
     994           0 : }
     995             : 
     996             : // ingestExternalFilesOp models a DB.IngestExternalFiles operation.
     997             : //
     998             : // When remote storage is not enabled, the operation is emulated using the
     999             : // regular DB.Ingest; this serves as a cross-check of the result.
    1000             : type ingestExternalFilesOp struct {
    1001             :         dbID objID
    1002             :         // The bounds of the objects cannot overlap.
    1003             :         objs []externalObjWithBounds
    1004             : }
    1005             : 
    1006             : type externalObjWithBounds struct {
    1007             :         externalObjID objID
    1008             : 
    1009             :         // bounds for the external object. These bounds apply after keys undergo
    1010             :         // any prefix or suffix transforms.
    1011             :         bounds pebble.KeyRange
    1012             : 
    1013             :         syntheticPrefix sstable.SyntheticPrefix
    1014             :         syntheticSuffix sstable.SyntheticSuffix
    1015             : }
    1016             : 
    1017           1 : func (o *ingestExternalFilesOp) run(t *Test, h historyRecorder) {
    1018           1 :         db := t.getDB(o.dbID)
    1019           1 : 
    1020           1 :         // Verify the objects exist (useful for --try-to-reduce).
    1021           1 :         for i := range o.objs {
    1022           1 :                 t.getExternalObj(o.objs[i].externalObjID)
    1023           1 :         }
    1024             : 
    1025           1 :         var err error
    1026           1 :         if !t.testOpts.externalStorageEnabled {
    1027           1 :                 // Emulate the operation by crating local, truncated SST files and ingesting
    1028           1 :                 // them.
    1029           1 :                 var paths []string
    1030           1 :                 for i, obj := range o.objs {
    1031           1 :                         // Make sure the object exists and is not empty.
    1032           1 :                         path, sstMeta := buildForIngestExternalEmulation(
    1033           1 :                                 t, o.dbID, obj.externalObjID, obj.bounds, obj.syntheticSuffix, obj.syntheticPrefix, i,
    1034           1 :                         )
    1035           1 :                         if sstMeta.HasPointKeys || sstMeta.HasRangeKeys || sstMeta.HasRangeDelKeys {
    1036           1 :                                 paths = append(paths, path)
    1037           1 :                         }
    1038             :                 }
    1039           1 :                 if len(paths) > 0 {
    1040           1 :                         err = db.Ingest(context.Background(), paths)
    1041           1 :                 }
    1042           0 :         } else {
    1043           0 :                 external := make([]pebble.ExternalFile, len(o.objs))
    1044           0 :                 for i, obj := range o.objs {
    1045           0 :                         meta := t.getExternalObj(obj.externalObjID)
    1046           0 :                         external[i] = pebble.ExternalFile{
    1047           0 :                                 Locator:           "external",
    1048           0 :                                 ObjName:           externalObjName(obj.externalObjID),
    1049           0 :                                 Size:              meta.sstMeta.Size,
    1050           0 :                                 StartKey:          obj.bounds.Start,
    1051           0 :                                 EndKey:            obj.bounds.End,
    1052           0 :                                 EndKeyIsInclusive: false,
    1053           0 :                                 // Note: if the table has point/range keys, we don't know for sure whether
    1054           0 :                                 // this particular range has any, but that's acceptable.
    1055           0 :                                 HasPointKey:     meta.sstMeta.HasPointKeys || meta.sstMeta.HasRangeDelKeys,
    1056           0 :                                 HasRangeKey:     meta.sstMeta.HasRangeKeys,
    1057           0 :                                 SyntheticSuffix: obj.syntheticSuffix,
    1058           0 :                         }
    1059           0 :                         if obj.syntheticPrefix.IsSet() {
    1060           0 :                                 external[i].SyntheticPrefix = obj.syntheticPrefix
    1061           0 :                         }
    1062             :                 }
    1063           0 :                 _, err = db.IngestExternalFiles(context.Background(), external)
    1064             :         }
    1065             : 
    1066           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
    1067             : }
    1068             : 
    1069           1 : func (o *ingestExternalFilesOp) receiver() objID { return o.dbID }
    1070           1 : func (o *ingestExternalFilesOp) syncObjs() objIDSlice {
    1071           1 :         res := make(objIDSlice, len(o.objs))
    1072           1 :         for i := range res {
    1073           1 :                 res[i] = o.objs[i].externalObjID
    1074           1 :         }
    1075             :         // Deduplicate the IDs.
    1076           1 :         slices.Sort(res)
    1077           1 :         return slices.Compact(res)
    1078             : }
    1079             : 
    1080           1 : func (o *ingestExternalFilesOp) formattedString(kf KeyFormat) string {
    1081           1 :         strs := make([]string, len(o.objs))
    1082           1 :         for i, obj := range o.objs {
    1083           1 :                 strs[i] = fmt.Sprintf("%s, %q /* start */, %q /* end */, %q /* syntheticSuffix */, %q /* syntheticPrefix */",
    1084           1 :                         obj.externalObjID, kf.FormatKey(obj.bounds.Start), kf.FormatKey(obj.bounds.End),
    1085           1 :                         kf.FormatKeySuffix(UserKeySuffix(obj.syntheticSuffix)), kf.FormatKey(UserKey(obj.syntheticPrefix)),
    1086           1 :                 )
    1087           1 :         }
    1088           1 :         return fmt.Sprintf("%s.IngestExternalFiles(%s)", o.dbID, strings.Join(strs, ", "))
    1089             : }
    1090             : 
    1091           0 : func (o *ingestExternalFilesOp) rewriteKeys(fn func(UserKey) UserKey) {
    1092           0 :         // If any of the objects have synthetic prefixes, we can't allow
    1093           0 :         // modification of external object bounds.
    1094           0 :         allowModification := true
    1095           0 :         for i := range o.objs {
    1096           0 :                 allowModification = allowModification && !o.objs[i].syntheticPrefix.IsSet()
    1097           0 :         }
    1098             : 
    1099           0 :         for i := range o.objs {
    1100           0 :                 s, e := fn(o.objs[i].bounds.Start), fn(o.objs[i].bounds.End)
    1101           0 :                 if allowModification {
    1102           0 :                         o.objs[i].bounds.Start = s
    1103           0 :                         o.objs[i].bounds.End = e
    1104           0 :                 }
    1105             :         }
    1106             : }
    1107             : 
    1108           0 : func (o *ingestExternalFilesOp) diagramKeyRanges() []pebble.KeyRange {
    1109           0 :         ranges := make([]pebble.KeyRange, len(o.objs))
    1110           0 :         for i, obj := range o.objs {
    1111           0 :                 ranges[i] = obj.bounds
    1112           0 :         }
    1113           0 :         return ranges
    1114             : }
    1115             : 
    1116             : // getOp models a Reader.Get operation.
    1117             : type getOp struct {
    1118             :         readerID    objID
    1119             :         key         UserKey
    1120             :         derivedDBID objID
    1121             : }
    1122             : 
    1123           1 : func (o *getOp) run(t *Test, h historyRecorder) {
    1124           1 :         r := t.getReader(o.readerID)
    1125           1 :         var val []byte
    1126           1 :         var closer io.Closer
    1127           1 :         err := t.withRetries(func() (err error) {
    1128           1 :                 val, closer, err = r.Get(o.key)
    1129           1 :                 return err
    1130           1 :         })
    1131           1 :         h.Recordf("%s // [%q] %v", o.formattedString(t.testOpts.KeyFormat), val, err)
    1132           1 :         if closer != nil {
    1133           1 :                 closer.Close()
    1134           1 :         }
    1135             : }
    1136             : 
    1137           1 : func (o *getOp) formattedString(kf KeyFormat) string {
    1138           1 :         return fmt.Sprintf("%s.Get(%q)", o.readerID, kf.FormatKey(o.key))
    1139           1 : }
    1140           1 : func (o *getOp) receiver() objID { return o.readerID }
    1141           1 : func (o *getOp) syncObjs() objIDSlice {
    1142           1 :         if o.readerID.tag() == dbTag {
    1143           1 :                 return nil
    1144           1 :         }
    1145             :         // batch.Get reads through to the current database state.
    1146           1 :         if o.derivedDBID != 0 {
    1147           1 :                 return []objID{o.derivedDBID}
    1148           1 :         }
    1149           0 :         return nil
    1150             : }
    1151             : 
    1152           0 : func (o *getOp) rewriteKeys(fn func(UserKey) UserKey) {
    1153           0 :         o.key = fn(o.key)
    1154           0 : }
    1155             : 
    1156           0 : func (o *getOp) diagramKeyRanges() []pebble.KeyRange {
    1157           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
    1158           0 : }
    1159             : 
    1160             : // newIterOp models a Reader.NewIter operation.
    1161             : type newIterOp struct {
    1162             :         readerID objID
    1163             :         iterID   objID
    1164             :         iterOpts
    1165             :         derivedDBID objID
    1166             : }
    1167             : 
    1168             : // Enable this to enable debug logging of range key iterator operations.
    1169             : const debugIterators = false
    1170             : 
    1171           1 : func (o *newIterOp) run(t *Test, h historyRecorder) {
    1172           1 :         r := t.getReader(o.readerID)
    1173           1 :         opts := iterOptions(t.testOpts.KeyFormat, o.iterOpts)
    1174           1 :         if debugIterators {
    1175           0 :                 opts.DebugRangeKeyStack = true
    1176           0 :         }
    1177             : 
    1178           1 :         var i *pebble.Iterator
    1179           1 :         for {
    1180           1 :                 i, _ = r.NewIter(opts)
    1181           1 :                 if err := i.Error(); !errors.Is(err, errorfs.ErrInjected) {
    1182           1 :                         break
    1183             :                 }
    1184             :                 // close this iter and retry NewIter
    1185           0 :                 _ = i.Close()
    1186             :         }
    1187           1 :         t.setIter(o.iterID, i)
    1188           1 : 
    1189           1 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
    1190           1 :         // the user-provided bounds.
    1191           1 :         if opts != nil {
    1192           1 :                 rand.Read(opts.LowerBound[:])
    1193           1 :                 rand.Read(opts.UpperBound[:])
    1194           1 :         }
    1195           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), i.Error())
    1196             : }
    1197             : 
    1198           1 : func (o *newIterOp) formattedString(kf KeyFormat) string {
    1199           1 :         return fmt.Sprintf("%s = %s.NewIter(%q, %q, %d /* key types */, %q, %q, %t /* use L6 filters */, %q /* masking suffix */)",
    1200           1 :                 o.iterID, o.readerID, kf.FormatKey(o.lower), kf.FormatKey(o.upper),
    1201           1 :                 o.keyTypes, kf.FormatKeySuffix(o.filterMin), kf.FormatKeySuffix(o.filterMax),
    1202           1 :                 o.useL6Filters, kf.FormatKeySuffix(o.maskSuffix))
    1203           1 : }
    1204             : 
    1205           1 : func (o *newIterOp) receiver() objID { return o.readerID }
    1206           1 : func (o *newIterOp) syncObjs() objIDSlice {
    1207           1 :         // Prevent o.iterID ops from running before it exists.
    1208           1 :         objs := []objID{o.iterID}
    1209           1 :         // If reading through a batch or snapshot, the new iterator will also observe database
    1210           1 :         // state, and we must synchronize on the database state for a consistent
    1211           1 :         // view.
    1212           1 :         if o.readerID.tag() == batchTag || o.readerID.tag() == snapTag {
    1213           1 :                 objs = append(objs, o.derivedDBID)
    1214           1 :         }
    1215           1 :         return objs
    1216             : }
    1217             : 
    1218           0 : func (o *newIterOp) rewriteKeys(fn func(UserKey) UserKey) {
    1219           0 :         if o.lower != nil {
    1220           0 :                 o.lower = fn(o.lower)
    1221           0 :         }
    1222           0 :         if o.upper != nil {
    1223           0 :                 o.upper = fn(o.upper)
    1224           0 :         }
    1225             : }
    1226             : 
    1227           1 : func (o *newIterOp) diagramKeyRanges() []pebble.KeyRange {
    1228           1 :         var res []pebble.KeyRange
    1229           1 :         if o.lower != nil {
    1230           0 :                 res = append(res, pebble.KeyRange{Start: o.lower, End: o.lower})
    1231           0 :         }
    1232           1 :         if o.upper != nil {
    1233           0 :                 res = append(res, pebble.KeyRange{Start: o.upper, End: o.upper})
    1234           0 :         }
    1235           1 :         return res
    1236             : }
    1237             : 
    1238             : // newIterUsingCloneOp models a Iterator.Clone operation.
    1239             : type newIterUsingCloneOp struct {
    1240             :         existingIterID objID
    1241             :         iterID         objID
    1242             :         refreshBatch   bool
    1243             :         iterOpts
    1244             : 
    1245             :         // derivedReaderID is the ID of the underlying reader that backs both the
    1246             :         // existing iterator and the new iterator. The derivedReaderID is NOT
    1247             :         // serialized by String and is derived from other operations during parse.
    1248             :         derivedReaderID objID
    1249             : }
    1250             : 
    1251           1 : func (o *newIterUsingCloneOp) run(t *Test, h historyRecorder) {
    1252           1 :         iter := t.getIter(o.existingIterID)
    1253           1 :         cloneOpts := pebble.CloneOptions{
    1254           1 :                 IterOptions:      iterOptions(t.testOpts.KeyFormat, o.iterOpts),
    1255           1 :                 RefreshBatchView: o.refreshBatch,
    1256           1 :         }
    1257           1 :         i, err := iter.iter.Clone(cloneOpts)
    1258           1 :         if err != nil {
    1259           0 :                 panic(err)
    1260             :         }
    1261           1 :         t.setIter(o.iterID, i)
    1262           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), i.Error())
    1263             : }
    1264             : 
    1265           1 : func (o *newIterUsingCloneOp) formattedString(kf KeyFormat) string {
    1266           1 :         return fmt.Sprintf("%s = %s.Clone(%t, %q, %q, %d /* key types */, %q, %q, %t /* use L6 filters */, %q /* masking suffix */)",
    1267           1 :                 o.iterID, o.existingIterID, o.refreshBatch, kf.FormatKey(o.lower),
    1268           1 :                 kf.FormatKey(o.upper), o.keyTypes, kf.FormatKeySuffix(o.filterMin),
    1269           1 :                 kf.FormatKeySuffix(o.filterMax), o.useL6Filters, kf.FormatKeySuffix(o.maskSuffix))
    1270           1 : }
    1271             : 
    1272           1 : func (o *newIterUsingCloneOp) receiver() objID { return o.existingIterID }
    1273             : 
    1274           1 : func (o *newIterUsingCloneOp) syncObjs() objIDSlice {
    1275           1 :         objIDs := []objID{o.iterID}
    1276           1 :         // If the underlying reader is a batch, we must synchronize with the batch.
    1277           1 :         // If refreshBatch=true, synchronizing is necessary to observe all the
    1278           1 :         // mutations up to until this op and no more. Even when refreshBatch=false,
    1279           1 :         // we must synchronize because iterator construction may access state cached
    1280           1 :         // on the indexed batch to avoid refragmenting range tombstones or range
    1281           1 :         // keys.
    1282           1 :         if o.derivedReaderID.tag() == batchTag {
    1283           0 :                 objIDs = append(objIDs, o.derivedReaderID)
    1284           0 :         }
    1285           1 :         return objIDs
    1286             : }
    1287             : 
    1288           0 : func (o *newIterUsingCloneOp) rewriteKeys(fn func(UserKey) UserKey) {}
    1289           0 : func (o *newIterUsingCloneOp) diagramKeyRanges() []pebble.KeyRange  { return nil }
    1290             : 
    1291             : // iterSetBoundsOp models an Iterator.SetBounds operation.
    1292             : type iterSetBoundsOp struct {
    1293             :         iterID objID
    1294             :         lower  UserKey
    1295             :         upper  UserKey
    1296             : }
    1297             : 
    1298           1 : func (o *iterSetBoundsOp) run(t *Test, h historyRecorder) {
    1299           1 :         i := t.getIter(o.iterID)
    1300           1 :         var lower, upper []byte
    1301           1 :         if o.lower != nil {
    1302           1 :                 lower = append(lower, o.lower...)
    1303           1 :         }
    1304           1 :         if o.upper != nil {
    1305           1 :                 upper = append(upper, o.upper...)
    1306           1 :         }
    1307           1 :         i.SetBounds(lower, upper)
    1308           1 : 
    1309           1 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
    1310           1 :         // the user-provided bounds.
    1311           1 :         rand.Read(lower[:])
    1312           1 :         rand.Read(upper[:])
    1313           1 : 
    1314           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), i.Error())
    1315             : }
    1316             : 
    1317           1 : func (o *iterSetBoundsOp) formattedString(kf KeyFormat) string {
    1318           1 :         return fmt.Sprintf("%s.SetBounds(%q, %q)", o.iterID,
    1319           1 :                 kf.FormatKey(o.lower), kf.FormatKey(o.upper))
    1320           1 : }
    1321             : 
    1322           1 : func (o *iterSetBoundsOp) receiver() objID      { return o.iterID }
    1323           1 : func (o *iterSetBoundsOp) syncObjs() objIDSlice { return nil }
    1324             : 
    1325           0 : func (o *iterSetBoundsOp) rewriteKeys(fn func(UserKey) UserKey) {
    1326           0 :         o.lower = fn(o.lower)
    1327           0 :         o.upper = fn(o.upper)
    1328           0 : }
    1329             : 
    1330           0 : func (o *iterSetBoundsOp) diagramKeyRanges() []pebble.KeyRange {
    1331           0 :         return []pebble.KeyRange{{Start: o.lower, End: o.upper}}
    1332           0 : }
    1333             : 
    1334             : // iterSetOptionsOp models an Iterator.SetOptions operation.
    1335             : type iterSetOptionsOp struct {
    1336             :         iterID objID
    1337             :         iterOpts
    1338             : 
    1339             :         // derivedReaderID is the ID of the underlying reader that backs the
    1340             :         // iterator. The derivedReaderID is NOT serialized by String and is derived
    1341             :         // from other operations during parse.
    1342             :         derivedReaderID objID
    1343             : }
    1344             : 
    1345           1 : func (o *iterSetOptionsOp) run(t *Test, h historyRecorder) {
    1346           1 :         i := t.getIter(o.iterID)
    1347           1 : 
    1348           1 :         opts := iterOptions(t.testOpts.KeyFormat, o.iterOpts)
    1349           1 :         if opts == nil {
    1350           0 :                 opts = &pebble.IterOptions{}
    1351           0 :         }
    1352           1 :         i.SetOptions(opts)
    1353           1 : 
    1354           1 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
    1355           1 :         // the user-provided bounds.
    1356           1 :         rand.Read(opts.LowerBound[:])
    1357           1 :         rand.Read(opts.UpperBound[:])
    1358           1 : 
    1359           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), i.Error())
    1360             : }
    1361             : 
    1362           1 : func (o *iterSetOptionsOp) formattedString(kf KeyFormat) string {
    1363           1 :         return fmt.Sprintf("%s.SetOptions(%q, %q, %d /* key types */, %q, %q, %t /* use L6 filters */, %q /* masking suffix */)",
    1364           1 :                 o.iterID, kf.FormatKey(o.lower), kf.FormatKey(o.upper),
    1365           1 :                 o.keyTypes, kf.FormatKeySuffix(o.filterMin), kf.FormatKeySuffix(o.filterMax),
    1366           1 :                 o.useL6Filters, kf.FormatKeySuffix(o.maskSuffix))
    1367           1 : }
    1368             : 
    1369           1 : func iterOptions(kf KeyFormat, o iterOpts) *pebble.IterOptions {
    1370           1 :         if o.IsZero() && !debugIterators {
    1371           1 :                 return nil
    1372           1 :         }
    1373           1 :         var lower, upper []byte
    1374           1 :         if o.lower != nil {
    1375           1 :                 lower = append(lower, o.lower...)
    1376           1 :         }
    1377           1 :         if o.upper != nil {
    1378           1 :                 upper = append(upper, o.upper...)
    1379           1 :         }
    1380           1 :         opts := &pebble.IterOptions{
    1381           1 :                 LowerBound: lower,
    1382           1 :                 UpperBound: upper,
    1383           1 :                 KeyTypes:   pebble.IterKeyType(o.keyTypes),
    1384           1 :                 RangeKeyMasking: pebble.RangeKeyMasking{
    1385           1 :                         Suffix: o.maskSuffix,
    1386           1 :                 },
    1387           1 :                 UseL6Filters:       o.useL6Filters,
    1388           1 :                 DebugRangeKeyStack: debugIterators,
    1389           1 :         }
    1390           1 :         if opts.RangeKeyMasking.Suffix != nil {
    1391           1 :                 opts.RangeKeyMasking.Filter = kf.NewSuffixFilterMask
    1392           1 :         }
    1393           1 :         if o.filterMax != nil {
    1394           1 :                 opts.PointKeyFilters = []pebble.BlockPropertyFilter{
    1395           1 :                         kf.NewSuffixBlockPropertyFilter(o.filterMin, o.filterMax),
    1396           1 :                 }
    1397           1 :                 // Enforce the timestamp bounds in SkipPoint, so that the iterator never
    1398           1 :                 // returns a key outside the filterMin, filterMax bounds. This provides
    1399           1 :                 // deterministic iteration.
    1400           1 :                 opts.SkipPoint = func(k []byte) (skip bool) {
    1401           1 :                         n := kf.Comparer.Split(k)
    1402           1 :                         if n == len(k) {
    1403           1 :                                 // No suffix, don't skip it.
    1404           1 :                                 return false
    1405           1 :                         }
    1406           1 :                         if kf.Comparer.ComparePointSuffixes(k[n:], o.filterMin) < 0 {
    1407           1 :                                 return true
    1408           1 :                         }
    1409           1 :                         if kf.Comparer.ComparePointSuffixes(k[n:], o.filterMax) >= 0 {
    1410           1 :                                 return true
    1411           1 :                         }
    1412           0 :                         return false
    1413             :                 }
    1414             :         }
    1415           1 :         return opts
    1416             : }
    1417             : 
    1418           1 : func (o *iterSetOptionsOp) receiver() objID { return o.iterID }
    1419             : 
    1420           1 : func (o *iterSetOptionsOp) syncObjs() objIDSlice {
    1421           1 :         if o.derivedReaderID.tag() == batchTag {
    1422           0 :                 // If the underlying reader is a batch, we must synchronize with the
    1423           0 :                 // batch so that we observe all the mutations up until this operation
    1424           0 :                 // and no more.
    1425           0 :                 return []objID{o.derivedReaderID}
    1426           0 :         }
    1427           1 :         return nil
    1428             : }
    1429             : 
    1430           0 : func (o *iterSetOptionsOp) rewriteKeys(fn func(UserKey) UserKey) {}
    1431           0 : func (o *iterSetOptionsOp) diagramKeyRanges() []pebble.KeyRange  { return nil }
    1432             : 
    1433             : // iterSeekGEOp models an Iterator.SeekGE[WithLimit] operation.
    1434             : type iterSeekGEOp struct {
    1435             :         iterID objID
    1436             :         key    UserKey
    1437             :         limit  UserKey
    1438             : 
    1439             :         derivedReaderID objID
    1440             : }
    1441             : 
    1442           1 : func iteratorPos(i *retryableIter) string {
    1443           1 :         var buf bytes.Buffer
    1444           1 :         fmt.Fprintf(&buf, "%q", i.Key())
    1445           1 :         hasPoint, hasRange := i.HasPointAndRange()
    1446           1 :         if hasPoint {
    1447           1 :                 fmt.Fprintf(&buf, ",%q", i.Value())
    1448           1 :         } else {
    1449           1 :                 fmt.Fprint(&buf, ",<no point>")
    1450           1 :         }
    1451           1 :         if hasRange {
    1452           1 :                 start, end := i.RangeBounds()
    1453           1 :                 fmt.Fprintf(&buf, ",[%q,%q)=>{", start, end)
    1454           1 :                 for i, rk := range i.RangeKeys() {
    1455           1 :                         if i > 0 {
    1456           1 :                                 fmt.Fprint(&buf, ",")
    1457           1 :                         }
    1458           1 :                         fmt.Fprintf(&buf, "%q=%q", rk.Suffix, rk.Value)
    1459             :                 }
    1460           1 :                 fmt.Fprint(&buf, "}")
    1461           1 :         } else {
    1462           1 :                 fmt.Fprint(&buf, ",<no range>")
    1463           1 :         }
    1464           1 :         if i.RangeKeyChanged() {
    1465           1 :                 fmt.Fprint(&buf, "*")
    1466           1 :         }
    1467           1 :         return buf.String()
    1468             : }
    1469             : 
    1470           1 : func validBoolToStr(valid bool) string {
    1471           1 :         return fmt.Sprintf("%t", valid)
    1472           1 : }
    1473             : 
    1474           1 : func validityStateToStr(validity pebble.IterValidityState) (bool, string) {
    1475           1 :         // We can't distinguish between IterExhausted and IterAtLimit in a
    1476           1 :         // deterministic manner.
    1477           1 :         switch validity {
    1478           1 :         case pebble.IterExhausted, pebble.IterAtLimit:
    1479           1 :                 return false, "invalid"
    1480           1 :         case pebble.IterValid:
    1481           1 :                 return true, "valid"
    1482           0 :         default:
    1483           0 :                 panic("unknown validity")
    1484             :         }
    1485             : }
    1486             : 
    1487           1 : func (o *iterSeekGEOp) run(t *Test, h historyRecorder) {
    1488           1 :         i := t.getIter(o.iterID)
    1489           1 :         var valid bool
    1490           1 :         var validStr string
    1491           1 :         if o.limit == nil {
    1492           1 :                 valid = i.SeekGE(o.key)
    1493           1 :                 validStr = validBoolToStr(valid)
    1494           1 :         } else {
    1495           1 :                 valid, validStr = validityStateToStr(i.SeekGEWithLimit(o.key, o.limit))
    1496           1 :         }
    1497           1 :         if valid {
    1498           1 :                 h.Recordf("%s // [%s,%s] %v", o.formattedString(t.testOpts.KeyFormat),
    1499           1 :                         validStr, iteratorPos(i), i.Error())
    1500           1 :         } else {
    1501           1 :                 h.Recordf("%s // [%s] %v",
    1502           1 :                         o.formattedString(t.testOpts.KeyFormat), validStr, i.Error())
    1503           1 :         }
    1504             : }
    1505             : 
    1506           1 : func (o *iterSeekGEOp) formattedString(kf KeyFormat) string {
    1507           1 :         return fmt.Sprintf("%s.SeekGE(%q, %q)",
    1508           1 :                 o.iterID, kf.FormatKey(o.key), kf.FormatKey(o.limit))
    1509           1 : }
    1510           1 : func (o *iterSeekGEOp) receiver() objID      { return o.iterID }
    1511           1 : func (o *iterSeekGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1512             : 
    1513           0 : func (o *iterSeekGEOp) rewriteKeys(fn func(UserKey) UserKey) {
    1514           0 :         o.key = fn(o.key)
    1515           0 : }
    1516             : 
    1517           1 : func (o *iterSeekGEOp) diagramKeyRanges() []pebble.KeyRange {
    1518           1 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
    1519           1 : }
    1520             : 
    1521           1 : func onlyBatchIDs(ids ...objID) objIDSlice {
    1522           1 :         var ret objIDSlice
    1523           1 :         for _, id := range ids {
    1524           1 :                 if id.tag() == batchTag {
    1525           1 :                         ret = append(ret, id)
    1526           1 :                 }
    1527             :         }
    1528           1 :         return ret
    1529             : }
    1530             : 
    1531             : // iterSeekPrefixGEOp models an Iterator.SeekPrefixGE operation.
    1532             : type iterSeekPrefixGEOp struct {
    1533             :         iterID objID
    1534             :         key    UserKey
    1535             : 
    1536             :         derivedReaderID objID
    1537             : }
    1538             : 
    1539           1 : func (o *iterSeekPrefixGEOp) run(t *Test, h historyRecorder) {
    1540           1 :         i := t.getIter(o.iterID)
    1541           1 :         valid := i.SeekPrefixGE(o.key)
    1542           1 :         if valid {
    1543           1 :                 h.Recordf("%s // [%t,%s] %v", o.formattedString(t.testOpts.KeyFormat),
    1544           1 :                         valid, iteratorPos(i), i.Error())
    1545           1 :         } else {
    1546           1 :                 h.Recordf("%s // [%t] %v", o.formattedString(t.testOpts.KeyFormat), valid, i.Error())
    1547           1 :         }
    1548             : }
    1549             : 
    1550           1 : func (o *iterSeekPrefixGEOp) formattedString(kf KeyFormat) string {
    1551           1 :         return fmt.Sprintf("%s.SeekPrefixGE(%q)", o.iterID, kf.FormatKey(o.key))
    1552           1 : }
    1553           1 : func (o *iterSeekPrefixGEOp) receiver() objID      { return o.iterID }
    1554           1 : func (o *iterSeekPrefixGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1555             : 
    1556           0 : func (o *iterSeekPrefixGEOp) rewriteKeys(fn func(UserKey) UserKey) {
    1557           0 :         o.key = fn(o.key)
    1558           0 : }
    1559             : 
    1560           0 : func (o *iterSeekPrefixGEOp) diagramKeyRanges() []pebble.KeyRange {
    1561           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
    1562           0 : }
    1563             : 
    1564             : // iterSeekLTOp models an Iterator.SeekLT[WithLimit] operation.
    1565             : type iterSeekLTOp struct {
    1566             :         iterID objID
    1567             :         key    UserKey
    1568             :         limit  UserKey
    1569             : 
    1570             :         derivedReaderID objID
    1571             : }
    1572             : 
    1573           1 : func (o *iterSeekLTOp) run(t *Test, h historyRecorder) {
    1574           1 :         i := t.getIter(o.iterID)
    1575           1 :         var valid bool
    1576           1 :         var validStr string
    1577           1 :         if o.limit == nil {
    1578           1 :                 valid = i.SeekLT(o.key)
    1579           1 :                 validStr = validBoolToStr(valid)
    1580           1 :         } else {
    1581           1 :                 valid, validStr = validityStateToStr(i.SeekLTWithLimit(o.key, o.limit))
    1582           1 :         }
    1583           1 :         if valid {
    1584           1 :                 h.Recordf("%s // [%s,%s] %v", o.formattedString(t.testOpts.KeyFormat),
    1585           1 :                         validStr, iteratorPos(i), i.Error())
    1586           1 :         } else {
    1587           1 :                 h.Recordf("%s // [%s] %v", o.formattedString(t.testOpts.KeyFormat),
    1588           1 :                         validStr, i.Error())
    1589           1 :         }
    1590             : }
    1591             : 
    1592           1 : func (o *iterSeekLTOp) formattedString(kf KeyFormat) string {
    1593           1 :         return fmt.Sprintf("%s.SeekLT(%q, %q)", o.iterID,
    1594           1 :                 kf.FormatKey(o.key), kf.FormatKey(o.limit))
    1595           1 : }
    1596             : 
    1597           1 : func (o *iterSeekLTOp) receiver() objID      { return o.iterID }
    1598           1 : func (o *iterSeekLTOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1599             : 
    1600           0 : func (o *iterSeekLTOp) rewriteKeys(fn func(UserKey) UserKey) {
    1601           0 :         o.key = fn(o.key)
    1602           0 : }
    1603             : 
    1604           0 : func (o *iterSeekLTOp) diagramKeyRanges() []pebble.KeyRange {
    1605           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
    1606           0 : }
    1607             : 
    1608             : // iterFirstOp models an Iterator.First operation.
    1609             : type iterFirstOp struct {
    1610             :         iterID objID
    1611             : 
    1612             :         derivedReaderID objID
    1613             : }
    1614             : 
    1615           1 : func (o *iterFirstOp) run(t *Test, h historyRecorder) {
    1616           1 :         i := t.getIter(o.iterID)
    1617           1 :         valid := i.First()
    1618           1 :         if valid {
    1619           1 :                 h.Recordf("%s // [%t,%s] %v", o.formattedString(t.testOpts.KeyFormat),
    1620           1 :                         valid, iteratorPos(i), i.Error())
    1621           1 :         } else {
    1622           1 :                 h.Recordf("%s // [%t] %v", o.formattedString(t.testOpts.KeyFormat),
    1623           1 :                         valid, i.Error())
    1624           1 :         }
    1625             : }
    1626             : 
    1627           1 : func (o *iterFirstOp) formattedString(KeyFormat) string { return fmt.Sprintf("%s.First()", o.iterID) }
    1628           1 : func (o *iterFirstOp) receiver() objID                  { return o.iterID }
    1629           1 : func (o *iterFirstOp) syncObjs() objIDSlice             { return onlyBatchIDs(o.derivedReaderID) }
    1630             : 
    1631           0 : func (o *iterFirstOp) rewriteKeys(func(UserKey) UserKey)   {}
    1632           0 : func (o *iterFirstOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1633             : 
    1634             : // iterLastOp models an Iterator.Last operation.
    1635             : type iterLastOp struct {
    1636             :         iterID objID
    1637             : 
    1638             :         derivedReaderID objID
    1639             : }
    1640             : 
    1641           1 : func (o *iterLastOp) run(t *Test, h historyRecorder) {
    1642           1 :         i := t.getIter(o.iterID)
    1643           1 :         valid := i.Last()
    1644           1 :         if valid {
    1645           1 :                 h.Recordf("%s // [%t,%s] %v", o.formattedString(t.testOpts.KeyFormat),
    1646           1 :                         valid, iteratorPos(i), i.Error())
    1647           1 :         } else {
    1648           1 :                 h.Recordf("%s // [%t] %v", o.formattedString(t.testOpts.KeyFormat),
    1649           1 :                         valid, i.Error())
    1650           1 :         }
    1651             : }
    1652             : 
    1653           1 : func (o *iterLastOp) formattedString(KeyFormat) string { return fmt.Sprintf("%s.Last()", o.iterID) }
    1654           1 : func (o *iterLastOp) receiver() objID                  { return o.iterID }
    1655           1 : func (o *iterLastOp) syncObjs() objIDSlice             { return onlyBatchIDs(o.derivedReaderID) }
    1656             : 
    1657           0 : func (o *iterLastOp) rewriteKeys(func(UserKey) UserKey)   {}
    1658           0 : func (o *iterLastOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1659             : 
    1660             : // iterNextOp models an Iterator.Next[WithLimit] operation.
    1661             : type iterNextOp struct {
    1662             :         iterID objID
    1663             :         limit  UserKey
    1664             : 
    1665             :         derivedReaderID objID
    1666             : }
    1667             : 
    1668           1 : func (o *iterNextOp) run(t *Test, h historyRecorder) {
    1669           1 :         i := t.getIter(o.iterID)
    1670           1 :         var valid bool
    1671           1 :         var validStr string
    1672           1 :         if o.limit == nil {
    1673           1 :                 valid = i.Next()
    1674           1 :                 validStr = validBoolToStr(valid)
    1675           1 :         } else {
    1676           1 :                 valid, validStr = validityStateToStr(i.NextWithLimit(o.limit))
    1677           1 :         }
    1678           1 :         if valid {
    1679           1 :                 h.Recordf("%s // [%s,%s] %v", o.formattedString(t.testOpts.KeyFormat),
    1680           1 :                         validStr, iteratorPos(i), i.Error())
    1681           1 :         } else {
    1682           1 :                 h.Recordf("%s // [%s] %v", o.formattedString(t.testOpts.KeyFormat),
    1683           1 :                         validStr, i.Error())
    1684           1 :         }
    1685             : }
    1686             : 
    1687           1 : func (o *iterNextOp) formattedString(kf KeyFormat) string {
    1688           1 :         return fmt.Sprintf("%s.Next(%q)", o.iterID, kf.FormatKey(o.limit))
    1689           1 : }
    1690           1 : func (o *iterNextOp) receiver() objID      { return o.iterID }
    1691           1 : func (o *iterNextOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1692             : 
    1693           0 : func (o *iterNextOp) rewriteKeys(func(UserKey) UserKey)   {}
    1694           0 : func (o *iterNextOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1695             : 
    1696             : // iterNextPrefixOp models an Iterator.NextPrefix operation.
    1697             : type iterNextPrefixOp struct {
    1698             :         iterID objID
    1699             : 
    1700             :         derivedReaderID objID
    1701             : }
    1702             : 
    1703           1 : func (o *iterNextPrefixOp) run(t *Test, h historyRecorder) {
    1704           1 :         i := t.getIter(o.iterID)
    1705           1 :         valid := i.NextPrefix()
    1706           1 :         validStr := validBoolToStr(valid)
    1707           1 :         if valid {
    1708           1 :                 h.Recordf("%s // [%s,%s] %v", o.formattedString(t.testOpts.KeyFormat),
    1709           1 :                         validStr, iteratorPos(i), i.Error())
    1710           1 :         } else {
    1711           1 :                 h.Recordf("%s // [%s] %v", o.formattedString(t.testOpts.KeyFormat),
    1712           1 :                         validStr, i.Error())
    1713           1 :         }
    1714             : }
    1715             : 
    1716           1 : func (o *iterNextPrefixOp) formattedString(KeyFormat) string {
    1717           1 :         return fmt.Sprintf("%s.NextPrefix()", o.iterID)
    1718           1 : }
    1719           1 : func (o *iterNextPrefixOp) receiver() objID      { return o.iterID }
    1720           1 : func (o *iterNextPrefixOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1721             : 
    1722           0 : func (o *iterNextPrefixOp) rewriteKeys(func(UserKey) UserKey)   {}
    1723           0 : func (o *iterNextPrefixOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1724             : 
    1725             : // iterCanSingleDelOp models a call to CanDeterministicallySingleDelete with an
    1726             : // Iterator.
    1727             : type iterCanSingleDelOp struct {
    1728             :         iterID objID
    1729             : 
    1730             :         derivedReaderID objID
    1731             : }
    1732             : 
    1733           1 : func (o *iterCanSingleDelOp) run(t *Test, h historyRecorder) {
    1734           1 :         // TODO(jackson): When we perform error injection, we'll need to rethink
    1735           1 :         // this.
    1736           1 :         _, err := pebble.CanDeterministicallySingleDelete(t.getIter(o.iterID).iter)
    1737           1 :         // The return value of CanDeterministicallySingleDelete is dependent on
    1738           1 :         // internal LSM state and non-deterministic, so we don't record it.
    1739           1 :         // Including the operation within the metamorphic test at all helps ensure
    1740           1 :         // that it does not change the result of any other Iterator operation that
    1741           1 :         // should be deterministic, regardless of its own outcome.
    1742           1 :         //
    1743           1 :         // We still record the value of the error because it's deterministic, at
    1744           1 :         // least for now. The possible error cases are:
    1745           1 :         //  - The iterator was already in an error state when the operation ran.
    1746           1 :         //  - The operation is deterministically invalid (like using an InternalNext
    1747           1 :         //    to change directions.)
    1748           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
    1749           1 : }
    1750             : 
    1751           1 : func (o *iterCanSingleDelOp) formattedString(KeyFormat) string {
    1752           1 :         return fmt.Sprintf("%s.InternalNext()", o.iterID)
    1753           1 : }
    1754           1 : func (o *iterCanSingleDelOp) receiver() objID      { return o.iterID }
    1755           1 : func (o *iterCanSingleDelOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1756             : 
    1757           0 : func (o *iterCanSingleDelOp) rewriteKeys(func(UserKey) UserKey)   {}
    1758           0 : func (o *iterCanSingleDelOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1759             : 
    1760             : // iterPrevOp models an Iterator.Prev[WithLimit] operation.
    1761             : type iterPrevOp struct {
    1762             :         iterID objID
    1763             :         limit  []byte
    1764             : 
    1765             :         derivedReaderID objID
    1766             : }
    1767             : 
    1768           1 : func (o *iterPrevOp) run(t *Test, h historyRecorder) {
    1769           1 :         i := t.getIter(o.iterID)
    1770           1 :         var valid bool
    1771           1 :         var validStr string
    1772           1 :         if o.limit == nil {
    1773           1 :                 valid = i.Prev()
    1774           1 :                 validStr = validBoolToStr(valid)
    1775           1 :         } else {
    1776           1 :                 valid, validStr = validityStateToStr(i.PrevWithLimit(o.limit))
    1777           1 :         }
    1778           1 :         if valid {
    1779           1 :                 h.Recordf("%s // [%s,%s] %v", o.formattedString(t.testOpts.KeyFormat),
    1780           1 :                         validStr, iteratorPos(i), i.Error())
    1781           1 :         } else {
    1782           1 :                 h.Recordf("%s // [%s] %v", o.formattedString(t.testOpts.KeyFormat),
    1783           1 :                         validStr, i.Error())
    1784           1 :         }
    1785             : }
    1786             : 
    1787           1 : func (o *iterPrevOp) formattedString(kf KeyFormat) string {
    1788           1 :         return fmt.Sprintf("%s.Prev(%q)", o.iterID, kf.FormatKey(o.limit))
    1789           1 : }
    1790           1 : func (o *iterPrevOp) receiver() objID      { return o.iterID }
    1791           1 : func (o *iterPrevOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1792             : 
    1793           0 : func (o *iterPrevOp) rewriteKeys(func(UserKey) UserKey)   {}
    1794           0 : func (o *iterPrevOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1795             : 
    1796             : // newSnapshotOp models a DB.NewSnapshot operation.
    1797             : type newSnapshotOp struct {
    1798             :         dbID   objID
    1799             :         snapID objID
    1800             :         // If nonempty, this snapshot must not be used to read any keys outside of
    1801             :         // the provided bounds. This allows some implementations to use 'Eventually
    1802             :         // file-only snapshots,' which require bounds.
    1803             :         bounds []pebble.KeyRange
    1804             : }
    1805             : 
    1806           1 : func (o *newSnapshotOp) run(t *Test, h historyRecorder) {
    1807           1 :         bounds := o.bounds
    1808           1 :         if len(bounds) == 0 {
    1809           0 :                 panic("bounds unexpectedly unset for newSnapshotOp")
    1810             :         }
    1811             :         // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
    1812           1 :         createEfos := ((11400714819323198485 * uint64(t.idx) * t.testOpts.seedEFOS) >> 63) == 1
    1813           1 :         // If either of these options is true, an EFOS _must_ be created, regardless
    1814           1 :         // of what the fibonacci hash returned.
    1815           1 :         excisePossible := t.testOpts.useSharedReplicate || t.testOpts.useExternalReplicate || t.testOpts.useExcise
    1816           1 :         if createEfos || excisePossible {
    1817           1 :                 s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(bounds)
    1818           1 :                 t.setSnapshot(o.snapID, s)
    1819           1 :         } else {
    1820           1 :                 s := t.getDB(o.dbID).NewSnapshot()
    1821           1 :                 t.setSnapshot(o.snapID, s)
    1822           1 :         }
    1823           1 :         h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
    1824             : }
    1825             : 
    1826           1 : func (o *newSnapshotOp) formattedString(kf KeyFormat) string {
    1827           1 :         var buf bytes.Buffer
    1828           1 :         fmt.Fprintf(&buf, "%s = %s.NewSnapshot(", o.snapID, o.dbID)
    1829           1 :         for i := range o.bounds {
    1830           1 :                 if i > 0 {
    1831           1 :                         fmt.Fprint(&buf, ", ")
    1832           1 :                 }
    1833           1 :                 fmt.Fprintf(&buf, "%q, %q", kf.FormatKey(o.bounds[i].Start), kf.FormatKey(o.bounds[i].End))
    1834             :         }
    1835           1 :         fmt.Fprint(&buf, ")")
    1836           1 :         return buf.String()
    1837             : }
    1838           1 : func (o *newSnapshotOp) receiver() objID      { return o.dbID }
    1839           1 : func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }
    1840             : 
    1841           1 : func (o *newSnapshotOp) rewriteKeys(fn func(UserKey) UserKey) {
    1842           1 :         for i := range o.bounds {
    1843           1 :                 o.bounds[i].Start = fn(o.bounds[i].Start)
    1844           1 :                 o.bounds[i].End = fn(o.bounds[i].End)
    1845           1 :         }
    1846             : }
    1847             : 
    1848           1 : func (o *newSnapshotOp) diagramKeyRanges() []pebble.KeyRange {
    1849           1 :         return o.bounds
    1850           1 : }
    1851             : 
    1852             : // newExternalObjOp models a DB.NewExternalObj operation.
    1853             : type newExternalObjOp struct {
    1854             :         batchID       objID
    1855             :         externalObjID objID
    1856             : }
    1857             : 
    1858           1 : func externalObjName(externalObjID objID) string {
    1859           1 :         if externalObjID.tag() != externalObjTag {
    1860           0 :                 panic(fmt.Sprintf("invalid externalObjID %s", externalObjID))
    1861             :         }
    1862           1 :         return fmt.Sprintf("external-for-ingest-%d.sst", externalObjID.slot())
    1863             : }
    1864             : 
    1865           1 : func (o *newExternalObjOp) run(t *Test, h historyRecorder) {
    1866           1 :         b := t.getBatch(o.batchID)
    1867           1 :         t.clearObj(o.batchID)
    1868           1 : 
    1869           1 :         writeCloser, err := t.externalStorage.CreateObject(externalObjName(o.externalObjID))
    1870           1 :         if err != nil {
    1871           0 :                 panic(err)
    1872             :         }
    1873           1 :         writable := objstorageprovider.NewRemoteWritable(writeCloser)
    1874           1 : 
    1875           1 :         iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
    1876           1 : 
    1877           1 :         sstMeta, err := writeSSTForIngestion(
    1878           1 :                 t,
    1879           1 :                 iter, rangeDelIter, rangeKeyIter,
    1880           1 :                 true, /* uniquePrefixes */
    1881           1 :                 nil,  /* syntheticSuffix */
    1882           1 :                 nil,  /* syntheticPrefix */
    1883           1 :                 writable,
    1884           1 :                 t.minFMV(),
    1885           1 :         )
    1886           1 :         if err != nil {
    1887           0 :                 panic(err)
    1888             :         }
    1889           1 :         if !sstMeta.HasPointKeys && !sstMeta.HasRangeDelKeys && !sstMeta.HasRangeKeys {
    1890           0 :                 // This can occur when using --try-to-reduce.
    1891           0 :                 panic("metamorphic test internal error: external object empty")
    1892             :         }
    1893           1 :         t.setExternalObj(o.externalObjID, externalObjMeta{
    1894           1 :                 sstMeta: sstMeta,
    1895           1 :         })
    1896           1 :         h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
    1897             : }
    1898             : 
    1899           1 : func (o *newExternalObjOp) formattedString(KeyFormat) string {
    1900           1 :         return fmt.Sprintf("%s = %s.NewExternalObj()", o.externalObjID, o.batchID)
    1901           1 : }
    1902           1 : func (o *newExternalObjOp) receiver() objID      { return o.batchID }
    1903           1 : func (o *newExternalObjOp) syncObjs() objIDSlice { return []objID{o.externalObjID} }
    1904             : 
    1905           0 : func (o *newExternalObjOp) rewriteKeys(func(UserKey) UserKey)   {}
    1906           0 : func (o *newExternalObjOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1907             : 
    1908             : type dbRatchetFormatMajorVersionOp struct {
    1909             :         dbID objID
    1910             :         vers pebble.FormatMajorVersion
    1911             : }
    1912             : 
    1913           1 : func (o *dbRatchetFormatMajorVersionOp) run(t *Test, h historyRecorder) {
    1914           1 :         var err error
    1915           1 :         // NB: We no-op the operation if we're already at or above the provided
    1916           1 :         // format major version. Different runs start at different format major
    1917           1 :         // versions, making the presence of an error and the error message itself
    1918           1 :         // non-deterministic if we attempt to upgrade to an older version.
    1919           1 :         //
    1920           1 :         //Regardless, subsequent operations should behave identically, which is what
    1921           1 :         //we're really aiming to test by including this format major version ratchet
    1922           1 :         //operation.
    1923           1 :         if t.getDB(o.dbID).FormatMajorVersion() < o.vers {
    1924           1 :                 err = t.getDB(o.dbID).RatchetFormatMajorVersion(o.vers)
    1925           1 :         }
    1926           1 :         h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
    1927             : }
    1928             : 
    1929           1 : func (o *dbRatchetFormatMajorVersionOp) formattedString(KeyFormat) string {
    1930           1 :         return fmt.Sprintf("%s.RatchetFormatMajorVersion(%s)", o.dbID, o.vers)
    1931           1 : }
    1932           1 : func (o *dbRatchetFormatMajorVersionOp) receiver() objID      { return o.dbID }
    1933           1 : func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
    1934             : 
    1935           0 : func (o *dbRatchetFormatMajorVersionOp) rewriteKeys(func(UserKey) UserKey)   {}
    1936           0 : func (o *dbRatchetFormatMajorVersionOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1937             : 
    1938             : type dbRestartOp struct {
    1939             :         dbID objID
    1940             : 
    1941             :         // affectedObjects is the list of additional objects that are affected by this
    1942             :         // operation, and which syncObjs() must return so that we don't perform the
    1943             :         // restart in parallel with other operations to affected objects.
    1944             :         affectedObjects []objID
    1945             : }
    1946             : 
    1947           1 : func (o *dbRestartOp) run(t *Test, h historyRecorder) {
    1948           1 :         if err := t.restartDB(o.dbID); err != nil {
    1949           0 :                 h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
    1950           0 :                 h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
    1951           1 :         } else {
    1952           1 :                 h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
    1953           1 :         }
    1954             : }
    1955             : 
    1956           1 : func (o *dbRestartOp) formattedString(KeyFormat) string { return fmt.Sprintf("%s.Restart()", o.dbID) }
    1957           1 : func (o *dbRestartOp) receiver() objID                  { return o.dbID }
    1958           1 : func (o *dbRestartOp) syncObjs() objIDSlice             { return o.affectedObjects }
    1959             : 
    1960           0 : func (o *dbRestartOp) rewriteKeys(func(UserKey) UserKey)   {}
    1961           0 : func (o *dbRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1962             : 
    1963           1 : func formatOps(kf KeyFormat, ops []op) string {
    1964           1 :         var buf strings.Builder
    1965           1 :         for _, op := range ops {
    1966           1 :                 fmt.Fprintf(&buf, "%s\n", op.formattedString(kf))
    1967           1 :         }
    1968           1 :         return buf.String()
    1969             : }
    1970             : 
    1971             : // replicateOp models an operation that could copy keys from one db to
    1972             : // another through either an IngestAndExcise, or an Ingest.
    1973             : type replicateOp struct {
    1974             :         source, dest objID
    1975             :         start, end   UserKey
    1976             : }
    1977             : 
    1978             : func (r *replicateOp) runSharedReplicate(
    1979             :         t *Test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
    1980           0 : ) {
    1981           0 :         var sharedSSTs []pebble.SharedSSTMeta
    1982           0 :         var err error
    1983           0 :         err = source.ScanInternal(context.TODO(), sstable.CategoryUnknown, r.start, r.end,
    1984           0 :                 func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
    1985           0 :                         val, _, err := value.Value(nil)
    1986           0 :                         if err != nil {
    1987           0 :                                 panic(err)
    1988             :                         }
    1989           0 :                         return w.Raw().AddWithForceObsolete(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val, false)
    1990             :                 },
    1991           0 :                 func(start, end []byte, seqNum base.SeqNum) error {
    1992           0 :                         return w.DeleteRange(start, end)
    1993           0 :                 },
    1994           0 :                 func(start, end []byte, keys []keyspan.Key) error {
    1995           0 :                         return w.Raw().EncodeSpan(keyspan.Span{
    1996           0 :                                 Start: start,
    1997           0 :                                 End:   end,
    1998           0 :                                 Keys:  keys,
    1999           0 :                         })
    2000           0 :                 },
    2001           0 :                 func(sst *pebble.SharedSSTMeta) error {
    2002           0 :                         sharedSSTs = append(sharedSSTs, *sst)
    2003           0 :                         return nil
    2004           0 :                 },
    2005             :                 nil,
    2006             :         )
    2007           0 :         if err != nil {
    2008           0 :                 h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2009           0 :                 return
    2010           0 :         }
    2011             : 
    2012           0 :         err = w.Close()
    2013           0 :         if err != nil {
    2014           0 :                 h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2015           0 :                 return
    2016           0 :         }
    2017           0 :         meta, err := w.Raw().Metadata()
    2018           0 :         if err != nil {
    2019           0 :                 h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2020           0 :                 return
    2021           0 :         }
    2022           0 :         if len(sharedSSTs) == 0 && meta.Properties.NumEntries == 0 && meta.Properties.NumRangeKeys() == 0 {
    2023           0 :                 // IngestAndExcise below will be a no-op. We should do a
    2024           0 :                 // DeleteRange+RangeKeyDel to mimic the behaviour of the non-shared-replicate
    2025           0 :                 // case.
    2026           0 :                 //
    2027           0 :                 // TODO(bilal): Remove this when we support excises with no matching ingests.
    2028           0 :                 if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
    2029           0 :                         h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2030           0 :                         return
    2031           0 :                 }
    2032           0 :                 err := dest.DeleteRange(r.start, r.end, t.writeOpts)
    2033           0 :                 h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2034           0 :                 return
    2035             :         }
    2036             : 
    2037           0 :         _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end})
    2038           0 :         h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2039             : }
    2040             : 
    2041             : func (r *replicateOp) runExternalReplicate(
    2042             :         t *Test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
    2043           0 : ) {
    2044           0 :         var externalSSTs []pebble.ExternalFile
    2045           0 :         var err error
    2046           0 :         err = source.ScanInternal(context.TODO(), sstable.CategoryUnknown, r.start, r.end,
    2047           0 :                 func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
    2048           0 :                         val, _, err := value.Value(nil)
    2049           0 :                         if err != nil {
    2050           0 :                                 panic(err)
    2051             :                         }
    2052           0 :                         return w.Raw().AddWithForceObsolete(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val, false)
    2053             :                 },
    2054           0 :                 func(start, end []byte, seqNum base.SeqNum) error {
    2055           0 :                         return w.DeleteRange(start, end)
    2056           0 :                 },
    2057           0 :                 func(start, end []byte, keys []keyspan.Key) error {
    2058           0 :                         return w.Raw().EncodeSpan(keyspan.Span{
    2059           0 :                                 Start: start,
    2060           0 :                                 End:   end,
    2061           0 :                                 Keys:  keys,
    2062           0 :                         })
    2063           0 :                 },
    2064             :                 nil,
    2065           0 :                 func(sst *pebble.ExternalFile) error {
    2066           0 :                         externalSSTs = append(externalSSTs, *sst)
    2067           0 :                         return nil
    2068           0 :                 },
    2069             :         )
    2070           0 :         if err != nil {
    2071           0 :                 h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2072           0 :                 return
    2073           0 :         }
    2074             : 
    2075           0 :         err = w.Close()
    2076           0 :         if err != nil {
    2077           0 :                 h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2078           0 :                 return
    2079           0 :         }
    2080           0 :         meta, err := w.Raw().Metadata()
    2081           0 :         if err != nil {
    2082           0 :                 h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2083           0 :                 return
    2084           0 :         }
    2085           0 :         if len(externalSSTs) == 0 && meta.Properties.NumEntries == 0 && meta.Properties.NumRangeKeys() == 0 {
    2086           0 :                 // IngestAndExcise below will be a no-op. We should do a
    2087           0 :                 // DeleteRange+RangeKeyDel to mimic the behaviour of the non-external-replicate
    2088           0 :                 // case.
    2089           0 :                 //
    2090           0 :                 // TODO(bilal): Remove this when we support excises with no matching ingests.
    2091           0 :                 if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
    2092           0 :                         h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2093           0 :                         return
    2094           0 :                 }
    2095           0 :                 err := dest.DeleteRange(r.start, r.end, t.writeOpts)
    2096           0 :                 h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2097           0 :                 return
    2098             :         }
    2099             : 
    2100           0 :         _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, nil, externalSSTs /* external */, pebble.KeyRange{Start: r.start, End: r.end})
    2101           0 :         h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2102             : }
    2103             : 
    2104           0 : func (r *replicateOp) run(t *Test, h historyRecorder) {
    2105           0 :         // Shared replication only works if shared storage is enabled.
    2106           0 :         useSharedIngest := t.testOpts.useSharedReplicate && t.testOpts.sharedStorageEnabled
    2107           0 :         useExternalIngest := t.testOpts.useExternalReplicate && t.testOpts.externalStorageEnabled
    2108           0 : 
    2109           0 :         source := t.getDB(r.source)
    2110           0 :         dest := t.getDB(r.dest)
    2111           0 :         sstPath := path.Join(t.tmpDir, fmt.Sprintf("ext-replicate%d.sst", t.idx))
    2112           0 :         f, err := t.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified)
    2113           0 :         if err != nil {
    2114           0 :                 h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2115           0 :                 return
    2116           0 :         }
    2117           0 :         w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), t.opts.MakeWriterOptions(0, dest.TableFormat()))
    2118           0 : 
    2119           0 :         // NB: In practice we'll either do shared replicate or external replicate,
    2120           0 :         // as ScanInternal does not support both. We arbitrarily choose to prioritize
    2121           0 :         // external replication if both are enabled, as those are likely to hit
    2122           0 :         // widespread usage first.
    2123           0 :         if useExternalIngest {
    2124           0 :                 r.runExternalReplicate(t, h, source, dest, w, sstPath)
    2125           0 :                 return
    2126           0 :         }
    2127           0 :         if useSharedIngest {
    2128           0 :                 r.runSharedReplicate(t, h, source, dest, w, sstPath)
    2129           0 :                 return
    2130           0 :         }
    2131             : 
    2132             :         // First, do a RangeKeyDelete and DeleteRange on the whole span.
    2133           0 :         if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
    2134           0 :                 h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2135           0 :                 return
    2136           0 :         }
    2137           0 :         if err := dest.DeleteRange(r.start, r.end, t.writeOpts); err != nil {
    2138           0 :                 h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2139           0 :                 return
    2140           0 :         }
    2141           0 :         iter, err := source.NewIter(&pebble.IterOptions{
    2142           0 :                 LowerBound: r.start,
    2143           0 :                 UpperBound: r.end,
    2144           0 :                 KeyTypes:   pebble.IterKeyTypePointsAndRanges,
    2145           0 :         })
    2146           0 :         if err != nil {
    2147           0 :                 panic(err)
    2148             :         }
    2149           0 :         defer iter.Close()
    2150           0 : 
    2151           0 :         for ok := iter.SeekGE(r.start); ok && iter.Error() == nil; ok = iter.Next() {
    2152           0 :                 hasPoint, hasRange := iter.HasPointAndRange()
    2153           0 :                 if hasPoint {
    2154           0 :                         val, err := iter.ValueAndErr()
    2155           0 :                         if err != nil {
    2156           0 :                                 panic(err)
    2157             :                         }
    2158           0 :                         if err := w.Set(iter.Key(), val); err != nil {
    2159           0 :                                 panic(err)
    2160             :                         }
    2161             :                 }
    2162           0 :                 if hasRange && iter.RangeKeyChanged() {
    2163           0 :                         rangeKeys := iter.RangeKeys()
    2164           0 :                         rkStart, rkEnd := iter.RangeBounds()
    2165           0 : 
    2166           0 :                         span := keyspan.Span{Start: rkStart, End: rkEnd, Keys: make([]keyspan.Key, len(rangeKeys))}
    2167           0 :                         for i := range rangeKeys {
    2168           0 :                                 span.Keys[i] = keyspan.Key{
    2169           0 :                                         Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
    2170           0 :                                         Suffix:  rangeKeys[i].Suffix,
    2171           0 :                                         Value:   rangeKeys[i].Value,
    2172           0 :                                 }
    2173           0 :                         }
    2174           0 :                         keyspan.SortKeysByTrailer(span.Keys)
    2175           0 :                         if err := w.Raw().EncodeSpan(span); err != nil {
    2176           0 :                                 panic(err)
    2177             :                         }
    2178             :                 }
    2179             :         }
    2180           0 :         if err := iter.Error(); err != nil {
    2181           0 :                 h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2182           0 :                 return
    2183           0 :         }
    2184           0 :         if err := w.Close(); err != nil {
    2185           0 :                 panic(err)
    2186             :         }
    2187             : 
    2188           0 :         err = dest.Ingest(context.Background(), []string{sstPath})
    2189           0 :         h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
    2190             : }
    2191             : 
    2192           1 : func (r *replicateOp) formattedString(kf KeyFormat) string {
    2193           1 :         return fmt.Sprintf("%s.Replicate(%s, %q, %q)", r.source, r.dest,
    2194           1 :                 kf.FormatKey(r.start), kf.FormatKey(r.end))
    2195           1 : }
    2196             : 
    2197           0 : func (r *replicateOp) receiver() objID      { return r.source }
    2198           0 : func (r *replicateOp) syncObjs() objIDSlice { return objIDSlice{r.dest} }
    2199             : 
    2200           0 : func (r *replicateOp) rewriteKeys(fn func(UserKey) UserKey) {
    2201           0 :         r.start = fn(r.start)
    2202           0 :         r.end = fn(r.end)
    2203           0 : }
    2204             : 
    2205           1 : func (r *replicateOp) diagramKeyRanges() []pebble.KeyRange {
    2206           1 :         return []pebble.KeyRange{{Start: r.start, End: r.end}}
    2207           1 : }

Generated by: LCOV version 1.14