LCOV - code coverage report
Current view: top level - pebble/metamorphic - test.go (source / functions) Hit Total Coverage
Test: 2023-12-11 08:16Z f16e0f48 - tests only.lcov Lines: 192 300 64.0 %
Date: 2023-12-11 08:16:34 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package metamorphic
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "io"
      10             :         "os"
      11             :         "path"
      12             :         "sort"
      13             :         "strings"
      14             : 
      15             :         "github.com/cockroachdb/errors"
      16             :         "github.com/cockroachdb/pebble"
      17             :         "github.com/cockroachdb/pebble/vfs"
      18             :         "github.com/cockroachdb/pebble/vfs/errorfs"
      19             : )
      20             : 
// test holds the state of a single metamorphic test run: the operations to
// execute, the options the databases are opened with, and the object slots
// that operations use to hand state to one another.
type test struct {
	// The list of ops to execute. The ops refer to slots in the batches, iters,
	// and snapshots slices.
	ops       []op
	opsWaitOn [][]int         // op index -> op indexes
	opsDone   []chan struct{} // op index -> done channel
	// idx is the index into ops of the next operation that step will run.
	idx int
	// dir is the directory passed to init.
	dir string
	// opts are the Pebble options derived from testOpts.Opts in init.
	opts     *pebble.Options
	testOpts *TestOptions
	// writeOpts is pebble.NoSync, or pebble.Sync when testOpts.strictFS is set.
	writeOpts *pebble.WriteOptions
	// tmpDir is the "tmp" directory created under dir by init.
	tmpDir string
	// The DBs the test is run on. DB object IDs are 1-based, so the DB with
	// slot s is stored at index s-1.
	dbs []*pebble.DB
	// The slots for the batches, iterators, and snapshots. These are read and
	// written by the ops to pass state from one op to another.
	batches   []*pebble.Batch
	iters     []*retryableIter
	snapshots []readerCloser
}
      41             : 
      42           1 : func newTest(ops []op) *test {
      43           1 :         return &test{
      44           1 :                 ops: ops,
      45           1 :         }
      46           1 : }
      47             : 
      48           1 : func (t *test) init(h *history, dir string, testOpts *TestOptions, numInstances int) error {
      49           1 :         t.dir = dir
      50           1 :         t.testOpts = testOpts
      51           1 :         t.writeOpts = pebble.NoSync
      52           1 :         if testOpts.strictFS {
      53           0 :                 t.writeOpts = pebble.Sync
      54           0 :         }
      55           1 :         t.opts = testOpts.Opts.EnsureDefaults()
      56           1 :         t.opts.Logger = h
      57           1 :         lel := pebble.MakeLoggingEventListener(t.opts.Logger)
      58           1 :         t.opts.EventListener = &lel
      59           1 :         t.opts.DebugCheck = func(db *pebble.DB) error {
      60           1 :                 // Wrap the ordinary DebugCheckLevels with retrying
      61           1 :                 // of injected errors.
      62           1 :                 return withRetries(func() error {
      63           1 :                         return pebble.DebugCheckLevels(db)
      64           1 :                 })
      65             :         }
      66           1 :         if numInstances < 1 {
      67           1 :                 numInstances = 1
      68           1 :         }
      69             : 
      70           1 :         t.opsWaitOn, t.opsDone = computeSynchronizationPoints(t.ops)
      71           1 : 
      72           1 :         defer t.opts.Cache.Unref()
      73           1 : 
      74           1 :         // If an error occurs and we were using an in-memory FS, attempt to clone to
      75           1 :         // on-disk in order to allow post-mortem debugging. Note that always using
      76           1 :         // the on-disk FS isn't desirable because there is a large performance
      77           1 :         // difference between in-memory and on-disk which causes different code paths
      78           1 :         // and timings to be exercised.
      79           1 :         maybeExit := func(err error) {
      80           1 :                 if err == nil || errors.Is(err, errorfs.ErrInjected) || errors.Is(err, pebble.ErrCancelledCompaction) {
      81           1 :                         return
      82           1 :                 }
      83           0 :                 t.maybeSaveData()
      84           0 :                 fmt.Fprintln(os.Stderr, err)
      85           0 :                 os.Exit(1)
      86             :         }
      87             : 
      88             :         // Exit early on any error from a background operation.
      89           1 :         t.opts.EventListener.BackgroundError = func(err error) {
      90           0 :                 t.opts.Logger.Infof("background error: %s", err)
      91           0 :                 maybeExit(err)
      92           0 :         }
      93           1 :         t.opts.EventListener.CompactionEnd = func(info pebble.CompactionInfo) {
      94           1 :                 t.opts.Logger.Infof("%s", info)
      95           1 :                 maybeExit(info.Err)
      96           1 :         }
      97           1 :         t.opts.EventListener.FlushEnd = func(info pebble.FlushInfo) {
      98           1 :                 t.opts.Logger.Infof("%s", info)
      99           1 :                 if info.Err != nil && !strings.Contains(info.Err.Error(), "pebble: empty table") {
     100           0 :                         maybeExit(info.Err)
     101           0 :                 }
     102             :         }
     103           1 :         t.opts.EventListener.ManifestCreated = func(info pebble.ManifestCreateInfo) {
     104           1 :                 t.opts.Logger.Infof("%s", info)
     105           1 :                 maybeExit(info.Err)
     106           1 :         }
     107           1 :         t.opts.EventListener.ManifestDeleted = func(info pebble.ManifestDeleteInfo) {
     108           0 :                 t.opts.Logger.Infof("%s", info)
     109           0 :                 maybeExit(info.Err)
     110           0 :         }
     111           1 :         t.opts.EventListener.TableDeleted = func(info pebble.TableDeleteInfo) {
     112           1 :                 t.opts.Logger.Infof("%s", info)
     113           1 :                 maybeExit(info.Err)
     114           1 :         }
     115           1 :         t.opts.EventListener.TableIngested = func(info pebble.TableIngestInfo) {
     116           1 :                 t.opts.Logger.Infof("%s", info)
     117           1 :                 maybeExit(info.Err)
     118           1 :         }
     119           1 :         t.opts.EventListener.WALCreated = func(info pebble.WALCreateInfo) {
     120           1 :                 t.opts.Logger.Infof("%s", info)
     121           1 :                 maybeExit(info.Err)
     122           1 :         }
     123           1 :         t.opts.EventListener.WALDeleted = func(info pebble.WALDeleteInfo) {
     124           1 :                 t.opts.Logger.Infof("%s", info)
     125           1 :                 maybeExit(info.Err)
     126           1 :         }
     127             : 
     128           1 :         for i := range t.testOpts.CustomOpts {
     129           0 :                 if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
     130           0 :                         return err
     131           0 :                 }
     132             :         }
     133             : 
     134           1 :         t.dbs = make([]*pebble.DB, numInstances)
     135           1 :         for i := range t.dbs {
     136           1 :                 var db *pebble.DB
     137           1 :                 var err error
     138           1 :                 if len(t.dbs) > 1 {
     139           0 :                         dir = path.Join(t.dir, fmt.Sprintf("db%d", i+1))
     140           0 :                 }
     141           1 :                 err = withRetries(func() error {
     142           1 :                         db, err = pebble.Open(dir, t.opts)
     143           1 :                         return err
     144           1 :                 })
     145           1 :                 if err != nil {
     146           0 :                         return err
     147           0 :                 }
     148           1 :                 t.dbs[i] = db
     149           1 :                 h.log.Printf("// db%d.Open() %v", i+1, err)
     150           1 : 
     151           1 :                 if t.testOpts.sharedStorageEnabled {
     152           0 :                         err = withRetries(func() error {
     153           0 :                                 return db.SetCreatorID(uint64(i + 1))
     154           0 :                         })
     155           0 :                         if err != nil {
     156           0 :                                 return err
     157           0 :                         }
     158           0 :                         h.log.Printf("// db%d.SetCreatorID() %v", i+1, err)
     159             :                 }
     160             :         }
     161             : 
     162           1 :         var err error
     163           1 :         t.tmpDir = t.opts.FS.PathJoin(t.dir, "tmp")
     164           1 :         if err = t.opts.FS.MkdirAll(t.tmpDir, 0755); err != nil {
     165           0 :                 return err
     166           0 :         }
     167           1 :         if t.testOpts.strictFS {
     168           0 :                 // Sync the whole directory path for the tmpDir, since restartDB() is executed during
     169           0 :                 // the test. That would reset MemFS to the synced state, which would make an unsynced
     170           0 :                 // directory disappear in the middle of the test. It is the responsibility of the test
     171           0 :                 // (not Pebble) to ensure that it can write the ssts that it will subsequently ingest
     172           0 :                 // into Pebble.
     173           0 :                 for {
     174           0 :                         f, err := t.opts.FS.OpenDir(dir)
     175           0 :                         if err != nil {
     176           0 :                                 return err
     177           0 :                         }
     178           0 :                         if err = f.Sync(); err != nil {
     179           0 :                                 return err
     180           0 :                         }
     181           0 :                         if err = f.Close(); err != nil {
     182           0 :                                 return err
     183           0 :                         }
     184           0 :                         if len(dir) == 1 {
     185           0 :                                 break
     186             :                         }
     187           0 :                         dir = t.opts.FS.PathDir(dir)
     188           0 :                         // TODO(sbhola): PathDir returns ".", which OpenDir() complains about. Fix.
     189           0 :                         if len(dir) == 1 {
     190           0 :                                 dir = "/"
     191           0 :                         }
     192             :                 }
     193             :         }
     194             : 
     195           1 :         return nil
     196             : }
     197             : 
     198           0 : func (t *test) isFMV(dbID objID, fmv pebble.FormatMajorVersion) bool {
     199           0 :         db := t.getDB(dbID)
     200           0 :         return db.FormatMajorVersion() >= fmv
     201           0 : }
     202             : 
     203           1 : func (t *test) restartDB(dbID objID) error {
     204           1 :         db := t.getDB(dbID)
     205           1 :         if !t.testOpts.strictFS {
     206           1 :                 return nil
     207           1 :         }
     208           0 :         t.opts.Cache.Ref()
     209           0 :         // The fs isn't necessarily a MemFS.
     210           0 :         fs, ok := vfs.Root(t.opts.FS).(*vfs.MemFS)
     211           0 :         if ok {
     212           0 :                 fs.SetIgnoreSyncs(true)
     213           0 :         }
     214           0 :         if err := db.Close(); err != nil {
     215           0 :                 return err
     216           0 :         }
     217             :         // Release any resources held by custom options. This may be used, for
     218             :         // example, by the encryption-at-rest custom option (within the Cockroach
     219             :         // repository) to close the file registry.
     220           0 :         for i := range t.testOpts.CustomOpts {
     221           0 :                 if err := t.testOpts.CustomOpts[i].Close(t.opts); err != nil {
     222           0 :                         return err
     223           0 :                 }
     224             :         }
     225           0 :         if ok {
     226           0 :                 fs.ResetToSyncedState()
     227           0 :                 fs.SetIgnoreSyncs(false)
     228           0 :         }
     229             : 
     230             :         // TODO(jackson): Audit errorRate and ensure custom options' hooks semantics
     231             :         // are well defined within the context of retries.
     232           0 :         err := withRetries(func() (err error) {
     233           0 :                 // Reacquire any resources required by custom options. This may be used, for
     234           0 :                 // example, by the encryption-at-rest custom option (within the Cockroach
     235           0 :                 // repository) to reopen the file registry.
     236           0 :                 for i := range t.testOpts.CustomOpts {
     237           0 :                         if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
     238           0 :                                 return err
     239           0 :                         }
     240             :                 }
     241           0 :                 dir := t.dir
     242           0 :                 if len(t.dbs) > 1 {
     243           0 :                         dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
     244           0 :                 }
     245           0 :                 t.dbs[dbID.slot()-1], err = pebble.Open(dir, t.opts)
     246           0 :                 if err != nil {
     247           0 :                         return err
     248           0 :                 }
     249           0 :                 return err
     250             :         })
     251           0 :         t.opts.Cache.Unref()
     252           0 :         return err
     253             : }
     254             : 
     255             : // If an in-memory FS is being used, save the contents to disk.
     256           1 : func (t *test) maybeSaveData() {
     257           1 :         rootFS := vfs.Root(t.opts.FS)
     258           1 :         if rootFS == vfs.Default {
     259           0 :                 return
     260           0 :         }
     261           1 :         _ = os.RemoveAll(t.dir)
     262           1 :         if _, err := vfs.Clone(rootFS, vfs.Default, t.dir, t.dir); err != nil {
     263           0 :                 t.opts.Logger.Infof("unable to clone: %s: %v", t.dir, err)
     264           0 :         }
     265             : }
     266             : 
     267           1 : func (t *test) step(h *history) bool {
     268           1 :         if t.idx >= len(t.ops) {
     269           1 :                 return false
     270           1 :         }
     271           1 :         t.ops[t.idx].run(t, h.recorder(-1 /* thread */, t.idx))
     272           1 :         t.idx++
     273           1 :         return true
     274             : }
     275             : 
     276           1 : func (t *test) setBatch(id objID, b *pebble.Batch) {
     277           1 :         if id.tag() != batchTag {
     278           0 :                 panic(fmt.Sprintf("invalid batch ID: %s", id))
     279             :         }
     280           1 :         t.batches[id.slot()] = b
     281             : }
     282             : 
     283           1 : func (t *test) setIter(id objID, i *pebble.Iterator) {
     284           1 :         if id.tag() != iterTag {
     285           0 :                 panic(fmt.Sprintf("invalid iter ID: %s", id))
     286             :         }
     287           1 :         t.iters[id.slot()] = &retryableIter{
     288           1 :                 iter:    i,
     289           1 :                 lastKey: nil,
     290           1 :         }
     291             : }
     292             : 
