LCOV - code coverage report
Current view: top level - pebble/metamorphic - ops.go (source / functions) Hit Total Coverage
Test: 2024-08-13 08:17Z 57ff76d1 - tests + meta.lcov Lines: 1104 1402 78.7 %
Date: 2024-08-13 08:18:14 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package metamorphic
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "crypto/rand"
      11             :         "encoding/binary"
      12             :         "fmt"
      13             :         "io"
      14             :         "path"
      15             :         "path/filepath"
      16             :         "slices"
      17             :         "strings"
      18             : 
      19             :         "github.com/cockroachdb/errors"
      20             :         "github.com/cockroachdb/pebble"
      21             :         "github.com/cockroachdb/pebble/internal/base"
      22             :         "github.com/cockroachdb/pebble/internal/keyspan"
      23             :         "github.com/cockroachdb/pebble/internal/private"
      24             :         "github.com/cockroachdb/pebble/internal/rangekey"
      25             :         "github.com/cockroachdb/pebble/internal/testkeys"
      26             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      27             :         "github.com/cockroachdb/pebble/sstable"
      28             :         "github.com/cockroachdb/pebble/vfs"
      29             :         "github.com/cockroachdb/pebble/vfs/errorfs"
      30             : )
      31             : 
// Ops holds a sequence of operations to be executed by the metamorphic tests.
type Ops []op

