LCOV - code coverage report
Current view: top level - pebble/metamorphic - test.go (source / functions) Hit Total Coverage
Test: 2025-01-02 08:16Z f87b4ded - tests only.lcov Lines: 304 424 71.7 %
Date: 2025-01-02 08:18:00 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package metamorphic
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "io"
      11             :         "os"
      12             :         "path"
      13             :         "path/filepath"
      14             :         "sort"
      15             :         "strings"
      16             :         "time"
      17             : 
      18             :         "github.com/cockroachdb/errors"
      19             :         "github.com/cockroachdb/pebble"
      20             :         "github.com/cockroachdb/pebble/objstorage/remote"
      21             :         "github.com/cockroachdb/pebble/sstable"
      22             :         "github.com/cockroachdb/pebble/vfs"
      23             :         "github.com/cockroachdb/pebble/vfs/errorfs"
      24             : )
      25             : 
      26             : // New constructs a new metamorphic test that runs the provided operations
      27             : // against a database using the provided TestOptions and outputs the history of
      28             : // events to an io.Writer.
      29             : //
      30             : // dir specifies the path within opts.Opts.FS to open the database.
      31           1 : func New(ops Ops, opts *TestOptions, dir string, w io.Writer) (*Test, error) {
      32           1 :         t := newTest(ops)
      33           1 :         h := newHistory(nil /* failRegexp */, w)
      34           1 :         if err := t.init(h, dir, opts, 1 /* numInstances */, 0 /* opTimeout */); err != nil {
      35           0 :                 return nil, err
      36           0 :         }
      37           1 :         return t, nil
      38             : }
      39             : 
      40             : // A Test configures an individual test run consisting of a set of operations,
      41             : // TestOptions configuring the target database to which the operations should be
      42             : // applied, and a sink for outputting test history.
      43             : type Test struct {
      44             :         // The list of ops to execute. The ops refer to slots in the batches, iters,
      45             :         // and snapshots slices.
      46             :         ops       []op
      47             :         opsWaitOn [][]int         // op index -> op indexes
      48             :         opsDone   []chan struct{} // op index -> done channel
      49             :         idx       int
      50             :         dir       string
      51             :         h         *history
      52             :         opTimeout time.Duration
      53             :         opts      *pebble.Options
      54             :         testOpts  *TestOptions
      55             :         writeOpts *pebble.WriteOptions
      56             :         tmpDir    string
      57             :         // The DBs the test is run on.
      58             :         dbs []*pebble.DB
      59             :         // The slots for the batches, iterators, and snapshots. These are read and
      60             :         // written by the ops to pass state from one op to another.
      61             :         batches      []*pebble.Batch
      62             :         iters        []*retryableIter
      63             :         snapshots    []readerCloser
      64             :         externalObjs []externalObjMeta
      65             : 
      66             :         // externalStorage is used to write external objects. If external storage is
      67             :         // enabled, this is the same as testOpts.externalStorageFS; otherwise, this
      68             :         // is an in-memory implementation used only by the test.
      69             :         externalStorage remote.Storage
      70             : }
      71             : 
      72             : type externalObjMeta struct {
      73             :         sstMeta *sstable.WriterMetadata
      74             : }
      75             : 
      76           1 : func newTest(ops []op) *Test {
      77           1 :         return &Test{
      78           1 :                 ops: ops,
      79           1 :         }
      80           1 : }
      81             : 
      82             : func (t *Test) init(
      83             :         h *history, dir string, testOpts *TestOptions, numInstances int, opTimeout time.Duration,
      84           1 : ) error {
      85           1 :         t.dir = dir
      86           1 :         t.h = h
      87           1 :         t.opTimeout = opTimeout
      88           1 :         t.testOpts = testOpts
      89           1 :         t.writeOpts = pebble.NoSync
      90           1 :         if testOpts.strictFS {
      91           1 :                 t.writeOpts = pebble.Sync
      92           1 :         } else {
      93           1 :                 t.writeOpts = pebble.NoSync
      94           1 :         }
      95           1 :         testOpts.Opts.WithFSDefaults()
      96           1 :         t.opts = testOpts.Opts.EnsureDefaults()
      97           1 :         t.opts.Logger = h
      98           1 :         lel := pebble.MakeLoggingEventListener(t.opts.Logger)
      99           1 :         t.opts.EventListener = &lel
     100           1 :         // If the test options set a DebugCheck func, wrap it with retrying of
     101           1 :         // retriable errors (according to the test's retry policy).
     102           1 :         if debugCheck := t.opts.DebugCheck; debugCheck != nil {
     103           1 :                 t.opts.DebugCheck = func(db *pebble.DB) error {
     104           1 :                         return t.withRetries(func() error { return debugCheck(db) })
     105             :                 }
     106             :         }
     107           1 :         if numInstances < 1 {
     108           1 :                 numInstances = 1
     109           1 :         }
     110           1 :         if t.testOpts.externalStorageEnabled {
     111           1 :                 t.externalStorage = t.testOpts.externalStorageFS
     112           1 :         } else {
     113           1 :                 t.externalStorage = remote.NewInMem()
     114           1 :         }
     115             : 
     116           1 :         t.opsWaitOn, t.opsDone = computeSynchronizationPoints(t.ops)
     117           1 : 
     118           1 :         defer t.opts.Cache.Unref()
     119           1 : 
     120           1 :         // If an error occurs and we were using an in-memory FS, attempt to clone to
     121           1 :         // on-disk in order to allow post-mortem debugging. Note that always using
     122           1 :         // the on-disk FS isn't desirable because there is a large performance
     123           1 :         // difference between in-memory and on-disk which causes different code paths
     124           1 :         // and timings to be exercised.
     125           1 :         maybeExit := func(err error) {
     126           1 :                 if err == nil || errors.Is(err, errorfs.ErrInjected) || errors.Is(err, pebble.ErrCancelledCompaction) {
     127           1 :                         return
     128           1 :                 }
     129           0 :                 t.saveInMemoryData()
     130           0 :                 fmt.Fprintln(os.Stderr, err)
     131           0 :                 os.Exit(1)
     132             :         }
     133             : 
     134             :         // Exit early on any error from a background operation.
     135           1 :         t.opts.EventListener.BackgroundError = func(err error) {
     136           1 :                 t.opts.Logger.Infof("background error: %s", err)
     137           1 :                 maybeExit(err)
     138           1 :         }
     139           1 :         t.opts.EventListener.CompactionEnd = func(info pebble.CompactionInfo) {
     140           1 :                 t.opts.Logger.Infof("%s", info)
     141           1 :                 maybeExit(info.Err)
     142           1 :         }
     143           1 :         t.opts.EventListener.FlushEnd = func(info pebble.FlushInfo) {
     144           1 :                 t.opts.Logger.Infof("%s", info)
     145           1 :                 if info.Err != nil && !strings.Contains(info.Err.Error(), "pebble: empty table") {
     146           0 :                         maybeExit(info.Err)
     147           0 :                 }
     148             :         }
     149           1 :         t.opts.EventListener.DownloadEnd = func(info pebble.DownloadInfo) {
     150           1 :                 t.opts.Logger.Infof("%s", info)
     151           1 :                 maybeExit(info.Err)
     152           1 :         }
     153           1 :         t.opts.EventListener.ManifestCreated = func(info pebble.ManifestCreateInfo) {
     154           1 :                 t.opts.Logger.Infof("%s", info)
     155           1 :                 maybeExit(info.Err)
     156           1 :         }
     157           1 :         t.opts.EventListener.ManifestDeleted = func(info pebble.ManifestDeleteInfo) {
     158           1 :                 t.opts.Logger.Infof("%s", info)
     159           1 :                 maybeExit(info.Err)
     160           1 :         }
     161           1 :         t.opts.EventListener.TableDeleted = func(info pebble.TableDeleteInfo) {
     162           1 :                 t.opts.Logger.Infof("%s", info)
     163           1 :                 maybeExit(info.Err)
     164           1 :         }
     165           1 :         t.opts.EventListener.TableIngested = func(info pebble.TableIngestInfo) {
     166           1 :                 t.opts.Logger.Infof("%s", info)
     167           1 :                 maybeExit(info.Err)
     168           1 :         }
     169           1 :         t.opts.EventListener.WALCreated = func(info pebble.WALCreateInfo) {
     170           1 :                 t.opts.Logger.Infof("%s", info)
     171           1 :                 maybeExit(info.Err)
     172           1 :         }
     173           1 :         t.opts.EventListener.WALDeleted = func(info pebble.WALDeleteInfo) {
     174           1 :                 t.opts.Logger.Infof("%s", info)
     175           1 :                 maybeExit(info.Err)
     176           1 :         }
     177             : 
     178           1 :         for i := range t.testOpts.CustomOpts {
     179           0 :                 if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
     180           0 :                         return err
     181           0 :                 }
     182             :         }
     183             : 
     184           1 :         t.dbs = make([]*pebble.DB, numInstances)
     185           1 :         for i := range t.dbs {
     186           1 :                 var db *pebble.DB
     187           1 :                 var err error
     188           1 :                 if len(t.dbs) > 1 {
     189           0 :                         dir = path.Join(t.dir, fmt.Sprintf("db%d", i+1))
     190           0 :                 }
     191           1 :                 err = t.withRetries(func() error {
     192           1 :                         db, err = pebble.Open(dir, t.opts)
     193           1 :                         return err
     194           1 :                 })
     195           1 :                 if err != nil {
     196           0 :                         return err
     197           0 :                 }
     198           1 :                 t.dbs[i] = db
     199           1 :                 h.log.Printf("// db%d.Open() %v", i+1, err)
     200           1 : 
     201           1 :                 if t.testOpts.sharedStorageEnabled {
     202           1 :                         err = t.withRetries(func() error {
     203           1 :                                 return db.SetCreatorID(uint64(i + 1))
     204           1 :                         })
     205           1 :                         if err != nil {
     206           0 :                                 return err
     207           0 :                         }
     208           1 :                         h.log.Printf("// db%d.SetCreatorID() %v", i+1, err)
     209             :                 }
     210             :         }
     211             : 
     212           1 :         var err error
     213           1 :         t.tmpDir = t.opts.FS.PathJoin(t.dir, "tmp")
     214           1 :         if err = t.opts.FS.MkdirAll(t.tmpDir, 0755); err != nil {
     215           0 :                 return err
     216           0 :         }
     217           1 :         if t.testOpts.strictFS {
     218           1 :                 // Sync the whole directory path for the tmpDir, since restartDB() is executed during
     219           1 :                 // the test. That would reset MemFS to the synced state, which would make an unsynced
     220           1 :                 // directory disappear in the middle of the test. It is the responsibility of the test
     221           1 :                 // (not Pebble) to ensure that it can write the ssts that it will subsequently ingest
     222           1 :                 // into Pebble.
     223           1 :                 for {
     224           1 :                         f, err := t.opts.FS.OpenDir(dir)
     225           1 :                         if err != nil {
     226           0 :                                 return err
     227           0 :                         }
     228           1 :                         if err = f.Sync(); err != nil {
     229           0 :                                 return err
     230           0 :                         }
     231           1 :                         if err = f.Close(); err != nil {
     232           0 :                                 return err
     233           0 :                         }
     234           1 :                         if len(dir) == 1 {
     235           1 :                                 break
     236             :                         }
     237           1 :                         dir = t.opts.FS.PathDir(dir)
     238           1 :                         // TODO(sbhola): PathDir returns ".", which OpenDir() complains about. Fix.
     239           1 :                         if len(dir) == 1 {
     240           1 :                                 dir = "/"
     241           1 :                         }
     242             :                 }
     243             :         }
     244             : 
     245           1 :         return nil
     246             : }
     247             : 
     248           1 : func (t *Test) withRetries(fn func() error) error {
     249           1 :         return withRetries(fn, t.testOpts.RetryPolicy)
     250           1 : }
     251             : 
     252           1 : func (t *Test) isFMV(dbID objID, fmv pebble.FormatMajorVersion) bool {
     253           1 :         db := t.getDB(dbID)
     254           1 :         return db.FormatMajorVersion() >= fmv
     255           1 : }
     256             : 
     257             : // minFMV returns the minimum FormatMajorVersion across all databases.
     258           1 : func (t *Test) minFMV() pebble.FormatMajorVersion {
     259           1 :         minVersion := pebble.FormatNewest
     260           1 :         for _, db := range t.dbs {
     261           1 :                 if db != nil {
     262           1 :                         minVersion = min(minVersion, db.FormatMajorVersion())
     263           1 :                 }
     264             :         }
     265           1 :         return minVersion
     266             : }
     267             : 
     268           1 : func (t *Test) restartDB(dbID objID) error {
     269           1 :         db := t.getDB(dbID)
     270           1 :         // If strictFS is not used, we use pebble.NoSync for writeOpts, so we can't
     271           1 :         // restart the database (even if we don't revert to synced data).
     272           1 :         if !t.testOpts.strictFS {
     273           1 :                 return nil
     274           1 :         }
     275           1 :         if t.testOpts.sharedStorageEnabled {
     276           0 :                 // We simulate a crash by essentially ignoring writes to disk after a
     277           0 :                 // certain point. However, we cannot prevent the process (which didn't
     278           0 :                 // actually crash) from deleting an external object before we call Close().
     279           0 :                 // TODO(radu): perhaps we want all syncs to fail after the "crash" point?
     280           0 :                 return nil
     281           0 :         }
     282             :         // We can't do this if we have more than one database since they share the
     283             :         // same FS (and we only close/reopen one of them).
     284             :         // TODO(radu): have each database use its own MemFS.
     285           1 :         if len(t.dbs) > 1 {
     286           0 :                 return nil
     287           0 :         }
     288           1 :         t.opts.Cache.Ref()
     289           1 :         fs := vfs.Root(t.opts.FS).(*vfs.MemFS)
     290           1 :         crashFS := fs.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0})
     291           1 :         if err := db.Close(); err != nil {
     292           0 :                 return err
     293           0 :         }
     294             :         // Release any resources held by custom options. This may be used, for
     295             :         // example, by the encryption-at-rest custom option (within the Cockroach
     296             :         // repository) to close the file registry.
     297           1 :         for i := range t.testOpts.CustomOpts {
     298           0 :                 if err := t.testOpts.CustomOpts[i].Close(t.opts); err != nil {
     299           0 :                         return err
     300           0 :                 }
     301             :         }
     302           1 :         t.opts.FS = crashFS
     303           1 :         t.opts.WithFSDefaults()
     304           1 :         if t.opts.WALFailover != nil {
     305           1 :                 t.opts.WALFailover.Secondary.FS = t.opts.FS
     306           1 :         }
     307             : 
     308             :         // TODO(jackson): Audit errorRate and ensure custom options' hooks semantics
     309             :         // are well defined within the context of retries.
     310           1 :         err := t.withRetries(func() (err error) {
     311           1 :                 // Reacquire any resources required by custom options. This may be used, for
     312           1 :                 // example, by the encryption-at-rest custom option (within the Cockroach
     313           1 :                 // repository) to reopen the file registry.
     314           1 :                 for i := range t.testOpts.CustomOpts {
     315           0 :                         if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
     316           0 :                                 return err
     317           0 :                         }
     318             :                 }
     319           1 :                 dir := t.dir
     320           1 :                 if len(t.dbs) > 1 {
     321           0 :                         dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
     322           0 :                 }
     323           1 :                 t.dbs[dbID.slot()-1], err = pebble.Open(dir, t.opts)
     324           1 :                 if err != nil {
     325           0 :                         return err
     326           0 :                 }
     327           1 :                 return err
     328             :         })
     329           1 :         t.opts.Cache.Unref()
     330           1 :         return err
     331             : }
     332             : 
     333           1 : func (t *Test) saveInMemoryDataInternal() error {
     334           1 :         if rootFS := vfs.Root(t.opts.FS); rootFS != vfs.Default {
     335           1 :                 // t.opts.FS is an in-memory system; copy it to disk.
     336           1 :                 if err := os.RemoveAll(t.dir); err != nil {
     337           0 :                         return err
     338           0 :                 }
     339           1 :                 if _, err := vfs.Clone(rootFS, vfs.Default, t.dir, t.dir); err != nil {
     340           0 :                         return err
     341           0 :                 }
     342             :         }
     343           1 :         if t.testOpts.sharedStorageEnabled {
     344           0 :                 if err := copyRemoteStorage(t.testOpts.sharedStorageFS, filepath.Join(t.dir, "shared")); err != nil {
     345           0 :                         return err
     346           0 :                 }
     347             :         }
     348           1 :         if t.testOpts.externalStorageEnabled {
     349           0 :                 if err := copyRemoteStorage(t.testOpts.externalStorageFS, filepath.Join(t.dir, "external")); err != nil {
     350           0 :                         return err
     351           0 :                 }
     352             :         }
     353           1 :         return nil
     354             : }
     355             : 
     356           0 : func copyRemoteStorage(fs remote.Storage, outputDir string) error {
     357           0 :         if err := vfs.Default.MkdirAll(outputDir, 0755); err != nil {
     358           0 :                 return err
     359           0 :         }
     360           0 :         objs, err := fs.List("", "")
     361           0 :         if err != nil {
     362           0 :                 return err
     363           0 :         }
     364           0 :         for i := range objs {
     365           0 :                 reader, readSize, err := fs.ReadObject(context.TODO(), objs[i])
     366           0 :                 if err != nil {
     367           0 :                         return err
     368           0 :                 }
     369           0 :                 buf := make([]byte, readSize)
     370           0 :                 if err := reader.ReadAt(context.TODO(), buf, 0); err != nil {
     371           0 :                         return err
     372           0 :                 }
     373           0 :                 outputPath := vfs.Default.PathJoin(outputDir, objs[i])
     374           0 :                 outputFile, err := vfs.Default.Create(outputPath, vfs.WriteCategoryUnspecified)
     375           0 :                 if err != nil {
     376           0 :                         return err
     377           0 :                 }
     378           0 :                 if _, err := outputFile.Write(buf); err != nil {
     379           0 :                         outputFile.Close()
     380           0 :                         return err
     381           0 :                 }
     382           0 :                 if err := outputFile.Close(); err != nil {
     383           0 :                         return err
     384           0 :                 }
     385             :         }
     386           0 :         return nil
     387             : }
     388             : 
     389             : // If an in-memory FS is being used, save the contents to disk.
     390           1 : func (t *Test) saveInMemoryData() {
     391           1 :         if err := t.saveInMemoryDataInternal(); err != nil {
     392           0 :                 t.opts.Logger.Infof("unable to save data: %s: %v", t.dir, err)
     393           0 :         }
     394             : }
     395             : 
     396             : // Step runs one single operation, returning: whether there are additional
     397             : // operations remaining; the operation's output; and an error if any occurred
     398             : // while running the operation.
     399             : //
     400             : // Step may be used instead of Execute to advance a test one operation at a
     401             : // time.
     402           1 : func (t *Test) Step() (more bool, operationOutput string, err error) {
     403           1 :         more = t.step(t.h, func(format string, args ...interface{}) {
     404           1 :                 operationOutput = fmt.Sprintf(format, args...)
     405           1 :         })
     406           1 :         err = t.h.Error()
     407           1 :         return more, operationOutput, err
     408             : }
     409             : 
     410           1 : func (t *Test) step(h *history, optionalRecordf func(string, ...interface{})) bool {
     411           1 :         if t.idx >= len(t.ops) {
     412           1 :                 return false
     413           1 :         }
     414           1 :         t.runOp(t.idx, h.recorder(-1 /* thread */, t.idx, optionalRecordf))
     415           1 :         t.idx++
     416           1 :         return true
     417             : }
     418             : 
     419             : // runOp runs t.ops[idx] with t.opTimeout.
     420           1 : func (t *Test) runOp(idx int, h historyRecorder) {
     421           1 :         op := t.ops[idx]
     422           1 :         var timer *time.Timer
     423           1 :         if t.opTimeout > 0 {
     424           0 :                 opTimeout := t.opTimeout
     425           0 :                 switch op.(type) {
     426           0 :                 case *compactOp, *newSnapshotOp, *ingestOp, *ingestAndExciseOp, *ingestExternalFilesOp:
     427           0 :                         // These ops can be very slow, especially if we end up with many tiny
     428           0 :                         // tables. Bump up the timeout by a factor.
     429           0 :                         opTimeout *= 4
     430           0 :                 case *downloadOp:
     431           0 :                         opTimeout *= 8
     432             :                 }
     433           0 :                 timer = time.AfterFunc(opTimeout, func() {
     434           0 :                         panic(fmt.Sprintf("operation took longer than %s: %s",
     435           0 :                                 opTimeout, op.formattedString(t.testOpts.KeyFormat)))
     436             :                 })
     437             :         }
     438           1 :         op.run(t, h)
     439           1 :         if timer != nil {
     440           0 :                 timer.Stop()
     441           0 :         }
     442             : }
     443             : 
     444           1 : func (t *Test) setBatch(id objID, b *pebble.Batch) {
     445           1 :         if id.tag() != batchTag {
     446           0 :                 panic(fmt.Sprintf("invalid batch ID: %s", id))
     447             :         }
     448           1 :         t.batches[id.slot()] = b
     449             : }
     450             : 
     451           1 : func (t *Test) setIter(id objID, i *pebble.Iterator) {
     452           1 :         if id.tag() != iterTag {
     453           0 :                 panic(fmt.Sprintf("invalid iter ID: %s", id))
     454             :         }
     455           1 :         t.iters[id.slot()] = &retryableIter{
     456           1 :                 iter:      i,
     457           1 :                 lastKey:   nil,
     458           1 :                 needRetry: t.testOpts.RetryPolicy,
     459           1 :         }
     460             : }
     461             : 
     462             : type readerCloser interface {
     463             :         pebble.Reader
     464             :         io.Closer
     465             : }
     466             : 
     467           1 : func (t *Test) setSnapshot(id objID, s readerCloser) {
     468           1 :         if id.tag() != snapTag {
     469           0 :                 panic(fmt.Sprintf("invalid snapshot ID: %s", id))
     470             :         }
     471           1 :         t.snapshots[id.slot()] = s
     472             : }
     473             : 
     474           1 : func (t *Test) setExternalObj(id objID, meta externalObjMeta) {
     475           1 :         if id.tag() != externalObjTag {
     476           0 :                 panic(fmt.Sprintf("invalid external object ID: %s", id))
     477             :         }
     478           1 :         t.externalObjs[id.slot()] = meta
     479             : }
     480             : 
     481           1 : func (t *Test) getExternalObj(id objID) externalObjMeta {
     482           1 :         if id.tag() != externalObjTag || t.externalObjs[id.slot()].sstMeta == nil {
     483           0 :                 panic(fmt.Sprintf("metamorphic test internal error: invalid external object ID: %s", id))
     484             :         }
     485           1 :         return t.externalObjs[id.slot()]
     486             : }
     487             : 
     488           1 : func (t *Test) clearObj(id objID) {
     489           1 :         switch id.tag() {
     490           1 :         case dbTag:
     491           1 :                 t.dbs[id.slot()-1] = nil
     492           1 :         case batchTag:
     493           1 :                 t.batches[id.slot()] = nil
     494           1 :         case iterTag:
     495           1 :                 t.iters[id.slot()] = nil
     496           1 :         case snapTag:
     497           1 :                 t.snapshots[id.slot()] = nil
     498           0 :         default:
     499           0 :                 panic(fmt.Sprintf("cannot clear ID: %s", id))
     500             :         }
     501             : }
     502             : 
     503           1 : func (t *Test) getBatch(id objID) *pebble.Batch {
     504           1 :         if id.tag() != batchTag || t.batches[id.slot()] == nil {
     505           0 :                 panic(fmt.Sprintf("metamorphic test internal error: invalid batch ID: %s", id))
     506             :         }
     507           1 :         return t.batches[id.slot()]
     508             : }
     509             : 
     510           1 : func (t *Test) getCloser(id objID) io.Closer {
     511           1 :         switch id.tag() {
     512           1 :         case dbTag:
     513           1 :                 return t.dbs[id.slot()-1]
     514           1 :         case batchTag:
     515           1 :                 return t.batches[id.slot()]
     516           1 :         case iterTag:
     517           1 :                 return t.iters[id.slot()]
     518           1 :         case snapTag:
     519           1 :                 return t.snapshots[id.slot()]
     520           0 :         default:
     521           0 :                 panic(fmt.Sprintf("cannot close ID: %s", id))
     522             :         }
     523             : }
     524             : // getIter returns the iterator stored at id's slot; it panics if id is not tagged iterTag. The slot is not nil-checked.
     525           1 : func (t *Test) getIter(id objID) *retryableIter {
     526           1 :         if id.tag() != iterTag {
     527           0 :                 panic(fmt.Sprintf("invalid iter ID: %s", id))
     528             :         }
     529           1 :         return t.iters[id.slot()]
     530             : }
     531             : // getReader returns the DB, batch or snapshot stored under id as a pebble.Reader; it panics for any other tag.
     532           1 : func (t *Test) getReader(id objID) pebble.Reader {
     533           1 :         switch id.tag() {
     534           1 :         case dbTag:
     535           1 :                 return t.dbs[id.slot()-1] // dbs is indexed by slot()-1
     536           1 :         case batchTag:
     537           1 :                 return t.batches[id.slot()]
     538           1 :         case snapTag:
     539           1 :                 return t.snapshots[id.slot()]
     540           0 :         default: // iterators and unknown tags are not readers
     541           0 :                 panic(fmt.Sprintf("invalid reader ID: %s", id))
     542             :         }
     543             : }
     544             : // getWriter returns the DB or batch stored under id as a pebble.Writer; it panics for any other tag.
     545           1 : func (t *Test) getWriter(id objID) pebble.Writer {
     546           1 :         switch id.tag() {
     547           1 :         case dbTag:
     548           1 :                 return t.dbs[id.slot()-1] // dbs is indexed by slot()-1
     549           1 :         case batchTag:
     550           1 :                 return t.batches[id.slot()]
     551           0 :         default: // snapshots, iterators and unknown tags are not writers
     552           0 :                 panic(fmt.Sprintf("invalid writer ID: %s", id))
     553             :         }
     554             : }
     555             : // getDB returns the *pebble.DB stored under id (at index slot()-1 of t.dbs); it panics if id is not tagged dbTag.
     556           1 : func (t *Test) getDB(id objID) *pebble.DB {
     557           1 :         switch id.tag() {
     558           1 :         case dbTag:
     559           1 :                 return t.dbs[id.slot()-1]
     560           0 :         default: // report the offending ID as a DB ID, in the same form as the sibling accessors
     561           0 :                 panic(fmt.Sprintf("invalid DB ID: %s", id))
     562             :         }
     563             : }
     564             : 
     565             : // computeSynchronizationPoints computes the synchronization points between
     566             : // operations. When operating with more than 1 thread, operations must
     567             : // synchronize access to shared objects. It returns two slices the same length as ops.
     568             : //
     569             : // opsWaitOn: each value v in the slice at index i indicates that operation i
     570             : // must wait for the operation at index v to finish before it may run. NB: v < i
     571             : //
     572             : // opsDone: the channel at index i must be closed when the operation at index i
     573             : // completes. This slice is sparse. Operations that are never used as
     574             : // synchronization points may have a nil channel.
     575           1 : func computeSynchronizationPoints(ops []op) (opsWaitOn [][]int, opsDone []chan struct{}) {
     576           1 :         opsDone = make([]chan struct{}, len(ops)) // operation index -> done channel
     577           1 :         opsWaitOn = make([][]int, len(ops))       // operation index -> operation indexes to wait on
     578           1 :         lastOpReference := make(map[objID]int)    // objID -> index of the last operation that referenced it
     579           1 :         for i, o := range ops {
     580           1 :                 // Find the last operation that involved the same receiver object. We at
     581           1 :                 // least need to wait on that operation.
     582           1 :                 receiver := o.receiver()
     583           1 :                 waitIndex, ok := lastOpReference[receiver]
     584           1 :                 lastOpReference[receiver] = i
     585           1 :                 if !ok {
     586           1 :                         // Only valid for i=0 or for a receiver tagged dbTag. For all other
     587           1 :                         // operations, the receiver should have been referenced by some other
     588           1 :                         // operation before it's used as a receiver.
     589           1 :                         if i != 0 && receiver.tag() != dbTag {
     590           0 :                                 panic(fmt.Sprintf("op %T on receiver %s; first reference of %s",
     591           0 :                                         ops[i], receiver, receiver))
     592             :                         }
     593             :                         // The initOp is a little special. We do want to store the objects it's
     594             :                         // syncing on, in `lastOpReference`.
     595           1 :                         if i != 0 {
     596           0 :                                 continue // first-seen receiver at i > 0: nothing to wait on, and its syncObjs are not recorded
     597             :                         }
     598             :                 }
     599             : 
     600             :                 // The last operation that referenced `receiver` is the one at index
     601             :                 // `waitIndex`. All operations with the same receiver are performed on
     602             :                 // the same thread. We only need to synchronize on the operation at
     603             :                 // `waitIndex` if `receiver` isn't also the receiver on that operation
     604             :                 // too.
     605           1 :                 if ops[waitIndex].receiver() != receiver {
     606           1 :                         opsWaitOn[i] = append(opsWaitOn[i], waitIndex)
     607           1 :                 }
     608             : 
     609             :                 // In addition to synchronizing on the operation's receiver,
     610             :                 // we may need to synchronize on additional objects. For example,
     611             :                 // batch0.Commit() must synchronize its receiver, batch0, but also on
     612             :                 // the DB since it mutates database state.
     613           1 :                 for _, syncObjID := range o.syncObjs() {
     614           1 :                         if vi, vok := lastOpReference[syncObjID]; vok {
     615           1 :                                 if vi == i { // this op already recorded itself for syncObjID (as receiver or an earlier syncObj)
     616           0 :                                         panic(fmt.Sprintf("%T has %s as syncObj multiple times", ops[i], syncObjID))
     617             :                                 }
     618           1 :                                 opsWaitOn[i] = append(opsWaitOn[i], vi)
     619             :                         }
     620           1 :                         lastOpReference[syncObjID] = i
     621             :                 }
     622             : 
     623           1 :                 waitIndexes := opsWaitOn[i]
     624           1 :                 sort.Ints(waitIndexes) // sorts opsWaitOn[i] in place; waitIndexes shares its backing array
     625           1 :                 for _, waitIndex := range waitIndexes { // shadows the outer waitIndex
     626           1 :                         // If this is the first operation that must wait on the operation at
     627           1 :                         // `waitIndex`, then there will be no channel for the operation yet.
     628           1 :                         // Create one.
     629           1 :                         if opsDone[waitIndex] == nil {
     630           1 :                                 opsDone[waitIndex] = make(chan struct{})
     631           1 :                         }
     632             :                 }
     633             :         }
     634           1 :         return opsWaitOn, opsDone
     635             : }

Generated by: LCOV version 1.14