// readerCloser is a pebble.Reader that can also be closed. It is the element
// type of the test's snapshot slots.
type readerCloser interface {
	pebble.Reader
	io.Closer
}
     297             : 
     298           1 : func (t *test) setSnapshot(id objID, s readerCloser) {
     299           1 :         if id.tag() != snapTag {
     300           0 :                 panic(fmt.Sprintf("invalid snapshot ID: %s", id))
     301             :         }
     302           1 :         t.snapshots[id.slot()] = s
     303             : }
     304             : 
     305           1 : func (t *test) clearObj(id objID) {
     306           1 :         switch id.tag() {
     307           1 :         case dbTag:
     308           1 :                 t.dbs[id.slot()-1] = nil
     309           1 :         case batchTag:
     310           1 :                 t.batches[id.slot()] = nil
     311           1 :         case iterTag:
     312           1 :                 t.iters[id.slot()] = nil
     313           1 :         case snapTag:
     314           1 :                 t.snapshots[id.slot()] = nil
     315             :         }
     316             : }
     317             : 
     318           1 : func (t *test) getBatch(id objID) *pebble.Batch {
     319           1 :         if id.tag() != batchTag {
     320           0 :                 panic(fmt.Sprintf("invalid batch ID: %s", id))
     321             :         }
     322           1 :         return t.batches[id.slot()]
     323             : }
     324             : 
     325           1 : func (t *test) getCloser(id objID) io.Closer {
     326           1 :         switch id.tag() {
     327           1 :         case dbTag:
     328           1 :                 return t.dbs[id.slot()-1]
     329           1 :         case batchTag:
     330           1 :                 return t.batches[id.slot()]
     331           1 :         case iterTag:
     332           1 :                 return t.iters[id.slot()]
     333           1 :         case snapTag:
     334           1 :                 return t.snapshots[id.slot()]
     335             :         }
     336           0 :         panic(fmt.Sprintf("cannot close ID: %s", id))
     337             : }
     338             : 
     339           1 : func (t *test) getIter(id objID) *retryableIter {
     340           1 :         if id.tag() != iterTag {
     341           0 :                 panic(fmt.Sprintf("invalid iter ID: %s", id))
     342             :         }
     343           1 :         return t.iters[id.slot()]
     344             : }
     345             : 
     346           1 : func (t *test) getReader(id objID) pebble.Reader {
     347           1 :         switch id.tag() {
     348           1 :         case dbTag:
     349           1 :                 return t.dbs[id.slot()-1]
     350           1 :         case batchTag:
     351           1 :                 return t.batches[id.slot()]
     352           1 :         case snapTag:
     353           1 :                 return t.snapshots[id.slot()]
     354             :         }
     355           0 :         panic(fmt.Sprintf("invalid reader ID: %s", id))
     356             : }
     357             : 
     358           1 : func (t *test) getWriter(id objID) pebble.Writer {
     359           1 :         switch id.tag() {
     360           1 :         case dbTag:
     361           1 :                 return t.dbs[id.slot()-1]
     362           1 :         case batchTag:
     363           1 :                 return t.batches[id.slot()]
     364             :         }
     365           0 :         panic(fmt.Sprintf("invalid writer ID: %s", id))
     366             : }
     367             : 
     368           1 : func (t *test) getDB(id objID) *pebble.DB {
     369           1 :         switch id.tag() {
     370           1 :         case dbTag:
     371           1 :                 return t.dbs[id.slot()-1]
     372           0 :         default:
     373           0 :                 panic(fmt.Sprintf("invalid writer tag: %v", id.tag()))
     374             :         }
     375             : }
     376             : 