// op defines the interface for a single operation, such as creating a batch,
// or advancing an iterator.
type op interface {
	// String returns the operation in its serialized textual form, as it
	// appears in the generated test script and in the history log.
	String() string

	// run executes the operation against the test state t, recording the
	// operation and its outcome to the history recorder h.
	run(t *Test, h historyRecorder)

	// receiver returns the object ID of the object the operation is performed
	// on. Every operation has a receiver (eg, batch0.Set(...) has `batch0` as
	// its receiver). Receivers are used for synchronization when running with
	// concurrency.
	receiver() objID

	// syncObjs returns an additional set of object IDs—excluding the
	// receiver—that the operation must synchronize with. At execution time,
	// the operation will run serially with respect to all other operations
	// that return these objects from their own syncObjs or receiver methods.
	syncObjs() objIDSlice

	// keys returns all user keys used by the operation, as pointers to slices.
	// The caller can then modify these slices to rewrite the keys.
	//
	// Used for simplification of operations for easier investigations.
	keys() []*[]byte

	// diagramKeyRanges() returns key spans associated with this operation, to be
	// shown on an ASCII diagram of operations.
	diagramKeyRanges() []pebble.KeyRange
}
      64             : 
      65             : // initOp performs test initialization
      66             : type initOp struct {
      67             :         dbSlots          uint32
      68             :         batchSlots       uint32
      69             :         iterSlots        uint32
      70             :         snapshotSlots    uint32
      71             :         externalObjSlots uint32
      72             : }
      73             : 
      74           2 : func (o *initOp) run(t *Test, h historyRecorder) {
      75           2 :         t.batches = make([]*pebble.Batch, o.batchSlots)
      76           2 :         t.iters = make([]*retryableIter, o.iterSlots)
      77           2 :         t.snapshots = make([]readerCloser, o.snapshotSlots)
      78           2 :         t.externalObjs = make([]externalObjMeta, o.externalObjSlots)
      79           2 :         h.Recordf("%s", o)
      80           2 : }
      81             : 
      82           2 : func (o *initOp) String() string {
      83           2 :         return fmt.Sprintf("Init(%d /* dbs */, %d /* batches */, %d /* iters */, %d /* snapshots */, %d /* externalObjs */)",
      84           2 :                 o.dbSlots, o.batchSlots, o.iterSlots, o.snapshotSlots, o.externalObjSlots)
      85           2 : }
      86             : 
      87           2 : func (o *initOp) receiver() objID { return makeObjID(dbTag, 1) }
      88           2 : func (o *initOp) syncObjs() objIDSlice {
      89           2 :         syncObjs := make([]objID, 0)
      90           2 :         // Add any additional DBs to syncObjs.
      91           2 :         for i := uint32(2); i < o.dbSlots+1; i++ {
      92           1 :                 syncObjs = append(syncObjs, makeObjID(dbTag, i))
      93           1 :         }
      94           2 :         return syncObjs
      95             : }
      96             : 
      97           0 : func (o *initOp) keys() []*[]byte                     { return nil }
      98           1 : func (o *initOp) diagramKeyRanges() []pebble.KeyRange { return nil }
      99             : 
     100             : // applyOp models a Writer.Apply operation.
     101             : type applyOp struct {
     102             :         writerID objID
     103             :         batchID  objID
     104             : }
     105             : 
     106           2 : func (o *applyOp) run(t *Test, h historyRecorder) {
     107           2 :         b := t.getBatch(o.batchID)
     108           2 :         w := t.getWriter(o.writerID)
     109           2 :         var err error
     110           2 :         if o.writerID.tag() == dbTag && t.testOpts.asyncApplyToDB && t.writeOpts.Sync {
     111           1 :                 err = w.(*pebble.DB).ApplyNoSyncWait(b, t.writeOpts)
     112           1 :                 if err == nil {
     113           1 :                         err = b.SyncWait()
     114           1 :                 }
     115           2 :         } else {
     116           2 :                 err = w.Apply(b, t.writeOpts)
     117           2 :         }
     118           2 :         h.Recordf("%s // %v", o, err)
     119             :         // batch will be closed by a closeOp which is guaranteed to be generated
     120             : }
     121             : 
     122           2 : func (o *applyOp) String() string  { return fmt.Sprintf("%s.Apply(%s)", o.writerID, o.batchID) }
     123           2 : func (o *applyOp) receiver() objID { return o.writerID }
     124           2 : func (o *applyOp) syncObjs() objIDSlice {
     125           2 :         // Apply should not be concurrent with operations that are mutating the
     126           2 :         // batch.
     127           2 :         return []objID{o.batchID}
     128           2 : }
     129             : 
     130           0 : func (o *applyOp) keys() []*[]byte                     { return nil }
     131           0 : func (o *applyOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     132             : 
     133             : // checkpointOp models a DB.Checkpoint operation.
     134             : type checkpointOp struct {
     135             :         dbID objID
     136             :         // If non-empty, the checkpoint is restricted to these spans.
     137             :         spans []pebble.CheckpointSpan
     138             : }
     139             : 
     140           2 : func (o *checkpointOp) run(t *Test, h historyRecorder) {
     141           2 :         // TODO(josh): db.Checkpoint does not work with shared storage yet.
     142           2 :         // It would be better to filter out ahead of calling run on the op,
     143           2 :         // by setting the weight that generator.go uses to zero, or similar.
     144           2 :         // But IIUC the ops are shared for ALL the metamorphic test runs, so
     145           2 :         // not sure how to do that easily:
     146           2 :         // https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
     147           2 :         if t.testOpts.sharedStorageEnabled || t.testOpts.externalStorageEnabled {
     148           2 :                 h.Recordf("%s // %v", o, nil)
     149           2 :                 return
     150           2 :         }
     151           2 :         var opts []pebble.CheckpointOption
     152           2 :         if len(o.spans) > 0 {
     153           2 :                 opts = append(opts, pebble.WithRestrictToSpans(o.spans))
     154           2 :         }
     155           2 :         db := t.getDB(o.dbID)
     156           2 :         err := t.withRetries(func() error {
     157           2 :                 return db.Checkpoint(o.dir(t.dir, h.op), opts...)
     158           2 :         })
     159           2 :         h.Recordf("%s // %v", o, err)
     160             : }
     161             : 
     162           2 : func (o *checkpointOp) dir(dataDir string, idx int) string {
     163           2 :         return filepath.Join(dataDir, "checkpoints", fmt.Sprintf("op-%06d", idx))
     164           2 : }
     165             : 
     166           2 : func (o *checkpointOp) String() string {
     167           2 :         var spanStr bytes.Buffer
     168           2 :         for i, span := range o.spans {
     169           2 :                 if i > 0 {
     170           2 :                         spanStr.WriteString(",")
     171           2 :                 }
     172           2 :                 fmt.Fprintf(&spanStr, "%q,%q", span.Start, span.End)
     173             :         }
     174           2 :         return fmt.Sprintf("%s.Checkpoint(%s)", o.dbID, spanStr.String())
     175             : }
     176             : 
     177           2 : func (o *checkpointOp) receiver() objID      { return o.dbID }
     178           2 : func (o *checkpointOp) syncObjs() objIDSlice { return nil }
     179             : 
     180           0 : func (o *checkpointOp) keys() []*[]byte {
     181           0 :         var res []*[]byte
     182           0 :         for i := range o.spans {
     183           0 :                 res = append(res, &o.spans[i].Start, &o.spans[i].End)
     184           0 :         }
     185           0 :         return res
     186             : }
     187             : 
     188           0 : func (o *checkpointOp) diagramKeyRanges() []pebble.KeyRange {
     189           0 :         var res []pebble.KeyRange
     190           0 :         for i := range o.spans {
     191           0 :                 res = append(res, pebble.KeyRange{
     192           0 :                         Start: o.spans[i].Start,
     193           0 :                         End:   o.spans[i].End,
     194           0 :                 })
     195           0 :         }
     196           0 :         return res
     197             : }
     198             : 
     199             : // downloadOp models a DB.Download operation.
     200             : type downloadOp struct {
     201             :         dbID  objID
     202             :         spans []pebble.DownloadSpan
     203             : }
     204             : 
     205           2 : func (o *downloadOp) run(t *Test, h historyRecorder) {
     206           2 :         db := t.getDB(o.dbID)
     207           2 :         err := t.withRetries(func() error {
     208           2 :                 return db.Download(context.Background(), o.spans)
     209           2 :         })
     210           2 :         h.Recordf("%s // %v", o, err)
     211             : }
     212             : 
     213           2 : func (o *downloadOp) String() string {
     214           2 :         var spanStr bytes.Buffer
     215           2 :         for i, span := range o.spans {
     216           2 :                 if i > 0 {
     217           2 :                         spanStr.WriteString(", ")
     218           2 :                 }
     219           2 :                 fmt.Fprintf(&spanStr, "%q /* start */, %q /* end */, %v /* viaBackingFileDownload */",
     220           2 :                         span.StartKey, span.EndKey, span.ViaBackingFileDownload)
     221             :         }
     222           2 :         return fmt.Sprintf("%s.Download(%s)", o.dbID, spanStr.String())
     223             : }
     224             : 
     225           2 : func (o *downloadOp) receiver() objID     { return o.dbID }
     226           2 : func (o downloadOp) syncObjs() objIDSlice { return nil }
     227             : 
     228           0 : func (o *downloadOp) keys() []*[]byte {
     229           0 :         var res []*[]byte
     230           0 :         for i := range o.spans {
     231           0 :                 res = append(res, &o.spans[i].StartKey, &o.spans[i].EndKey)
     232           0 :         }
     233           0 :         return res
     234             : }
     235             : 
     236           0 : func (o *downloadOp) diagramKeyRanges() []pebble.KeyRange {
     237           0 :         var res []pebble.KeyRange
     238           0 :         for i := range o.spans {
     239           0 :                 res = append(res, pebble.KeyRange{
     240           0 :                         Start: o.spans[i].StartKey,
     241           0 :                         End:   o.spans[i].EndKey,
     242           0 :                 })
     243           0 :         }
     244           0 :         return res
     245             : }
     246             : 
     247             : // closeOp models a {Batch,Iterator,Snapshot}.Close operation.
     248             : type closeOp struct {
     249             :         objID objID
     250             : 
     251             :         // affectedObjects is the list of additional objects that are affected by this
     252             :         // operation, and which syncObjs() must return so that we don't perform the
     253             :         // close in parallel with other operations to affected objects.
     254             :         affectedObjects []objID
     255             : }
     256             : 
     257           2 : func (o *closeOp) run(t *Test, h historyRecorder) {
     258           2 :         c := t.getCloser(o.objID)
     259           2 :         if o.objID.tag() == dbTag && t.opts.DisableWAL {
     260           2 :                 // Special case: If WAL is disabled, do a flush right before DB Close. This
     261           2 :                 // allows us to reuse this run's data directory as initial state for
     262           2 :                 // future runs without losing any mutations.
     263           2 :                 _ = t.getDB(o.objID).Flush()
     264           2 :         }
     265           2 :         t.clearObj(o.objID)
     266           2 :         err := c.Close()
     267           2 :         h.Recordf("%s // %v", o, err)
     268             : }
     269             : 
     270           2 : func (o *closeOp) String() string  { return fmt.Sprintf("%s.Close()", o.objID) }
     271           2 : func (o *closeOp) receiver() objID { return o.objID }
     272           2 : func (o *closeOp) syncObjs() objIDSlice {
     273           2 :         return o.affectedObjects
     274           2 : }
     275             : 
     276           0 : func (o *closeOp) keys() []*[]byte                     { return nil }
     277           0 : func (o *closeOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     278             : 
     279             : // compactOp models a DB.Compact operation.
     280             : type compactOp struct {
     281             :         dbID        objID
     282             :         start       []byte
     283             :         end         []byte
     284             :         parallelize bool
     285             : }
     286             : 
     287           2 : func (o *compactOp) run(t *Test, h historyRecorder) {
     288           2 :         err := t.withRetries(func() error {
     289           2 :                 return t.getDB(o.dbID).Compact(o.start, o.end, o.parallelize)
     290           2 :         })
     291           2 :         h.Recordf("%s // %v", o, err)
     292             : }
     293             : 
     294           2 : func (o *compactOp) String() string {
     295           2 :         return fmt.Sprintf("%s.Compact(%q, %q, %t /* parallelize */)", o.dbID, o.start, o.end, o.parallelize)
     296           2 : }
     297             : 
     298           2 : func (o *compactOp) receiver() objID      { return o.dbID }
     299           2 : func (o *compactOp) syncObjs() objIDSlice { return nil }
     300             : 
     301           1 : func (o *compactOp) keys() []*[]byte {
     302           1 :         return []*[]byte{&o.start, &o.end}
     303           1 : }
     304             : 
     305           1 : func (o *compactOp) diagramKeyRanges() []pebble.KeyRange {
     306           1 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     307           1 : }
     308             : 
     309             : // deleteOp models a Write.Delete operation.
     310             : type deleteOp struct {
     311             :         writerID objID
     312             :         key      []byte
     313             : 
     314             :         derivedDBID objID
     315             : }
     316             : 
     317           2 : func (o *deleteOp) run(t *Test, h historyRecorder) {
     318           2 :         w := t.getWriter(o.writerID)
     319           2 :         var err error
     320           2 :         if t.testOpts.deleteSized && t.isFMV(o.derivedDBID, pebble.FormatDeleteSizedAndObsolete) {
     321           2 :                 // Call DeleteSized with a deterministic size derived from the index.
     322           2 :                 // The size does not need to be accurate for correctness.
     323           2 :                 err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
     324           2 :         } else {
     325           2 :                 err = w.Delete(o.key, t.writeOpts)
     326           2 :         }
     327           2 :         h.Recordf("%s // %v", o, err)
     328             : }
     329             : 
     330           2 : func hashSize(index int) uint32 {
     331           2 :         // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
     332           2 :         return uint32((11400714819323198485 * uint64(index)) % maxValueSize)
     333           2 : }
     334             : 
     335           2 : func (o *deleteOp) String() string {
     336           2 :         return fmt.Sprintf("%s.Delete(%q)", o.writerID, o.key)
     337           2 : }
     338           2 : func (o *deleteOp) receiver() objID      { return o.writerID }
     339           2 : func (o *deleteOp) syncObjs() objIDSlice { return nil }
     340             : 
     341           0 : func (o *deleteOp) keys() []*[]byte {
     342           0 :         return []*[]byte{&o.key}
     343           0 : }
     344             : 
     345           0 : func (o *deleteOp) diagramKeyRanges() []pebble.KeyRange {
     346           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
     347           0 : }
     348             : 
     349             : // singleDeleteOp models a Write.SingleDelete operation.
     350             : type singleDeleteOp struct {
     351             :         writerID           objID
     352             :         key                []byte
     353             :         maybeReplaceDelete bool
     354             : }
     355             : 
     356           2 : func (o *singleDeleteOp) run(t *Test, h historyRecorder) {
     357           2 :         w := t.getWriter(o.writerID)
     358           2 :         var err error
     359           2 :         if t.testOpts.replaceSingleDelete && o.maybeReplaceDelete {
     360           2 :                 err = w.Delete(o.key, t.writeOpts)
     361           2 :         } else {
     362           2 :                 err = w.SingleDelete(o.key, t.writeOpts)
     363           2 :         }
     364             :         // NOTE: even if the SINGLEDEL was replaced with a DELETE, we must still
     365             :         // write the former to the history log. The log line will indicate whether
     366             :         // or not the delete *could* have been replaced. The OPTIONS file should
     367             :         // also be consulted to determine what happened at runtime (i.e. by taking
     368             :         // the logical AND).
     369           2 :         h.Recordf("%s // %v", o, err)
     370             : }
     371             : 
     372           2 : func (o *singleDeleteOp) String() string {
     373           2 :         return fmt.Sprintf("%s.SingleDelete(%q, %v /* maybeReplaceDelete */)", o.writerID, o.key, o.maybeReplaceDelete)
     374           2 : }
     375             : 
     376           2 : func (o *singleDeleteOp) receiver() objID      { return o.writerID }
     377           2 : func (o *singleDeleteOp) syncObjs() objIDSlice { return nil }
     378             : 
     379           0 : func (o *singleDeleteOp) keys() []*[]byte {
     380           0 :         return []*[]byte{&o.key}
     381           0 : }
     382             : 
     383           0 : func (o *singleDeleteOp) diagramKeyRanges() []pebble.KeyRange {
     384           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
     385           0 : }
     386             : 
     387             : // deleteRangeOp models a Write.DeleteRange operation.
     388             : type deleteRangeOp struct {
     389             :         writerID objID
     390             :         start    []byte
     391             :         end      []byte
     392             : }
     393             : 
     394           2 : func (o *deleteRangeOp) run(t *Test, h historyRecorder) {
     395           2 :         w := t.getWriter(o.writerID)
     396           2 :         err := w.DeleteRange(o.start, o.end, t.writeOpts)
     397           2 :         h.Recordf("%s // %v", o, err)
     398           2 : }
     399             : 
     400           2 : func (o *deleteRangeOp) String() string {
     401           2 :         return fmt.Sprintf("%s.DeleteRange(%q, %q)", o.writerID, o.start, o.end)
     402           2 : }
     403             : 
     404           2 : func (o *deleteRangeOp) receiver() objID      { return o.writerID }
     405           2 : func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
     406             : 
     407           0 : func (o *deleteRangeOp) keys() []*[]byte {
     408           0 :         return []*[]byte{&o.start, &o.end}
     409           0 : }
     410             : 
     411           0 : func (o *deleteRangeOp) diagramKeyRanges() []pebble.KeyRange {
     412           0 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     413           0 : }
     414             : 
     415             : // flushOp models a DB.Flush operation.
     416             : type flushOp struct {
     417             :         db objID
     418             : }
     419             : 
     420           2 : func (o *flushOp) run(t *Test, h historyRecorder) {
     421           2 :         db := t.getDB(o.db)
     422           2 :         err := db.Flush()
     423           2 :         h.Recordf("%s // %v", o, err)
     424           2 : }
     425             : 
     426           2 : func (o *flushOp) String() string                      { return fmt.Sprintf("%s.Flush()", o.db) }
     427           2 : func (o *flushOp) receiver() objID                     { return o.db }
     428           2 : func (o *flushOp) syncObjs() objIDSlice                { return nil }
     429           0 : func (o *flushOp) keys() []*[]byte                     { return nil }
     430           0 : func (o *flushOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     431             : 
     432             : // mergeOp models a Write.Merge operation.
     433             : type mergeOp struct {
     434             :         writerID objID
     435             :         key      []byte
     436             :         value    []byte
     437             : }
     438             : 
     439           2 : func (o *mergeOp) run(t *Test, h historyRecorder) {
     440           2 :         w := t.getWriter(o.writerID)
     441           2 :         err := w.Merge(o.key, o.value, t.writeOpts)
     442           2 :         h.Recordf("%s // %v", o, err)
     443           2 : }
     444             : 
     445           2 : func (o *mergeOp) String() string       { return fmt.Sprintf("%s.Merge(%q, %q)", o.writerID, o.key, o.value) }
     446           2 : func (o *mergeOp) receiver() objID      { return o.writerID }
     447           2 : func (o *mergeOp) syncObjs() objIDSlice { return nil }
     448             : 
     449           0 : func (o *mergeOp) keys() []*[]byte {
     450           0 :         return []*[]byte{&o.key}
     451           0 : }
     452             : 
     453           0 : func (o *mergeOp) diagramKeyRanges() []pebble.KeyRange {
     454           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
     455           0 : }
     456             : 
     457             : // setOp models a Write.Set operation.
     458             : type setOp struct {
     459             :         writerID objID
     460             :         key      []byte
     461             :         value    []byte
     462             : }
     463             : 
     464           2 : func (o *setOp) run(t *Test, h historyRecorder) {
     465           2 :         w := t.getWriter(o.writerID)
     466           2 :         err := w.Set(o.key, o.value, t.writeOpts)
     467           2 :         h.Recordf("%s // %v", o, err)
     468           2 : }
     469             : 
     470           2 : func (o *setOp) String() string       { return fmt.Sprintf("%s.Set(%q, %q)", o.writerID, o.key, o.value) }
     471           2 : func (o *setOp) receiver() objID      { return o.writerID }
     472           2 : func (o *setOp) syncObjs() objIDSlice { return nil }
     473             : 
     474           0 : func (o *setOp) keys() []*[]byte {
     475           0 :         return []*[]byte{&o.key}
     476           0 : }
     477             : 
     478           0 : func (o *setOp) diagramKeyRanges() []pebble.KeyRange {
     479           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
     480           0 : }
     481             : 
     482             : // rangeKeyDeleteOp models a Write.RangeKeyDelete operation.
     483             : type rangeKeyDeleteOp struct {
     484             :         writerID objID
     485             :         start    []byte
     486             :         end      []byte
     487             : }
     488             : 
     489           2 : func (o *rangeKeyDeleteOp) run(t *Test, h historyRecorder) {
     490           2 :         w := t.getWriter(o.writerID)
     491           2 :         err := w.RangeKeyDelete(o.start, o.end, t.writeOpts)
     492           2 :         h.Recordf("%s // %v", o, err)
     493           2 : }
     494             : 
     495           2 : func (o *rangeKeyDeleteOp) String() string {
     496           2 :         return fmt.Sprintf("%s.RangeKeyDelete(%q, %q)", o.writerID, o.start, o.end)
     497           2 : }
     498             : 
     499           2 : func (o *rangeKeyDeleteOp) receiver() objID      { return o.writerID }
     500           2 : func (o *rangeKeyDeleteOp) syncObjs() objIDSlice { return nil }
     501             : 
     502           0 : func (o *rangeKeyDeleteOp) keys() []*[]byte {
     503           0 :         return []*[]byte{&o.start, &o.end}
     504           0 : }
     505             : 
     506           1 : func (o *rangeKeyDeleteOp) diagramKeyRanges() []pebble.KeyRange {
     507           1 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     508           1 : }
     509             : 
     510             : // rangeKeySetOp models a Write.RangeKeySet operation.
     511             : type rangeKeySetOp struct {
     512             :         writerID objID
     513             :         start    []byte
     514             :         end      []byte
     515             :         suffix   []byte
     516             :         value    []byte
     517             : }
     518             : 
     519           2 : func (o *rangeKeySetOp) run(t *Test, h historyRecorder) {
     520           2 :         w := t.getWriter(o.writerID)
     521           2 :         err := w.RangeKeySet(o.start, o.end, o.suffix, o.value, t.writeOpts)
     522           2 :         h.Recordf("%s // %v", o, err)
     523           2 : }
     524             : 
     525           2 : func (o *rangeKeySetOp) String() string {
     526           2 :         return fmt.Sprintf("%s.RangeKeySet(%q, %q, %q, %q)",
     527           2 :                 o.writerID, o.start, o.end, o.suffix, o.value)
     528           2 : }
     529             : 
     530           2 : func (o *rangeKeySetOp) receiver() objID      { return o.writerID }
     531           2 : func (o *rangeKeySetOp) syncObjs() objIDSlice { return nil }
     532             : 
     533           1 : func (o *rangeKeySetOp) keys() []*[]byte {
     534           1 :         return []*[]byte{&o.start, &o.end}
     535           1 : }
     536             : 
     537           1 : func (o *rangeKeySetOp) diagramKeyRanges() []pebble.KeyRange {
     538           1 :         return []pebble.KeyRange{{Start: o.start, End: o.end}}
     539           1 : }
     540             : 
// rangeKeyUnsetOp models a Write.RangeKeyUnset operation.
type rangeKeyUnsetOp struct {
	writerID objID  // writer (DB or batch) that the unset is applied to
	start    []byte // start of the unset span
	end      []byte // end of the unset span
	suffix   []byte // suffix identifying the range key being unset
}

// run applies the RangeKeyUnset to the op's writer and records the result
// (including any error) in the history.
func (o *rangeKeyUnsetOp) run(t *Test, h historyRecorder) {
	w := t.getWriter(o.writerID)
	err := w.RangeKeyUnset(o.start, o.end, o.suffix, t.writeOpts)
	h.Recordf("%s // %v", o, err)
}

// String returns the operation in the textual form used by the history.
func (o *rangeKeyUnsetOp) String() string {
	return fmt.Sprintf("%s.RangeKeyUnset(%q, %q, %q)",
		o.writerID, o.start, o.end, o.suffix)
}

// receiver returns the writer object the operation executes against.
func (o *rangeKeyUnsetOp) receiver() objID      { return o.writerID }
// syncObjs returns nil: the op requires no synchronization beyond its receiver.
func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }

// keys returns pointers to the op's user keys so they can be rewritten in place.
func (o *rangeKeyUnsetOp) keys() []*[]byte {
	return []*[]byte{&o.start, &o.end}
}

// diagramKeyRanges returns the key span affected by the range-key unset.
func (o *rangeKeyUnsetOp) diagramKeyRanges() []pebble.KeyRange {
	return []pebble.KeyRange{{Start: o.start, End: o.end}}
}
     570             : 
// logDataOp models a Writer.LogData operation.
type logDataOp struct {
	writerID objID  // writer (DB or batch) that the log data is applied to
	data     []byte // opaque payload written to the WAL
}

// run applies the LogData to the op's writer and records the result in the
// history.
func (o *logDataOp) run(t *Test, h historyRecorder) {
	w := t.getWriter(o.writerID)
	err := w.LogData(o.data, t.writeOpts)
	h.Recordf("%s // %v", o, err)
}

// String returns the operation in the textual form used by the history.
func (o *logDataOp) String() string {
	return fmt.Sprintf("%s.LogData(%q)", o.writerID, o.data)
}

// receiver returns the writer object the operation executes against.
func (o *logDataOp) receiver() objID                     { return o.writerID }
// syncObjs returns nil: no synchronization beyond the receiver is needed.
func (o *logDataOp) syncObjs() objIDSlice                { return nil }
// LogData carries no user keys and affects no key ranges.
func (o *logDataOp) keys() []*[]byte                     { return []*[]byte{} }
func (o *logDataOp) diagramKeyRanges() []pebble.KeyRange { return []pebble.KeyRange{} }
     591             : 
// newBatchOp models a Write.NewBatch operation.
type newBatchOp struct {
	dbID    objID // DB the batch is created from
	batchID objID // ID under which the new batch is registered
}

// run creates a new (unindexed) batch on the DB and registers it under
// o.batchID so later ops can reference it.
func (o *newBatchOp) run(t *Test, h historyRecorder) {
	b := t.getDB(o.dbID).NewBatch()
	t.setBatch(o.batchID, b)
	h.Recordf("%s", o)
}

// String returns the operation in the textual form used by the history.
func (o *newBatchOp) String() string  { return fmt.Sprintf("%s = %s.NewBatch()", o.batchID, o.dbID) }
// receiver returns the DB the batch is created from.
func (o *newBatchOp) receiver() objID { return o.dbID }
func (o *newBatchOp) syncObjs() objIDSlice {
	// NewBatch should not be concurrent with operations that interact with that
	// same batch.
	return []objID{o.batchID}
}

// NewBatch carries no user keys and affects no key ranges.
func (o *newBatchOp) keys() []*[]byte                     { return nil }
func (o *newBatchOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     614             : 
// newIndexedBatchOp models a Write.NewIndexedBatch operation.
type newIndexedBatchOp struct {
	dbID    objID // DB the batch is created from
	batchID objID // ID under which the new batch is registered
}

// run creates a new indexed (readable) batch on the DB and registers it
// under o.batchID so later ops can reference it.
func (o *newIndexedBatchOp) run(t *Test, h historyRecorder) {
	b := t.getDB(o.dbID).NewIndexedBatch()
	t.setBatch(o.batchID, b)
	h.Recordf("%s", o)
}

// String returns the operation in the textual form used by the history.
func (o *newIndexedBatchOp) String() string {
	return fmt.Sprintf("%s = %s.NewIndexedBatch()", o.batchID, o.dbID)
}
// receiver returns the DB the batch is created from.
func (o *newIndexedBatchOp) receiver() objID { return o.dbID }
func (o *newIndexedBatchOp) syncObjs() objIDSlice {
	// NewIndexedBatch should not be concurrent with operations that interact
	// with that same batch.
	return []objID{o.batchID}
}

// NewIndexedBatch carries no user keys and affects no key ranges.
func (o *newIndexedBatchOp) keys() []*[]byte                     { return nil }
func (o *newIndexedBatchOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     639             : 
// batchCommitOp models a Batch.Commit operation.
type batchCommitOp struct {
	dbID    objID // DB the batch commits into
	batchID objID // batch being committed
}

// run commits the batch with the test's write options and records the result
// (including any error) in the history.
func (o *batchCommitOp) run(t *Test, h historyRecorder) {
	b := t.getBatch(o.batchID)
	err := b.Commit(t.writeOpts)
	h.Recordf("%s // %v", o, err)
}

// String returns the operation in the textual form used by the history.
func (o *batchCommitOp) String() string  { return fmt.Sprintf("%s.Commit()", o.batchID) }
// receiver returns the batch being committed.
func (o *batchCommitOp) receiver() objID { return o.batchID }
func (o *batchCommitOp) syncObjs() objIDSlice {
	// Synchronize on the database so that NewIters wait for the commit.
	return []objID{o.dbID}
}

// Commit carries no user keys and affects no key ranges.
func (o *batchCommitOp) keys() []*[]byte                     { return nil }
func (o *batchCommitOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     661             : 
// ingestOp models a DB.Ingest operation.
type ingestOp struct {
	dbID     objID   // DB to ingest into
	batchIDs []objID // batches whose contents are built into sstables and ingested

	// derivedDBIDs[i] is the DB that batchIDs[i] was created from. It is used
	// by syncObjs to decide which DBs ingestion must synchronize with.
	derivedDBIDs []objID
}
     669             : 
     670           2 : func (o *ingestOp) run(t *Test, h historyRecorder) {
     671           2 :         // We can only use apply as an alternative for ingestion if we are ingesting
     672           2 :         // a single batch. If we are ingesting multiple batches, the batches may
     673           2 :         // overlap which would cause ingestion to fail but apply would succeed.
     674           2 :         if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 && o.derivedDBIDs[0] == o.dbID {
     675           2 :                 id := o.batchIDs[0]
     676           2 :                 b := t.getBatch(id)
     677           2 :                 iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
     678           2 :                 db := t.getDB(o.dbID)
     679           2 :                 c, err := o.collapseBatch(t, db, iter, rangeDelIter, rangeKeyIter, b)
     680           2 :                 if err == nil {
     681           2 :                         err = db.Apply(c, t.writeOpts)
     682           2 :                 }
     683           2 :                 _ = b.Close()
     684           2 :                 _ = c.Close()
     685           2 :                 t.clearObj(id)
     686           2 :                 h.Recordf("%s // %v", o, err)
     687           2 :                 return
     688             :         }
     689             : 
     690           2 :         var paths []string
     691           2 :         var err error
     692           2 :         for i, id := range o.batchIDs {
     693           2 :                 b := t.getBatch(id)
     694           2 :                 t.clearObj(id)
     695           2 :                 path, _, err2 := buildForIngest(t, o.dbID, b, i)
     696           2 :                 if err2 != nil {
     697           0 :                         h.Recordf("Build(%s) // %v", id, err2)
     698           0 :                 }
     699           2 :                 err = firstError(err, err2)
     700           2 :                 if err2 == nil {
     701           2 :                         paths = append(paths, path)
     702           2 :                 }
     703           2 :                 err = firstError(err, b.Close())
     704             :         }
     705             : 
     706           2 :         err = firstError(err, t.withRetries(func() error {
     707           2 :                 return t.getDB(o.dbID).Ingest(paths)
     708           2 :         }))
     709             : 
     710           2 :         h.Recordf("%s // %v", o, err)
     711             : }
     712             : 
     713           2 : func (o *ingestOp) receiver() objID { return o.dbID }
     714           2 : func (o *ingestOp) syncObjs() objIDSlice {
     715           2 :         // Ingest should not be concurrent with mutating the batches that will be
     716           2 :         // ingested as sstables.
     717           2 :         objs := make([]objID, 0, len(o.batchIDs)+1)
     718           2 :         objs = append(objs, o.batchIDs...)
     719           2 :         addedDBs := make(map[objID]struct{})
     720           2 :         for i := range o.derivedDBIDs {
     721           2 :                 _, ok := addedDBs[o.derivedDBIDs[i]]
     722           2 :                 if !ok && o.derivedDBIDs[i] != o.dbID {
     723           1 :                         objs = append(objs, o.derivedDBIDs[i])
     724           1 :                         addedDBs[o.derivedDBIDs[i]] = struct{}{}
     725           1 :                 }
     726             :         }
     727           2 :         return objs
     728             : }
     729             : 
// closeIters closes whichever of the given point, range-del, and range-key
// iterators are non-nil. It is safe to call with any subset of nil iterators.
func closeIters(
	pointIter base.InternalIterator,
	rangeDelIter keyspan.FragmentIterator,
	rangeKeyIter keyspan.FragmentIterator,
) {
	if pointIter != nil {
		pointIter.Close()
	}
	if rangeDelIter != nil {
		rangeDelIter.Close()
	}
	if rangeKeyIter != nil {
		rangeKeyIter.Close()
	}
}
     745             : 
// collapseBatch collapses the mutations in a batch to be equivalent to an
// sstable ingesting those mutations. Duplicate updates to a key are collapsed
// so that only the latest update is performed. All range deletions are
// performed first in the batch to match the semantics of ingestion where a
// range deletion does not delete a point record contained in the sstable.
//
// The returned batch is newly created from db; on any error the returned
// batch is nil. The supplied iterators are always closed via the deferred
// closeIters (double-Close of the nil-ed iterators is avoided by the nil
// checks there).
func (o *ingestOp) collapseBatch(
	t *Test,
	db *pebble.DB,
	pointIter base.InternalIterator,
	rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
	b *pebble.Batch,
) (*pebble.Batch, error) {
	defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
	equal := t.opts.Comparer.Equal
	collapsed := db.NewBatch()

	// Phase 1: copy all range deletions first, so they cannot shadow the
	// point keys added below (matching ingestion semantics).
	if rangeDelIter != nil {
		// NB: The range tombstones have already been fragmented by the Batch.
		// NB: `t` here shadows the *Test parameter for the rest of this scope.
		t, err := rangeDelIter.First()
		for ; t != nil; t, err = rangeDelIter.Next() {
			// NB: We don't have to copy the key or value since we're reading from a
			// batch which doesn't do prefix compression.
			if err := collapsed.DeleteRange(t.Start, t.End, nil); err != nil {
				return nil, err
			}
		}
		if err != nil {
			return nil, err
		}
		rangeDelIter.Close()
		rangeDelIter = nil
	}

	// Phase 2: copy point keys, keeping only the most recent update per user
	// key (pointIter yields keys in sorted order, newest first per key).
	if pointIter != nil {
		var lastUserKey []byte
		for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
			// Ignore duplicate keys.
			//
			// Note: this is necessary due to MERGE keys, otherwise it would be
			// fine to include all the keys in the batch and let the normal
			// sequence number precedence determine which of the keys "wins".
			// But the code to build the ingested sstable will only keep the
			// most recent internal key and will not merge across internal keys.
			if equal(lastUserKey, kv.K.UserKey) {
				continue
			}
			// NB: We don't have to copy the key or value since we're reading from a
			// batch which doesn't do prefix compression.
			lastUserKey = kv.K.UserKey

			// Re-issue the mutation into the collapsed batch according to its kind.
			var err error
			switch kv.Kind() {
			case pebble.InternalKeyKindDelete:
				err = collapsed.Delete(kv.K.UserKey, nil)
			case pebble.InternalKeyKindDeleteSized:
				v, _ := binary.Uvarint(kv.InPlaceValue())
				// Batch.DeleteSized takes just the length of the value being
				// deleted and adds the key's length to derive the overall entry
				// size of the value being deleted. This has already been done
				// to the key we're reading from the batch, so we must subtract
				// the key length from the encoded value before calling
				// collapsed.DeleteSized, which will again add the key length
				// before encoding.
				err = collapsed.DeleteSized(kv.K.UserKey, uint32(v-uint64(len(kv.K.UserKey))), nil)
			case pebble.InternalKeyKindSingleDelete:
				err = collapsed.SingleDelete(kv.K.UserKey, nil)
			case pebble.InternalKeyKindSet:
				err = collapsed.Set(kv.K.UserKey, kv.InPlaceValue(), nil)
			case pebble.InternalKeyKindMerge:
				err = collapsed.Merge(kv.K.UserKey, kv.InPlaceValue(), nil)
			case pebble.InternalKeyKindLogData:
				err = collapsed.LogData(kv.K.UserKey, nil)
			default:
				err = errors.Errorf("unknown batch record kind: %d", kv.Kind())
			}
			if err != nil {
				return nil, err
			}
		}
		if err := pointIter.Close(); err != nil {
			return nil, err
		}
		pointIter = nil
	}

	// There's no equivalent of a MERGE operator for range keys, so there's no
	// need to collapse the range keys here. Rather than reading the range keys
	// from `rangeKeyIter`, which will already be fragmented, read the range
	// keys from the batch and copy them verbatim. This marginally improves our
	// test coverage over the alternative approach of pre-fragmenting and
	// pre-coalescing before writing to the batch.
	//
	// The `rangeKeyIter` is used only to determine if there are any range keys
	// in the batch at all, and only because we already have it handy from
	// private.BatchSort.
	if rangeKeyIter != nil {
		for r := b.Reader(); ; {
			kind, key, value, ok, err := r.Next()
			if !ok {
				if err != nil {
					return nil, err
				}
				break
			} else if !rangekey.IsRangeKey(kind) {
				continue
			}
			// Re-add the range-key record verbatim at sequence number zero.
			ik := base.MakeInternalKey(key, 0, kind)
			if err := collapsed.AddInternalKey(&ik, value, nil); err != nil {
				return nil, err
			}
		}
		rangeKeyIter.Close()
		rangeKeyIter = nil
	}

	return collapsed, nil
}
     863             : 
     864           2 : func (o *ingestOp) String() string {
     865           2 :         var buf strings.Builder
     866           2 :         buf.WriteString(o.dbID.String())
     867           2 :         buf.WriteString(".Ingest(")
     868           2 :         for i, id := range o.batchIDs {
     869           2 :                 if i > 0 {
     870           2 :                         buf.WriteString(", ")
     871           2 :                 }
     872           2 :                 buf.WriteString(id.String())
     873             :         }
     874           2 :         buf.WriteString(")")
     875           2 :         return buf.String()
     876             : }
     877             : 
// Ingest carries no directly-mutable user keys and affects no diagram ranges.
func (o *ingestOp) keys() []*[]byte                     { return nil }
func (o *ingestOp) diagramKeyRanges() []pebble.KeyRange { return nil }
     880             : 
// ingestAndExciseOp models a DB.IngestAndExcise operation: ingest the
// sstable built from batchID while excising [exciseStart, exciseEnd).
type ingestAndExciseOp struct {
	dbID                       objID // DB to ingest into / excise on
	batchID                    objID // batch whose contents form the ingested sstable
	derivedDBID                objID // DB the batch was created from (for syncObjs)
	exciseStart, exciseEnd     []byte
	// sstContainsExciseTombstone, when true, causes run to add a DeleteRange
	// and RangeKeyDelete over the excise span to the batch before building.
	sstContainsExciseTombstone bool
}
     888             : 
// run executes the IngestAndExcise. An empty batch (or a batch that builds to
// an empty sstable) degenerates to just a simulated excise. When useExcise is
// disabled, the excise is simulated with DeleteRange/RangeKeyDelete followed
// by a regular Ingest, cross-checking the real IngestAndExcise path.
func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) {
	var err error
	b := t.getBatch(o.batchID)
	t.clearObj(o.batchID)
	if t.testOpts.Opts.Comparer.Compare(o.exciseEnd, o.exciseStart) <= 0 {
		// Generator invariant violated; fail loudly.
		panic("non-well-formed excise span")
	}
	db := t.getDB(o.dbID)
	if b.Empty() {
		// Nothing to ingest; only the excise has an effect.
		h.Recordf("%s // %v", o, o.simulateExcise(db, t))
		return
	}

	if o.sstContainsExciseTombstone {
		// Add a rangedel and rangekeydel to the batch. This ensures it'll end up
		// inside the sstable. Note that all entries in the sstable will have the
		// same sequence number, so the ordering within the batch doesn't matter.
		err = firstError(err, b.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts))
		err = firstError(err, b.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts))
	}
	path, writerMeta, err2 := buildForIngest(t, o.dbID, b, 0 /* i */)
	if err2 != nil {
		// NOTE(review): b is not closed on this early return — confirm that
		// leaking the batch on a build failure is intentional.
		h.Recordf("Build(%s) // %v", o.batchID, err2)
		return
	}
	err = firstError(err, b.Close())

	if writerMeta.Properties.NumEntries == 0 && writerMeta.Properties.NumRangeKeys() == 0 {
		// The built sstable is empty; fall back to just the excise.
		h.Recordf("%s // %v", o, o.simulateExcise(db, t))
		return
	}

	if t.testOpts.useExcise {
		err = firstError(err, t.withRetries(func() error {
			_, err := db.IngestAndExcise([]string{path}, nil /* shared */, nil /* external */, pebble.KeyRange{
				Start: o.exciseStart,
				End:   o.exciseEnd,
			}, o.sstContainsExciseTombstone)
			return err
		}))
	} else {
		// Cross-check path: perform the excise manually, then a plain ingest.
		err = firstError(err, o.simulateExcise(db, t))
		err = firstError(err, t.withRetries(func() error {
			return db.Ingest([]string{path})
		}))
	}

	h.Recordf("%s // %v", o, err)
}
     938             : 
     939           2 : func (o *ingestAndExciseOp) simulateExcise(db *pebble.DB, t *Test) error {
     940           2 :         // Simulate the excise using a DeleteRange and RangeKeyDelete.
     941           2 :         return errors.CombineErrors(
     942           2 :                 db.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts),
     943           2 :                 db.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts),
     944           2 :         )
     945           2 : }
     946             : 
     947           2 : func (o *ingestAndExciseOp) receiver() objID { return o.dbID }
     948           2 : func (o *ingestAndExciseOp) syncObjs() objIDSlice {
     949           2 :         // Ingest should not be concurrent with mutating the batches that will be
     950           2 :         // ingested as sstables.
     951           2 :         objs := []objID{o.batchID}
     952           2 :         if o.derivedDBID != o.dbID {
     953           1 :                 objs = append(objs, o.derivedDBID)
     954           1 :         }
     955           2 :         return objs
     956             : }
     957             : 
// String returns the operation in the textual form used by the history.
func (o *ingestAndExciseOp) String() string {
	return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q, %t /* sstContainsExciseTombstone */)", o.dbID, o.batchID, o.exciseStart, o.exciseEnd, o.sstContainsExciseTombstone)
}

// keys returns pointers to the excise bounds so they can be rewritten in place.
func (o *ingestAndExciseOp) keys() []*[]byte {
	return []*[]byte{&o.exciseStart, &o.exciseEnd}
}

// diagramKeyRanges returns the excised key span.
func (o *ingestAndExciseOp) diagramKeyRanges() []pebble.KeyRange {
	return []pebble.KeyRange{{Start: o.exciseStart, End: o.exciseEnd}}
}
     969             : 
// ingestExternalFilesOp models a DB.IngestExternalFiles operation.
//
// When remote storage is not enabled, the operation is emulated using the
// regular DB.Ingest; this serves as a cross-check of the result.
type ingestExternalFilesOp struct {
	dbID objID // DB to ingest into
	// The bounds of the objects cannot overlap.
	objs []externalObjWithBounds
}

// externalObjWithBounds pairs an external object with the key bounds and
// synthetic key transforms to apply when ingesting it.
type externalObjWithBounds struct {
	externalObjID objID

	// bounds for the external object. These bounds apply after keys undergo
	// any prefix or suffix transforms.
	bounds pebble.KeyRange

	syntheticPrefix sstable.SyntheticPrefix
	syntheticSuffix sstable.SyntheticSuffix
}
     990             : 