// Compute the synchronization points between operations. When operating
// with more than 1 thread, operations must synchronize access to shared
// objects. Compute two slices the same length as ops.
//
// opsWaitOn: each value v in the slice at index i indicates that operation i
// must wait for the operation at index v to finish before it may run. The
// slice at each index is sorted in ascending order. NB: v < i
//
// opsDone: the channel at index i must be closed when the operation at index i
// completes. This slice is sparse. Operations that are never used as
// synchronization points may have a nil channel.
//
// The function panics if an operation other than the first uses a non-DB
// receiver that no earlier operation has referenced.
func computeSynchronizationPoints(ops []op) (opsWaitOn [][]int, opsDone []chan struct{}) {
	opsDone = make([]chan struct{}, len(ops)) // operation index -> done channel
	opsWaitOn = make([][]int, len(ops))       // operation index -> operation indexes
	lastOpReference := make(map[objID]int)    // objID -> operation index
	for i, o := range ops {
		// Find the last operation that involved the same receiver object. We at
		// least need to wait on that operation.
		receiver := o.receiver()
		waitIndex, ok := lastOpReference[receiver]
		// Record this operation as the latest reference before deciding
		// whether to skip it; later operations must see it either way.
		lastOpReference[receiver] = i
		if !ok {
			// Only valid for i=0. For all other operations, the receiver should
			// have been referenced by some other operation before it's used as
			// a receiver. DB receivers are exempt from this check.
			if i != 0 && receiver.tag() != dbTag {
				panic(fmt.Sprintf("op %s on receiver %s; first reference of %s", ops[i].String(), receiver, receiver))
			}
			// The initOp is a little special. We do want to store the objects it's
			// syncing on, in `lastOpReference`.
			if i != 0 {
				continue
			}
		}

		// The last operation that referenced `receiver` is the one at index
		// `waitIndex`. All operations with the same receiver are performed on
		// the same thread. We only need to synchronize on the operation at
		// `waitIndex` if `receiver` isn't also the receiver on that operation
		// too.
		if ops[waitIndex].receiver() != receiver {
			opsWaitOn[i] = append(opsWaitOn[i], waitIndex)
		}

		// In addition to synchronizing on the operation's receiver,
		// we may need to synchronize on additional objects. For example,
		// batch0.Commit() must synchronize its receiver, batch0, but also on
		// the DB since it mutates database state.
		for _, syncObjID := range o.syncObjs() {
			if vi, vok := lastOpReference[syncObjID]; vok {
				opsWaitOn[i] = append(opsWaitOn[i], vi)
			}
			lastOpReference[syncObjID] = i
		}

		// Sorting happens in place, so opsWaitOn[i] itself ends up ordered.
		waitIndexes := opsWaitOn[i]
		sort.Ints(waitIndexes)
		for _, waitIndex := range waitIndexes {
			// If this is the first operation that must wait on the operation at
			// `waitIndex`, then there will be no channel for the operation yet.
			// Create one.
			if opsDone[waitIndex] == nil {
				opsDone[waitIndex] = make(chan struct{})
			}
		}
	}
	return opsWaitOn, opsDone
}

Generated by: LCOV version 1.14