// run executes the external-file ingestion. When external storage is
// disabled, it emulates the operation by building local truncated sstables
// and feeding them through the regular DB.Ingest path; otherwise it issues a
// real DB.IngestExternalFiles.
func (o *ingestExternalFilesOp) run(t *Test, h historyRecorder) {
	db := t.getDB(o.dbID)

	// Verify the objects exist (useful for --try-to-reduce).
	for i := range o.objs {
		t.getExternalObj(o.objs[i].externalObjID)
	}

	var err error
	if !t.testOpts.externalStorageEnabled {
		// Emulate the operation by creating local, truncated SST files and
		// ingesting them.
		var paths []string
		for i, obj := range o.objs {
			// Make sure the object exists and is not empty.
			path, sstMeta := buildForIngestExternalEmulation(
				t, o.dbID, obj.externalObjID, obj.bounds, obj.syntheticSuffix, obj.syntheticPrefix, i,
			)
			// Skip tables that contain no keys after truncation to the bounds.
			if sstMeta.HasPointKeys || sstMeta.HasRangeKeys || sstMeta.HasRangeDelKeys {
				paths = append(paths, path)
			}
		}
		if len(paths) > 0 {
			err = db.Ingest(paths)
		}
	} else {
		external := make([]pebble.ExternalFile, len(o.objs))
		for i, obj := range o.objs {
			meta := t.getExternalObj(obj.externalObjID)
			external[i] = pebble.ExternalFile{
				Locator:           "external",
				ObjName:           externalObjName(obj.externalObjID),
				Size:              meta.sstMeta.Size,
				StartKey:          obj.bounds.Start,
				EndKey:            obj.bounds.End,
				EndKeyIsInclusive: false,
				// Note: if the table has point/range keys, we don't know for sure whether
				// this particular range has any, but that's acceptable.
				HasPointKey:     meta.sstMeta.HasPointKeys || meta.sstMeta.HasRangeDelKeys,
				HasRangeKey:     meta.sstMeta.HasRangeKeys,
				SyntheticSuffix: obj.syntheticSuffix,
			}
			if obj.syntheticPrefix.IsSet() {
				external[i].SyntheticPrefix = obj.syntheticPrefix
			}
		}
		_, err = db.IngestExternalFiles(external)
	}

	h.Recordf("%s // %v", o, err)
}
    1042             : 
    1043           2 : func (o *ingestExternalFilesOp) receiver() objID { return o.dbID }
    1044           2 : func (o *ingestExternalFilesOp) syncObjs() objIDSlice {
    1045           2 :         res := make(objIDSlice, len(o.objs))
    1046           2 :         for i := range res {
    1047           2 :                 res[i] = o.objs[i].externalObjID
    1048           2 :         }
    1049             :         // Deduplicate the IDs.
    1050           2 :         slices.Sort(res)
    1051           2 :         return slices.Compact(res)
    1052             : }
    1053             : 
    1054           2 : func (o *ingestExternalFilesOp) String() string {
    1055           2 :         strs := make([]string, len(o.objs))
    1056           2 :         for i, obj := range o.objs {
    1057           2 :                 strs[i] = fmt.Sprintf("%s, %q /* start */, %q /* end */, %q /* syntheticSuffix */, %q /* syntheticPrefix */",
    1058           2 :                         obj.externalObjID, obj.bounds.Start, obj.bounds.End, obj.syntheticSuffix, obj.syntheticPrefix,
    1059           2 :                 )
    1060           2 :         }
    1061           2 :         return fmt.Sprintf("%s.IngestExternalFiles(%s)", o.dbID, strings.Join(strs, ", "))
    1062             : }
    1063             : 
    1064           0 : func (o *ingestExternalFilesOp) keys() []*[]byte {
    1065           0 :         // If any of the objects have synthetic prefixes, we can't allow modification
    1066           0 :         // of external object bounds.
    1067           0 :         for i := range o.objs {
    1068           0 :                 if o.objs[i].syntheticPrefix.IsSet() {
    1069           0 :                         return nil
    1070           0 :                 }
    1071             :         }
    1072             : 
    1073           0 :         var res []*[]byte
    1074           0 :         for i := range o.objs {
    1075           0 :                 res = append(res, &o.objs[i].bounds.Start, &o.objs[i].bounds.End)
    1076           0 :         }
    1077           0 :         return res
    1078             : }
    1079             : 
    1080           0 : func (o *ingestExternalFilesOp) diagramKeyRanges() []pebble.KeyRange {
    1081           0 :         ranges := make([]pebble.KeyRange, len(o.objs))
    1082           0 :         for i, obj := range o.objs {
    1083           0 :                 ranges[i] = obj.bounds
    1084           0 :         }
    1085           0 :         return ranges
    1086             : }
    1087             : 
// getOp models a Reader.Get operation.
type getOp struct {
	readerID    objID  // reader the Get is issued against
	key         []byte // key to look up
	derivedDBID objID  // DB backing the reader, if any; used by syncObjs
}
    1094             : 
    1095           2 : func (o *getOp) run(t *Test, h historyRecorder) {
    1096           2 :         r := t.getReader(o.readerID)
    1097           2 :         var val []byte
    1098           2 :         var closer io.Closer
    1099           2 :         err := t.withRetries(func() (err error) {
    1100           2 :                 val, closer, err = r.Get(o.key)
    1101           2 :                 return err
    1102           2 :         })
    1103           2 :         h.Recordf("%s // [%q] %v", o, val, err)
    1104           2 :         if closer != nil {
    1105           2 :                 closer.Close()
    1106           2 :         }
    1107             : }
    1108             : 
    1109           2 : func (o *getOp) String() string  { return fmt.Sprintf("%s.Get(%q)", o.readerID, o.key) }
    1110           2 : func (o *getOp) receiver() objID { return o.readerID }
    1111           2 : func (o *getOp) syncObjs() objIDSlice {
    1112           2 :         if o.readerID.tag() == dbTag {
    1113           2 :                 return nil
    1114           2 :         }
    1115             :         // batch.Get reads through to the current database state.
    1116           2 :         if o.derivedDBID != 0 {
    1117           2 :                 return []objID{o.derivedDBID}
    1118           2 :         }
    1119           0 :         return nil
    1120             : }
    1121             : 
    1122           0 : func (o *getOp) keys() []*[]byte {
    1123           0 :         return []*[]byte{&o.key}
    1124           0 : }
    1125             : 
    1126           0 : func (o *getOp) diagramKeyRanges() []pebble.KeyRange {
    1127           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
    1128           0 : }
    1129             : 
// newIterOp models a Reader.NewIter operation.
type newIterOp struct {
	readerID objID // reader to open the iterator against
	iterID   objID // ID assigned to the newly-created iterator
	iterOpts
	derivedDBID objID // DB backing the reader, if any; used by syncObjs
}

// Enable this to enable debug logging of range key iterator operations.
const debugIterators = false
    1140             : 
    1141           2 : func (o *newIterOp) run(t *Test, h historyRecorder) {
    1142           2 :         r := t.getReader(o.readerID)
    1143           2 :         opts := iterOptions(o.iterOpts)
    1144           2 :         if debugIterators {
    1145           0 :                 opts.DebugRangeKeyStack = true
    1146           0 :         }
    1147             : 
    1148           2 :         var i *pebble.Iterator
    1149           2 :         for {
    1150           2 :                 i, _ = r.NewIter(opts)
    1151           2 :                 if err := i.Error(); !errors.Is(err, errorfs.ErrInjected) {
    1152           2 :                         break
    1153             :                 }
    1154             :                 // close this iter and retry NewIter
    1155           0 :                 _ = i.Close()
    1156             :         }
    1157           2 :         t.setIter(o.iterID, i)
    1158           2 : 
    1159           2 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
    1160           2 :         // the user-provided bounds.
    1161           2 :         if opts != nil {
    1162           2 :                 rand.Read(opts.LowerBound[:])
    1163           2 :                 rand.Read(opts.UpperBound[:])
    1164           2 :         }
    1165           2 :         h.Recordf("%s // %v", o, i.Error())
    1166             : }
    1167             : 
    1168           2 : func (o *newIterOp) String() string {
    1169           2 :         return fmt.Sprintf("%s = %s.NewIter(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
    1170           2 :                 o.iterID, o.readerID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
    1171           2 : }
    1172             : 
    1173           2 : func (o *newIterOp) receiver() objID { return o.readerID }
    1174           2 : func (o *newIterOp) syncObjs() objIDSlice {
    1175           2 :         // Prevent o.iterID ops from running before it exists.
    1176           2 :         objs := []objID{o.iterID}
    1177           2 :         // If reading through a batch or snapshot, the new iterator will also observe database
    1178           2 :         // state, and we must synchronize on the database state for a consistent
    1179           2 :         // view.
    1180           2 :         if o.readerID.tag() == batchTag || o.readerID.tag() == snapTag {
    1181           2 :                 objs = append(objs, o.derivedDBID)
    1182           2 :         }
    1183           2 :         return objs
    1184             : }
    1185             : 
    1186           0 : func (o *newIterOp) keys() []*[]byte {
    1187           0 :         var res []*[]byte
    1188           0 :         if o.lower != nil {
    1189           0 :                 res = append(res, &o.lower)
    1190           0 :         }
    1191           0 :         if o.upper != nil {
    1192           0 :                 res = append(res, &o.upper)
    1193           0 :         }
    1194           0 :         return res
    1195             : }
    1196             : 
    1197           1 : func (o *newIterOp) diagramKeyRanges() []pebble.KeyRange {
    1198           1 :         var res []pebble.KeyRange
    1199           1 :         if o.lower != nil {
    1200           0 :                 res = append(res, pebble.KeyRange{Start: o.lower, End: o.lower})
    1201           0 :         }
    1202           1 :         if o.upper != nil {
    1203           0 :                 res = append(res, pebble.KeyRange{Start: o.upper, End: o.upper})
    1204           0 :         }
    1205           1 :         return res
    1206             : }
    1207             : 
// newIterUsingCloneOp models a Iterator.Clone operation.
type newIterUsingCloneOp struct {
	existingIterID objID // iterator being cloned
	iterID         objID // ID assigned to the resulting clone
	refreshBatch   bool  // passed through as CloneOptions.RefreshBatchView
	iterOpts

	// derivedReaderID is the ID of the underlying reader that backs both the
	// existing iterator and the new iterator. The derivedReaderID is NOT
	// serialized by String and is derived from other operations during parse.
	derivedReaderID objID
}
    1220             : 
    1221           2 : func (o *newIterUsingCloneOp) run(t *Test, h historyRecorder) {
    1222           2 :         iter := t.getIter(o.existingIterID)
    1223           2 :         cloneOpts := pebble.CloneOptions{
    1224           2 :                 IterOptions:      iterOptions(o.iterOpts),
    1225           2 :                 RefreshBatchView: o.refreshBatch,
    1226           2 :         }
    1227           2 :         i, err := iter.iter.Clone(cloneOpts)
    1228           2 :         if err != nil {
    1229           0 :                 panic(err)
    1230             :         }
    1231           2 :         t.setIter(o.iterID, i)
    1232           2 :         h.Recordf("%s // %v", o, i.Error())
    1233             : }
    1234             : 
    1235           2 : func (o *newIterUsingCloneOp) String() string {
    1236           2 :         return fmt.Sprintf("%s = %s.Clone(%t, %q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
    1237           2 :                 o.iterID, o.existingIterID, o.refreshBatch, o.lower, o.upper,
    1238           2 :                 o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
    1239           2 : }
    1240             : 
    1241           2 : func (o *newIterUsingCloneOp) receiver() objID { return o.existingIterID }
    1242             : 
    1243           2 : func (o *newIterUsingCloneOp) syncObjs() objIDSlice {
    1244           2 :         objIDs := []objID{o.iterID}
    1245           2 :         // If the underlying reader is a batch, we must synchronize with the batch.
    1246           2 :         // If refreshBatch=true, synchronizing is necessary to observe all the
    1247           2 :         // mutations up to until this op and no more. Even when refreshBatch=false,
    1248           2 :         // we must synchronize because iterator construction may access state cached
    1249           2 :         // on the indexed batch to avoid refragmenting range tombstones or range
    1250           2 :         // keys.
    1251           2 :         if o.derivedReaderID.tag() == batchTag {
    1252           1 :                 objIDs = append(objIDs, o.derivedReaderID)
    1253           1 :         }
    1254           2 :         return objIDs
    1255             : }
    1256             : 
// Clone ops reference no user keys directly, so there is nothing to expose or
// to render in diagrams.
func (o *newIterUsingCloneOp) keys() []*[]byte                     { return nil }
func (o *newIterUsingCloneOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1259             : 
// iterSetBoundsOp models an Iterator.SetBounds operation.
type iterSetBoundsOp struct {
	iterID objID  // iterator whose bounds are replaced
	lower  []byte // new lower bound passed to SetBounds
	upper  []byte // new upper bound passed to SetBounds
}
    1266             : 
    1267           2 : func (o *iterSetBoundsOp) run(t *Test, h historyRecorder) {
    1268           2 :         i := t.getIter(o.iterID)
    1269           2 :         var lower, upper []byte
    1270           2 :         if o.lower != nil {
    1271           2 :                 lower = append(lower, o.lower...)
    1272           2 :         }
    1273           2 :         if o.upper != nil {
    1274           2 :                 upper = append(upper, o.upper...)
    1275           2 :         }
    1276           2 :         i.SetBounds(lower, upper)
    1277           2 : 
    1278           2 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
    1279           2 :         // the user-provided bounds.
    1280           2 :         rand.Read(lower[:])
    1281           2 :         rand.Read(upper[:])
    1282           2 : 
    1283           2 :         h.Recordf("%s // %v", o, i.Error())
    1284             : }
    1285             : 
    1286           2 : func (o *iterSetBoundsOp) String() string {
    1287           2 :         return fmt.Sprintf("%s.SetBounds(%q, %q)", o.iterID, o.lower, o.upper)
    1288           2 : }
    1289             : 
    1290           2 : func (o *iterSetBoundsOp) receiver() objID      { return o.iterID }
    1291           2 : func (o *iterSetBoundsOp) syncObjs() objIDSlice { return nil }
    1292             : 
    1293           0 : func (o *iterSetBoundsOp) keys() []*[]byte {
    1294           0 :         return []*[]byte{&o.lower, &o.upper}
    1295           0 : }
    1296             : 
    1297           0 : func (o *iterSetBoundsOp) diagramKeyRanges() []pebble.KeyRange {
    1298           0 :         return []pebble.KeyRange{{Start: o.lower, End: o.upper}}
    1299           0 : }
    1300             : 
// iterSetOptionsOp models an Iterator.SetOptions operation.
type iterSetOptionsOp struct {
	iterID objID // iterator whose options are replaced
	iterOpts

	// derivedReaderID is the ID of the underlying reader that backs the
	// iterator. The derivedReaderID is NOT serialized by String and is derived
	// from other operations during parse.
	derivedReaderID objID
}
    1311             : 
    1312           2 : func (o *iterSetOptionsOp) run(t *Test, h historyRecorder) {
    1313           2 :         i := t.getIter(o.iterID)
    1314           2 : 
    1315           2 :         opts := iterOptions(o.iterOpts)
    1316           2 :         if opts == nil {
    1317           2 :                 opts = &pebble.IterOptions{}
    1318           2 :         }
    1319           2 :         i.SetOptions(opts)
    1320           2 : 
    1321           2 :         // Trash the bounds to ensure that Pebble doesn't rely on the stability of
    1322           2 :         // the user-provided bounds.
    1323           2 :         rand.Read(opts.LowerBound[:])
    1324           2 :         rand.Read(opts.UpperBound[:])
    1325           2 : 
    1326           2 :         h.Recordf("%s // %v", o, i.Error())
    1327             : }
    1328             : 
    1329           2 : func (o *iterSetOptionsOp) String() string {
    1330           2 :         return fmt.Sprintf("%s.SetOptions(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
    1331           2 :                 o.iterID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
    1332           2 : }
    1333             : 
    1334           2 : func iterOptions(o iterOpts) *pebble.IterOptions {
    1335           2 :         if o.IsZero() && !debugIterators {
    1336           2 :                 return nil
    1337           2 :         }
    1338           2 :         var lower, upper []byte
    1339           2 :         if o.lower != nil {
    1340           2 :                 lower = append(lower, o.lower...)
    1341           2 :         }
    1342           2 :         if o.upper != nil {
    1343           2 :                 upper = append(upper, o.upper...)
    1344           2 :         }
    1345           2 :         opts := &pebble.IterOptions{
    1346           2 :                 LowerBound: lower,
    1347           2 :                 UpperBound: upper,
    1348           2 :                 KeyTypes:   pebble.IterKeyType(o.keyTypes),
    1349           2 :                 RangeKeyMasking: pebble.RangeKeyMasking{
    1350           2 :                         Suffix: o.maskSuffix,
    1351           2 :                 },
    1352           2 :                 UseL6Filters:       o.useL6Filters,
    1353           2 :                 DebugRangeKeyStack: debugIterators,
    1354           2 :         }
    1355           2 :         if opts.RangeKeyMasking.Suffix != nil {
    1356           2 :                 opts.RangeKeyMasking.Filter = func() pebble.BlockPropertyFilterMask {
    1357           2 :                         return sstable.NewTestKeysMaskingFilter()
    1358           2 :                 }
    1359             :         }
    1360           2 :         if o.filterMax > 0 {
    1361           2 :                 opts.PointKeyFilters = []pebble.BlockPropertyFilter{
    1362           2 :                         sstable.NewTestKeysBlockPropertyFilter(o.filterMin, o.filterMax),
    1363           2 :                 }
    1364           2 :                 // Enforce the timestamp bounds in SkipPoint, so that the iterator never
    1365           2 :                 // returns a key outside the filterMin, filterMax bounds. This provides
    1366           2 :                 // deterministic iteration.
    1367           2 :                 opts.SkipPoint = func(k []byte) (skip bool) {
    1368           2 :                         n := testkeys.Comparer.Split(k)
    1369           2 :                         if n == len(k) {
    1370           2 :                                 // No suffix, don't skip it.
    1371           2 :                                 return false
    1372           2 :                         }
    1373           2 :                         v, err := testkeys.ParseSuffix(k[n:])
    1374           2 :                         if err != nil {
    1375           0 :                                 panic(err)
    1376             :                         }
    1377           2 :                         ts := uint64(v)
    1378           2 :                         return ts < o.filterMin || ts >= o.filterMax
    1379             :                 }
    1380             :         }
    1381           2 :         return opts
    1382             : }
    1383             : 
    1384           2 : func (o *iterSetOptionsOp) receiver() objID { return o.iterID }
    1385             : 
    1386           2 : func (o *iterSetOptionsOp) syncObjs() objIDSlice {
    1387           2 :         if o.derivedReaderID.tag() == batchTag {
    1388           1 :                 // If the underlying reader is a batch, we must synchronize with the
    1389           1 :                 // batch so that we observe all the mutations up until this operation
    1390           1 :                 // and no more.
    1391           1 :                 return []objID{o.derivedReaderID}
    1392           1 :         }
    1393           2 :         return nil
    1394             : }
    1395             : 
    1396           0 : func (o *iterSetOptionsOp) keys() []*[]byte                     { return nil }
    1397           0 : func (o *iterSetOptionsOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1398             : 
// iterSeekGEOp models an Iterator.SeekGE[WithLimit] operation.
type iterSeekGEOp struct {
	iterID objID  // iterator to position
	key    []byte // seek target
	limit  []byte // if non-nil, SeekGEWithLimit is used with this limit

	derivedReaderID objID // reader backing the iterator; used by syncObjs
}
    1407             : 
    1408           2 : func iteratorPos(i *retryableIter) string {
    1409           2 :         var buf bytes.Buffer
    1410           2 :         fmt.Fprintf(&buf, "%q", i.Key())
    1411           2 :         hasPoint, hasRange := i.HasPointAndRange()
    1412           2 :         if hasPoint {
    1413           2 :                 fmt.Fprintf(&buf, ",%q", i.Value())
    1414           2 :         } else {
    1415           2 :                 fmt.Fprint(&buf, ",<no point>")
    1416           2 :         }
    1417           2 :         if hasRange {
    1418           2 :                 start, end := i.RangeBounds()
    1419           2 :                 fmt.Fprintf(&buf, ",[%q,%q)=>{", start, end)
    1420           2 :                 for i, rk := range i.RangeKeys() {
    1421           2 :                         if i > 0 {
    1422           2 :                                 fmt.Fprint(&buf, ",")
    1423           2 :                         }
    1424           2 :                         fmt.Fprintf(&buf, "%q=%q", rk.Suffix, rk.Value)
    1425             :                 }
    1426           2 :                 fmt.Fprint(&buf, "}")
    1427           2 :         } else {
    1428           2 :                 fmt.Fprint(&buf, ",<no range>")
    1429           2 :         }
    1430           2 :         if i.RangeKeyChanged() {
    1431           2 :                 fmt.Fprint(&buf, "*")
    1432           2 :         }
    1433           2 :         return buf.String()
    1434             : }
    1435             : 
    1436           2 : func validBoolToStr(valid bool) string {
    1437           2 :         return fmt.Sprintf("%t", valid)
    1438           2 : }
    1439             : 
    1440           2 : func validityStateToStr(validity pebble.IterValidityState) (bool, string) {
    1441           2 :         // We can't distinguish between IterExhausted and IterAtLimit in a
    1442           2 :         // deterministic manner.
    1443           2 :         switch validity {
    1444           2 :         case pebble.IterExhausted, pebble.IterAtLimit:
    1445           2 :                 return false, "invalid"
    1446           2 :         case pebble.IterValid:
    1447           2 :                 return true, "valid"
    1448           0 :         default:
    1449           0 :                 panic("unknown validity")
    1450             :         }
    1451             : }
    1452             : 
    1453           2 : func (o *iterSeekGEOp) run(t *Test, h historyRecorder) {
    1454           2 :         i := t.getIter(o.iterID)
    1455           2 :         var valid bool
    1456           2 :         var validStr string
    1457           2 :         if o.limit == nil {
    1458           2 :                 valid = i.SeekGE(o.key)
    1459           2 :                 validStr = validBoolToStr(valid)
    1460           2 :         } else {
    1461           2 :                 valid, validStr = validityStateToStr(i.SeekGEWithLimit(o.key, o.limit))
    1462           2 :         }
    1463           2 :         if valid {
    1464           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1465           2 :         } else {
    1466           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1467           2 :         }
    1468             : }
    1469             : 
    1470           2 : func (o *iterSeekGEOp) String() string {
    1471           2 :         return fmt.Sprintf("%s.SeekGE(%q, %q)", o.iterID, o.key, o.limit)
    1472           2 : }
    1473           2 : func (o *iterSeekGEOp) receiver() objID      { return o.iterID }
    1474           2 : func (o *iterSeekGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1475             : 
    1476           0 : func (o *iterSeekGEOp) keys() []*[]byte {
    1477           0 :         return []*[]byte{&o.key}
    1478           0 : }
    1479             : 
    1480           1 : func (o *iterSeekGEOp) diagramKeyRanges() []pebble.KeyRange {
    1481           1 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
    1482           1 : }
    1483             : 
    1484           2 : func onlyBatchIDs(ids ...objID) objIDSlice {
    1485           2 :         var ret objIDSlice
    1486           2 :         for _, id := range ids {
    1487           2 :                 if id.tag() == batchTag {
    1488           1 :                         ret = append(ret, id)
    1489           1 :                 }
    1490             :         }
    1491           2 :         return ret
    1492             : }
    1493             : 
// iterSeekPrefixGEOp models an Iterator.SeekPrefixGE operation.
type iterSeekPrefixGEOp struct {
	iterID objID  // iterator to position
	key    []byte // seek target

	derivedReaderID objID // reader backing the iterator; used by syncObjs
}
    1501             : 
    1502           2 : func (o *iterSeekPrefixGEOp) run(t *Test, h historyRecorder) {
    1503           2 :         i := t.getIter(o.iterID)
    1504           2 :         valid := i.SeekPrefixGE(o.key)
    1505           2 :         if valid {
    1506           2 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1507           2 :         } else {
    1508           2 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1509           2 :         }
    1510             : }
    1511             : 
    1512           2 : func (o *iterSeekPrefixGEOp) String() string {
    1513           2 :         return fmt.Sprintf("%s.SeekPrefixGE(%q)", o.iterID, o.key)
    1514           2 : }
    1515           2 : func (o *iterSeekPrefixGEOp) receiver() objID      { return o.iterID }
    1516           2 : func (o *iterSeekPrefixGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1517             : 
    1518           0 : func (o *iterSeekPrefixGEOp) keys() []*[]byte {
    1519           0 :         return []*[]byte{&o.key}
    1520           0 : }
    1521             : 
    1522           0 : func (o *iterSeekPrefixGEOp) diagramKeyRanges() []pebble.KeyRange {
    1523           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
    1524           0 : }
    1525             : 
// iterSeekLTOp models an Iterator.SeekLT[WithLimit] operation.
type iterSeekLTOp struct {
	iterID objID  // iterator to position
	key    []byte // seek target
	limit  []byte // if non-nil, SeekLTWithLimit is used with this limit

	derivedReaderID objID // reader backing the iterator; used by syncObjs
}
    1534             : 
    1535           2 : func (o *iterSeekLTOp) run(t *Test, h historyRecorder) {
    1536           2 :         i := t.getIter(o.iterID)
    1537           2 :         var valid bool
    1538           2 :         var validStr string
    1539           2 :         if o.limit == nil {
    1540           2 :                 valid = i.SeekLT(o.key)
    1541           2 :                 validStr = validBoolToStr(valid)
    1542           2 :         } else {
    1543           2 :                 valid, validStr = validityStateToStr(i.SeekLTWithLimit(o.key, o.limit))
    1544           2 :         }
    1545           2 :         if valid {
    1546           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1547           2 :         } else {
    1548           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1549           2 :         }
    1550             : }
    1551             : 
    1552           2 : func (o *iterSeekLTOp) String() string {
    1553           2 :         return fmt.Sprintf("%s.SeekLT(%q, %q)", o.iterID, o.key, o.limit)
    1554           2 : }
    1555             : 
    1556           2 : func (o *iterSeekLTOp) receiver() objID      { return o.iterID }
    1557           2 : func (o *iterSeekLTOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1558             : 
    1559           0 : func (o *iterSeekLTOp) keys() []*[]byte {
    1560           0 :         return []*[]byte{&o.key}
    1561           0 : }
    1562             : 
    1563           0 : func (o *iterSeekLTOp) diagramKeyRanges() []pebble.KeyRange {
    1564           0 :         return []pebble.KeyRange{{Start: o.key, End: o.key}}
    1565           0 : }
    1566             : 
// iterFirstOp models an Iterator.First operation.
type iterFirstOp struct {
	iterID objID // iterator to position

	derivedReaderID objID // reader backing the iterator; used by syncObjs
}
    1573             : 
    1574           2 : func (o *iterFirstOp) run(t *Test, h historyRecorder) {
    1575           2 :         i := t.getIter(o.iterID)
    1576           2 :         valid := i.First()
    1577           2 :         if valid {
    1578           2 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1579           2 :         } else {
    1580           2 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1581           2 :         }
    1582             : }
    1583             : 
    1584           2 : func (o *iterFirstOp) String() string       { return fmt.Sprintf("%s.First()", o.iterID) }
    1585           2 : func (o *iterFirstOp) receiver() objID      { return o.iterID }
    1586           2 : func (o *iterFirstOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1587             : 
    1588           0 : func (o *iterFirstOp) keys() []*[]byte                     { return nil }
    1589           0 : func (o *iterFirstOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1590             : 
// iterLastOp models an Iterator.Last operation.
type iterLastOp struct {
	iterID objID // iterator to position

	derivedReaderID objID // reader backing the iterator; used by syncObjs
}
    1597             : 
    1598           2 : func (o *iterLastOp) run(t *Test, h historyRecorder) {
    1599           2 :         i := t.getIter(o.iterID)
    1600           2 :         valid := i.Last()
    1601           2 :         if valid {
    1602           2 :                 h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
    1603           2 :         } else {
    1604           2 :                 h.Recordf("%s // [%t] %v", o, valid, i.Error())
    1605           2 :         }
    1606             : }
    1607             : 
    1608           2 : func (o *iterLastOp) String() string       { return fmt.Sprintf("%s.Last()", o.iterID) }
    1609           2 : func (o *iterLastOp) receiver() objID      { return o.iterID }
    1610           2 : func (o *iterLastOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1611             : 
    1612           0 : func (o *iterLastOp) keys() []*[]byte                     { return nil }
    1613           0 : func (o *iterLastOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1614             : 
// iterNextOp models an Iterator.Next[WithLimit] operation.
type iterNextOp struct {
	iterID objID  // iterator to advance
	limit  []byte // if non-nil, NextWithLimit is used with this limit

	derivedReaderID objID // reader backing the iterator; used by syncObjs
}
    1622             : 
    1623           2 : func (o *iterNextOp) run(t *Test, h historyRecorder) {
    1624           2 :         i := t.getIter(o.iterID)
    1625           2 :         var valid bool
    1626           2 :         var validStr string
    1627           2 :         if o.limit == nil {
    1628           2 :                 valid = i.Next()
    1629           2 :                 validStr = validBoolToStr(valid)
    1630           2 :         } else {
    1631           2 :                 valid, validStr = validityStateToStr(i.NextWithLimit(o.limit))
    1632           2 :         }
    1633           2 :         if valid {
    1634           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1635           2 :         } else {
    1636           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1637           2 :         }
    1638             : }
    1639             : 
    1640           2 : func (o *iterNextOp) String() string       { return fmt.Sprintf("%s.Next(%q)", o.iterID, o.limit) }
    1641           2 : func (o *iterNextOp) receiver() objID      { return o.iterID }
    1642           2 : func (o *iterNextOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1643             : 
    1644           0 : func (o *iterNextOp) keys() []*[]byte                     { return nil }
    1645           0 : func (o *iterNextOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1646             : 
// iterNextPrefixOp models an Iterator.NextPrefix operation.
type iterNextPrefixOp struct {
        // iterID identifies the iterator receiving the NextPrefix call.
        iterID objID

        // derivedReaderID is the reader the iterator was derived from; used by
        // syncObjs to serialize with writes to that batch.
        derivedReaderID objID
}
    1653             : 
    1654           2 : func (o *iterNextPrefixOp) run(t *Test, h historyRecorder) {
    1655           2 :         i := t.getIter(o.iterID)
    1656           2 :         valid := i.NextPrefix()
    1657           2 :         validStr := validBoolToStr(valid)
    1658           2 :         if valid {
    1659           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1660           2 :         } else {
    1661           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1662           2 :         }
    1663             : }
    1664             : 
    1665           2 : func (o *iterNextPrefixOp) String() string       { return fmt.Sprintf("%s.NextPrefix()", o.iterID) }
    1666           2 : func (o *iterNextPrefixOp) receiver() objID      { return o.iterID }
    1667           2 : func (o *iterNextPrefixOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1668             : 
    1669           0 : func (o *iterNextPrefixOp) keys() []*[]byte                     { return nil }
    1670           0 : func (o *iterNextPrefixOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1671             : 
// iterCanSingleDelOp models a call to CanDeterministicallySingleDelete with an
// Iterator.
type iterCanSingleDelOp struct {
        // iterID identifies the iterator passed to
        // CanDeterministicallySingleDelete.
        iterID objID

        // derivedReaderID is the reader the iterator was derived from; used by
        // syncObjs to serialize with writes to that batch.
        derivedReaderID objID
}
    1679             : 
    1680           2 : func (o *iterCanSingleDelOp) run(t *Test, h historyRecorder) {
    1681           2 :         // TODO(jackson): When we perform error injection, we'll need to rethink
    1682           2 :         // this.
    1683           2 :         _, err := pebble.CanDeterministicallySingleDelete(t.getIter(o.iterID).iter)
    1684           2 :         // The return value of CanDeterministicallySingleDelete is dependent on
    1685           2 :         // internal LSM state and non-deterministic, so we don't record it.
    1686           2 :         // Including the operation within the metamorphic test at all helps ensure
    1687           2 :         // that it does not change the result of any other Iterator operation that
    1688           2 :         // should be deterministic, regardless of its own outcome.
    1689           2 :         //
    1690           2 :         // We still record the value of the error because it's deterministic, at
    1691           2 :         // least for now. The possible error cases are:
    1692           2 :         //  - The iterator was already in an error state when the operation ran.
    1693           2 :         //  - The operation is deterministically invalid (like using an InternalNext
    1694           2 :         //    to change directions.)
    1695           2 :         h.Recordf("%s // %v", o, err)
    1696           2 : }
    1697             : 
    1698           2 : func (o *iterCanSingleDelOp) String() string       { return fmt.Sprintf("%s.InternalNext()", o.iterID) }
    1699           2 : func (o *iterCanSingleDelOp) receiver() objID      { return o.iterID }
    1700           2 : func (o *iterCanSingleDelOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1701             : 
    1702           0 : func (o *iterCanSingleDelOp) keys() []*[]byte                     { return nil }
    1703           0 : func (o *iterCanSingleDelOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1704             : 
    1705             : // iterPrevOp models an Iterator.Prev[WithLimit] operation.
type iterPrevOp struct {
        // iterID identifies the iterator receiving the Prev call.
        iterID objID
        // limit, when non-nil, makes run use PrevWithLimit(limit) instead of
        // plain Prev.
        limit  []byte

        // derivedReaderID is the reader the iterator was derived from; used by
        // syncObjs to serialize with writes to that batch.
        derivedReaderID objID
}
    1712             : 
    1713           2 : func (o *iterPrevOp) run(t *Test, h historyRecorder) {
    1714           2 :         i := t.getIter(o.iterID)
    1715           2 :         var valid bool
    1716           2 :         var validStr string
    1717           2 :         if o.limit == nil {
    1718           2 :                 valid = i.Prev()
    1719           2 :                 validStr = validBoolToStr(valid)
    1720           2 :         } else {
    1721           2 :                 valid, validStr = validityStateToStr(i.PrevWithLimit(o.limit))
    1722           2 :         }
    1723           2 :         if valid {
    1724           2 :                 h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
    1725           2 :         } else {
    1726           2 :                 h.Recordf("%s // [%s] %v", o, validStr, i.Error())
    1727           2 :         }
    1728             : }
    1729             : 
    1730           2 : func (o *iterPrevOp) String() string       { return fmt.Sprintf("%s.Prev(%q)", o.iterID, o.limit) }
    1731           2 : func (o *iterPrevOp) receiver() objID      { return o.iterID }
    1732           2 : func (o *iterPrevOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
    1733             : 
    1734           0 : func (o *iterPrevOp) keys() []*[]byte                     { return nil }
    1735           0 : func (o *iterPrevOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1736             : 
// newSnapshotOp models a DB.NewSnapshot operation.
type newSnapshotOp struct {
        // dbID identifies the database the snapshot is taken from.
        dbID   objID
        // snapID is the object ID assigned to the new snapshot.
        snapID objID
        // If nonempty, this snapshot must not be used to read any keys outside of
        // the provided bounds. This allows some implementations to use 'Eventually
        // file-only snapshots,' which require bounds.
        //
        // NOTE: run panics if bounds is empty, so in practice it is always set.
        bounds []pebble.KeyRange
}
    1746             : 
    1747           2 : func (o *newSnapshotOp) run(t *Test, h historyRecorder) {
    1748           2 :         bounds := o.bounds
    1749           2 :         if len(bounds) == 0 {
    1750           0 :                 panic("bounds unexpectedly unset for newSnapshotOp")
    1751             :         }
    1752             :         // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
    1753           2 :         createEfos := ((11400714819323198485 * uint64(t.idx) * t.testOpts.seedEFOS) >> 63) == 1
    1754           2 :         // If either of these options is true, an EFOS _must_ be created, regardless
    1755           2 :         // of what the fibonacci hash returned.
    1756           2 :         excisePossible := t.testOpts.useSharedReplicate || t.testOpts.useExternalReplicate || t.testOpts.useExcise
    1757           2 :         if createEfos || excisePossible {
    1758           2 :                 s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(bounds)
    1759           2 :                 t.setSnapshot(o.snapID, s)
    1760           2 :         } else {
    1761           2 :                 s := t.getDB(o.dbID).NewSnapshot()
    1762           2 :                 t.setSnapshot(o.snapID, s)
    1763           2 :         }
    1764           2 :         h.Recordf("%s", o)
    1765             : }
    1766             : 
    1767           2 : func (o *newSnapshotOp) String() string {
    1768           2 :         var buf bytes.Buffer
    1769           2 :         fmt.Fprintf(&buf, "%s = %s.NewSnapshot(", o.snapID, o.dbID)
    1770           2 :         for i := range o.bounds {
    1771           2 :                 if i > 0 {
    1772           2 :                         fmt.Fprint(&buf, ", ")
    1773           2 :                 }
    1774           2 :                 fmt.Fprintf(&buf, "%q, %q", o.bounds[i].Start, o.bounds[i].End)
    1775             :         }
    1776           2 :         fmt.Fprint(&buf, ")")
    1777           2 :         return buf.String()
    1778             : }
    1779           2 : func (o *newSnapshotOp) receiver() objID      { return o.dbID }
    1780           2 : func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }
    1781             : 
    1782           1 : func (o *newSnapshotOp) keys() []*[]byte {
    1783           1 :         var res []*[]byte
    1784           1 :         for i := range o.bounds {
    1785           1 :                 res = append(res, &o.bounds[i].Start, &o.bounds[i].End)
    1786           1 :         }
    1787           1 :         return res
    1788             : }
    1789             : 
    1790           1 : func (o *newSnapshotOp) diagramKeyRanges() []pebble.KeyRange {
    1791           1 :         return o.bounds
    1792           1 : }
    1793             : 
// newExternalObjOp models a DB.NewExternalObj operation.
type newExternalObjOp struct {
        // batchID identifies the batch whose sorted contents are written into
        // the external object; the batch is consumed (cleared) by run.
        batchID       objID
        // externalObjID is the object ID assigned to the new external object.
        externalObjID objID
}
    1799             : 
    1800           2 : func externalObjName(externalObjID objID) string {
    1801           2 :         if externalObjID.tag() != externalObjTag {
    1802           0 :                 panic(fmt.Sprintf("invalid externalObjID %s", externalObjID))
    1803             :         }
    1804           2 :         return fmt.Sprintf("external-for-ingest-%d.sst", externalObjID.slot())
    1805             : }
    1806             : 
// run materializes the batch's contents as an sstable in external storage and
// registers the resulting metadata under o.externalObjID. The batch object is
// consumed: it is removed from the test's object registry.
func (o *newExternalObjOp) run(t *Test, h historyRecorder) {
        b := t.getBatch(o.batchID)
        t.clearObj(o.batchID)

        // Create the destination object in external storage and wrap it in a
        // Writable for the sstable writer.
        writeCloser, err := t.externalStorage.CreateObject(externalObjName(o.externalObjID))
        if err != nil {
                panic(err)
        }
        writable := objstorageprovider.NewRemoteWritable(writeCloser)

        // Sort the batch's point, range-del, and range-key entries into the
        // iterators writeSSTForIngestion expects.
        iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)

        sstMeta, err := writeSSTForIngestion(
                t,
                iter, rangeDelIter, rangeKeyIter,
                true, /* uniquePrefixes */
                nil,  /* syntheticSuffix */
                nil,  /* syntheticPrefix */
                writable,
                t.minFMV(),
        )
        if err != nil {
                panic(err)
        }
        // An entirely empty external object indicates a generator bug (an
        // empty batch should not have produced this op).
        if !sstMeta.HasPointKeys && !sstMeta.HasRangeDelKeys && !sstMeta.HasRangeKeys {
                // This can occur when using --try-to-reduce.
                panic("metamorphic test internal error: external object empty")
        }
        t.setExternalObj(o.externalObjID, externalObjMeta{
                sstMeta: sstMeta,
        })
        h.Recordf("%s", o)
}
    1840             : 
    1841           2 : func (o *newExternalObjOp) String() string {
    1842           2 :         return fmt.Sprintf("%s = %s.NewExternalObj()", o.externalObjID, o.batchID)
    1843           2 : }
    1844           2 : func (o *newExternalObjOp) receiver() objID      { return o.batchID }
    1845           2 : func (o *newExternalObjOp) syncObjs() objIDSlice { return []objID{o.externalObjID} }
    1846             : 
    1847           0 : func (o *newExternalObjOp) keys() []*[]byte                     { return nil }
    1848           0 : func (o *newExternalObjOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1849             : 
// dbRatchetFormatMajorVersionOp models a DB.RatchetFormatMajorVersion
// operation.
type dbRatchetFormatMajorVersionOp struct {
        dbID objID
        vers pebble.FormatMajorVersion // target version to ratchet up to
}
    1854             : 
    1855           2 : func (o *dbRatchetFormatMajorVersionOp) run(t *Test, h historyRecorder) {
    1856           2 :         var err error
    1857           2 :         // NB: We no-op the operation if we're already at or above the provided
    1858           2 :         // format major version. Different runs start at different format major
    1859           2 :         // versions, making the presence of an error and the error message itself
    1860           2 :         // non-deterministic if we attempt to upgrade to an older version.
    1861           2 :         //
    1862           2 :         //Regardless, subsequent operations should behave identically, which is what
    1863           2 :         //we're really aiming to test by including this format major version ratchet
    1864           2 :         //operation.
    1865           2 :         if t.getDB(o.dbID).FormatMajorVersion() < o.vers {
    1866           2 :                 err = t.getDB(o.dbID).RatchetFormatMajorVersion(o.vers)
    1867           2 :         }
    1868           2 :         h.Recordf("%s // %v", o, err)
    1869             : }
    1870             : 
    1871           2 : func (o *dbRatchetFormatMajorVersionOp) String() string {
    1872           2 :         return fmt.Sprintf("%s.RatchetFormatMajorVersion(%s)", o.dbID, o.vers)
    1873           2 : }
    1874           2 : func (o *dbRatchetFormatMajorVersionOp) receiver() objID      { return o.dbID }
    1875           2 : func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
    1876             : 
    1877           0 : func (o *dbRatchetFormatMajorVersionOp) keys() []*[]byte                     { return nil }
    1878           0 : func (o *dbRatchetFormatMajorVersionOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1879             : 
// dbRestartOp models a restart of the database (see Test.restartDB).
type dbRestartOp struct {
        dbID objID

        // affectedObjects is the list of additional objects that are affected by this
        // operation, and which syncObjs() must return so that we don't perform the
        // restart in parallel with other operations to affected objects.
        affectedObjects []objID
}
    1888             : 
    1889           2 : func (o *dbRestartOp) run(t *Test, h historyRecorder) {
    1890           2 :         if err := t.restartDB(o.dbID); err != nil {
    1891           0 :                 h.Recordf("%s // %v", o, err)
    1892           0 :                 h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
    1893           2 :         } else {
    1894           2 :                 h.Recordf("%s", o)
    1895           2 :         }
    1896             : }
    1897             : 
    1898           2 : func (o *dbRestartOp) String() string       { return fmt.Sprintf("%s.Restart()", o.dbID) }
    1899           2 : func (o *dbRestartOp) receiver() objID      { return o.dbID }
    1900           2 : func (o *dbRestartOp) syncObjs() objIDSlice { return o.affectedObjects }
    1901             : 
    1902           0 : func (o *dbRestartOp) keys() []*[]byte                     { return nil }
    1903           0 : func (o *dbRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
    1904             : 
    1905           1 : func formatOps(ops []op) string {
    1906           1 :         var buf strings.Builder
    1907           1 :         for _, op := range ops {
    1908           1 :                 fmt.Fprintf(&buf, "%s\n", op)
    1909           1 :         }
    1910           1 :         return buf.String()
    1911             : }
    1912             : 
// replicateOp models an operation that could copy keys from one db to
// another through either an IngestAndExcise, or an Ingest.
type replicateOp struct {
        // source and dest identify the databases to copy from and into.
        source, dest objID
        // start and end bound the replicated user-key span [start, end).
        start, end   []byte
}
    1919             : 
// runSharedReplicate replicates [r.start, r.end) from source to dest using
// ScanInternal with shared-SST callbacks: keys surfaced by the scan are
// rewritten into the local sstable w, while shared files are collected as
// SharedSSTMeta and handed to dest.IngestAndExcise along with w's output.
// All failures after the scan are recorded in the history rather than
// panicking.
func (r *replicateOp) runSharedReplicate(
        t *Test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
) {
        var sharedSSTs []pebble.SharedSSTMeta
        var err error
        err = source.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, r.start, r.end,
                // Point keys are re-added with sequence number 0 (ingestion
                // assigns the real one).
                func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
                        val, _, err := value.Value(nil)
                        if err != nil {
                                panic(err)
                        }
                        return w.Raw().Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
                },
                // Range deletions are copied verbatim.
                func(start, end []byte, seqNum base.SeqNum) error {
                        return w.DeleteRange(start, end)
                },
                // Range keys are re-encoded as a span.
                func(start, end []byte, keys []keyspan.Key) error {
                        return w.Raw().EncodeSpan(keyspan.Span{
                                Start: start,
                                End:   end,
                                Keys:  keys,
                        })
                },
                // Shared files are accumulated for the excise-ingest below.
                func(sst *pebble.SharedSSTMeta) error {
                        sharedSSTs = append(sharedSSTs, *sst)
                        return nil
                },
                nil,
        )
        if err != nil {
                h.Recordf("%s // %v", r, err)
                return
        }

        err = w.Close()
        if err != nil {
                h.Recordf("%s // %v", r, err)
                return
        }
        meta, err := w.Raw().Metadata()
        if err != nil {
                h.Recordf("%s // %v", r, err)
                return
        }
        if len(sharedSSTs) == 0 && meta.Properties.NumEntries == 0 && meta.Properties.NumRangeKeys() == 0 {
                // IngestAndExcise below will be a no-op. We should do a
                // DeleteRange+RangeKeyDel to mimic the behaviour of the non-shared-replicate
                // case.
                //
                // TODO(bilal): Remove this when we support excises with no matching ingests.
                if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
                        h.Recordf("%s // %v", r, err)
                        return
                }
                err := dest.DeleteRange(r.start, r.end, t.writeOpts)
                h.Recordf("%s // %v", r, err)
                return
        }

        _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false)
        h.Recordf("%s // %v", r, err)
}
    1982             : 
// runExternalReplicate mirrors runSharedReplicate but for external files:
// ScanInternal surfaces external files via the last callback (the shared-SST
// callback slot is nil), and the collected ExternalFile entries are passed to
// dest.IngestAndExcise along with the locally written sstable.
func (r *replicateOp) runExternalReplicate(
        t *Test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
) {
        var externalSSTs []pebble.ExternalFile
        var err error
        err = source.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, r.start, r.end,
                // Point keys are re-added with sequence number 0 (ingestion
                // assigns the real one).
                func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
                        val, _, err := value.Value(nil)
                        if err != nil {
                                panic(err)
                        }
                        return w.Raw().Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
                },
                // Range deletions are copied verbatim.
                func(start, end []byte, seqNum base.SeqNum) error {
                        return w.DeleteRange(start, end)
                },
                // Range keys are re-encoded as a span.
                func(start, end []byte, keys []keyspan.Key) error {
                        return w.Raw().EncodeSpan(keyspan.Span{
                                Start: start,
                                End:   end,
                                Keys:  keys,
                        })
                },
                nil,
                // External files are accumulated for the excise-ingest below.
                func(sst *pebble.ExternalFile) error {
                        externalSSTs = append(externalSSTs, *sst)
                        return nil
                },
        )
        if err != nil {
                h.Recordf("%s // %v", r, err)
                return
        }

        err = w.Close()
        if err != nil {
                h.Recordf("%s // %v", r, err)
                return
        }
        meta, err := w.Raw().Metadata()
        if err != nil {
                h.Recordf("%s // %v", r, err)
                return
        }
        if len(externalSSTs) == 0 && meta.Properties.NumEntries == 0 && meta.Properties.NumRangeKeys() == 0 {
                // IngestAndExcise below will be a no-op. We should do a
                // DeleteRange+RangeKeyDel to mimic the behaviour of the non-external-replicate
                // case.
                //
                // TODO(bilal): Remove this when we support excises with no matching ingests.
                if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
                        h.Recordf("%s // %v", r, err)
                        return
                }
                err := dest.DeleteRange(r.start, r.end, t.writeOpts)
                h.Recordf("%s // %v", r, err)
                return
        }

        _, err = dest.IngestAndExcise([]string{sstPath}, nil, externalSSTs /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false /* sstContainsExciseTombstone */)
        h.Recordf("%s // %v", r, err)
}
    2045             : 
// run copies [r.start, r.end) from the source DB to the dest DB. Depending on
// the test options it delegates to shared or external replication (which use
// ScanInternal + IngestAndExcise); otherwise it clears the span in dest and
// rebuilds it by iterating the source and ingesting a freshly written sstable.
func (r *replicateOp) run(t *Test, h historyRecorder) {
        // Shared replication only works if shared storage is enabled.
        useSharedIngest := t.testOpts.useSharedReplicate && t.testOpts.sharedStorageEnabled
        useExternalIngest := t.testOpts.useExternalReplicate && t.testOpts.externalStorageEnabled

        source := t.getDB(r.source)
        dest := t.getDB(r.dest)
        // The sstable is staged in the test's temp dir; t.idx keeps the name
        // unique per operation.
        sstPath := path.Join(t.tmpDir, fmt.Sprintf("ext-replicate%d.sst", t.idx))
        f, err := t.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified)
        if err != nil {
                h.Recordf("%s // %v", r, err)
                return
        }
        w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), t.opts.MakeWriterOptions(0, dest.FormatMajorVersion().MaxTableFormat()))

        // NB: In practice we'll either do shared replicate or external replicate,
        // as ScanInternal does not support both. We arbitrarily choose to prioritize
        // external replication if both are enabled, as those are likely to hit
        // widespread usage first.
        if useExternalIngest {
                r.runExternalReplicate(t, h, source, dest, w, sstPath)
                return
        }
        if useSharedIngest {
                r.runSharedReplicate(t, h, source, dest, w, sstPath)
                return
        }

        // First, do a RangeKeyDelete and DeleteRange on the whole span.
        if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
                h.Recordf("%s // %v", r, err)
                return
        }
        if err := dest.DeleteRange(r.start, r.end, t.writeOpts); err != nil {
                h.Recordf("%s // %v", r, err)
                return
        }
        // Walk the source span with a combined point-and-range iterator and
        // copy everything it exposes into the sstable writer.
        iter, err := source.NewIter(&pebble.IterOptions{
                LowerBound: r.start,
                UpperBound: r.end,
                KeyTypes:   pebble.IterKeyTypePointsAndRanges,
        })
        if err != nil {
                panic(err)
        }
        defer iter.Close()

        for ok := iter.SeekGE(r.start); ok && iter.Error() == nil; ok = iter.Next() {
                hasPoint, hasRange := iter.HasPointAndRange()
                if hasPoint {
                        val, err := iter.ValueAndErr()
                        if err != nil {
                                panic(err)
                        }
                        if err := w.Set(iter.Key(), val); err != nil {
                                panic(err)
                        }
                }
                // Only encode each range-key span once, when the iterator
                // first enters it.
                if hasRange && iter.RangeKeyChanged() {
                        rangeKeys := iter.RangeKeys()
                        rkStart, rkEnd := iter.RangeBounds()

                        // Rebuild the span as RangeKeySets with sequence number
                        // 0; ingestion assigns the real sequence number.
                        span := keyspan.Span{Start: rkStart, End: rkEnd, Keys: make([]keyspan.Key, len(rangeKeys))}
                        for i := range rangeKeys {
                                span.Keys[i] = keyspan.Key{
                                        Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
                                        Suffix:  rangeKeys[i].Suffix,
                                        Value:   rangeKeys[i].Value,
                                }
                        }
                        keyspan.SortKeysByTrailer(span.Keys)
                        if err := w.Raw().EncodeSpan(span); err != nil {
                                panic(err)
                        }
                }
        }
        if err := iter.Error(); err != nil {
                h.Recordf("%s // %v", r, err)
                return
        }
        if err := w.Close(); err != nil {
                panic(err)
        }

        err = dest.Ingest([]string{sstPath})
        h.Recordf("%s // %v", r, err)
}
    2133             : 
    2134           2 : func (r *replicateOp) String() string {
    2135           2 :         return fmt.Sprintf("%s.Replicate(%s, %q, %q)", r.source, r.dest, r.start, r.end)
    2136           2 : }
    2137             : 
    2138           1 : func (r *replicateOp) receiver() objID      { return r.source }
    2139           1 : func (r *replicateOp) syncObjs() objIDSlice { return objIDSlice{r.dest} }
    2140             : 
    2141           0 : func (r *replicateOp) keys() []*[]byte {
    2142           0 :         return []*[]byte{&r.start, &r.end}
    2143           0 : }
    2144             : 
    2145           1 : func (r *replicateOp) diagramKeyRanges() []pebble.KeyRange {
    2146           1 :         return []pebble.KeyRange{{Start: r.start, End: r.end}}
    2147           1 : }

Generated by: LCOV version 1.14