LCOV - code coverage report
Current view: top level - pebble/metamorphic - test.go (source / functions) Hit Total Coverage
Test: 2023-09-27 08:17Z 14b8ccdc - tests only.lcov Lines: 0 275 0.0 %
Date: 2023-09-27 08:17:49 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package metamorphic
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "io"
      10             :         "os"
      11             :         "sort"
      12             :         "strings"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble"
      16             :         "github.com/cockroachdb/pebble/vfs"
      17             :         "github.com/cockroachdb/pebble/vfs/errorfs"
      18             : )
      19             : 
      20             : type test struct {
      21             :         // The list of ops to execute. The ops refer to slots in the batches, iters,
      22             :         // and snapshots slices.
      23             :         ops       []op
      24             :         opsWaitOn [][]int         // op index -> op indexes
      25             :         opsDone   []chan struct{} // op index -> done channel
      26             :         idx       int
      27             :         // The DB the test is run on.
      28             :         dir       string
      29             :         db        *pebble.DB
      30             :         opts      *pebble.Options
      31             :         testOpts  *TestOptions
      32             :         writeOpts *pebble.WriteOptions
      33             :         tmpDir    string
      34             :         // The slots for the batches, iterators, and snapshots. These are read and
      35             :         // written by the ops to pass state from one op to another.
      36             :         batches   []*pebble.Batch
      37             :         iters     []*retryableIter
      38             :         snapshots []readerCloser
      39             : }
      40             : 
      41           0 : func newTest(ops []op) *test {
      42           0 :         return &test{
      43           0 :                 ops: ops,
      44           0 :         }
      45           0 : }
      46             : 
      47           0 : func (t *test) init(h *history, dir string, testOpts *TestOptions) error {
      48           0 :         t.dir = dir
      49           0 :         t.testOpts = testOpts
      50           0 :         t.writeOpts = pebble.NoSync
      51           0 :         if testOpts.strictFS {
      52           0 :                 t.writeOpts = pebble.Sync
      53           0 :         }
      54           0 :         t.opts = testOpts.Opts.EnsureDefaults()
      55           0 :         t.opts.Logger = h
      56           0 :         lel := pebble.MakeLoggingEventListener(t.opts.Logger)
      57           0 :         t.opts.EventListener = &lel
      58           0 :         t.opts.DebugCheck = func(db *pebble.DB) error {
      59           0 :                 // Wrap the ordinary DebugCheckLevels with retrying
      60           0 :                 // of injected errors.
      61           0 :                 return withRetries(func() error {
      62           0 :                         return pebble.DebugCheckLevels(db)
      63           0 :                 })
      64             :         }
      65             : 
      66           0 :         t.opsWaitOn, t.opsDone = computeSynchronizationPoints(t.ops)
      67           0 : 
      68           0 :         defer t.opts.Cache.Unref()
      69           0 : 
      70           0 :         // If an error occurs and we were using an in-memory FS, attempt to clone to
      71           0 :         // on-disk in order to allow post-mortem debugging. Note that always using
      72           0 :         // the on-disk FS isn't desirable because there is a large performance
      73           0 :         // difference between in-memory and on-disk which causes different code paths
      74           0 :         // and timings to be exercised.
      75           0 :         maybeExit := func(err error) {
      76           0 :                 if err == nil || errors.Is(err, errorfs.ErrInjected) {
      77           0 :                         return
      78           0 :                 }
      79           0 :                 t.maybeSaveData()
      80           0 :                 fmt.Fprintln(os.Stderr, err)
      81           0 :                 os.Exit(1)
      82             :         }
      83             : 
      84             :         // Exit early on any error from a background operation.
      85           0 :         t.opts.EventListener.BackgroundError = func(err error) {
      86           0 :                 t.opts.Logger.Infof("background error: %s", err)
      87           0 :                 maybeExit(err)
      88           0 :         }
      89           0 :         t.opts.EventListener.CompactionEnd = func(info pebble.CompactionInfo) {
      90           0 :                 t.opts.Logger.Infof("%s", info)
      91           0 :                 maybeExit(info.Err)
      92           0 :         }
      93           0 :         t.opts.EventListener.FlushEnd = func(info pebble.FlushInfo) {
      94           0 :                 t.opts.Logger.Infof("%s", info)
      95           0 :                 if info.Err != nil && !strings.Contains(info.Err.Error(), "pebble: empty table") {
      96           0 :                         maybeExit(info.Err)
      97           0 :                 }
      98             :         }
      99           0 :         t.opts.EventListener.ManifestCreated = func(info pebble.ManifestCreateInfo) {
     100           0 :                 t.opts.Logger.Infof("%s", info)
     101           0 :                 maybeExit(info.Err)
     102           0 :         }
     103           0 :         t.opts.EventListener.ManifestDeleted = func(info pebble.ManifestDeleteInfo) {
     104           0 :                 t.opts.Logger.Infof("%s", info)
     105           0 :                 maybeExit(info.Err)
     106           0 :         }
     107           0 :         t.opts.EventListener.TableDeleted = func(info pebble.TableDeleteInfo) {
     108           0 :                 t.opts.Logger.Infof("%s", info)
     109           0 :                 maybeExit(info.Err)
     110           0 :         }
     111           0 :         t.opts.EventListener.TableIngested = func(info pebble.TableIngestInfo) {
     112           0 :                 t.opts.Logger.Infof("%s", info)
     113           0 :                 maybeExit(info.Err)
     114           0 :         }
     115           0 :         t.opts.EventListener.WALCreated = func(info pebble.WALCreateInfo) {
     116           0 :                 t.opts.Logger.Infof("%s", info)
     117           0 :                 maybeExit(info.Err)
     118           0 :         }
     119           0 :         t.opts.EventListener.WALDeleted = func(info pebble.WALDeleteInfo) {
     120           0 :                 t.opts.Logger.Infof("%s", info)
     121           0 :                 maybeExit(info.Err)
     122           0 :         }
     123             : 
     124           0 :         for i := range t.testOpts.CustomOpts {
     125           0 :                 if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
     126           0 :                         return err
     127           0 :                 }
     128             :         }
     129             : 
     130           0 :         var db *pebble.DB
     131           0 :         var err error
     132           0 :         err = withRetries(func() error {
     133           0 :                 db, err = pebble.Open(dir, t.opts)
     134           0 :                 return err
     135           0 :         })
     136           0 :         if err != nil {
     137           0 :                 return err
     138           0 :         }
     139           0 :         h.log.Printf("// db.Open() %v", err)
     140           0 : 
     141           0 :         if t.testOpts.sharedStorageEnabled {
     142           0 :                 err = withRetries(func() error {
     143           0 :                         return db.SetCreatorID(1)
     144           0 :                 })
     145           0 :                 if err != nil {
     146           0 :                         return err
     147           0 :                 }
     148           0 :                 h.log.Printf("// db.SetCreatorID() %v", err)
     149             :         }
     150             : 
     151           0 :         t.tmpDir = t.opts.FS.PathJoin(dir, "tmp")
     152           0 :         if err = t.opts.FS.MkdirAll(t.tmpDir, 0755); err != nil {
     153           0 :                 return err
     154           0 :         }
     155           0 :         if t.testOpts.strictFS {
     156           0 :                 // Sync the whole directory path for the tmpDir, since restartDB() is executed during
     157           0 :                 // the test. That would reset MemFS to the synced state, which would make an unsynced
     158           0 :                 // directory disappear in the middle of the test. It is the responsibility of the test
     159           0 :                 // (not Pebble) to ensure that it can write the ssts that it will subsequently ingest
     160           0 :                 // into Pebble.
     161           0 :                 for {
     162           0 :                         f, err := t.opts.FS.OpenDir(dir)
     163           0 :                         if err != nil {
     164           0 :                                 return err
     165           0 :                         }
     166           0 :                         if err = f.Sync(); err != nil {
     167           0 :                                 return err
     168           0 :                         }
     169           0 :                         if err = f.Close(); err != nil {
     170           0 :                                 return err
     171           0 :                         }
     172           0 :                         if len(dir) == 1 {
     173           0 :                                 break
     174             :                         }
     175           0 :                         dir = t.opts.FS.PathDir(dir)
     176           0 :                         // TODO(sbhola): PathDir returns ".", which OpenDir() complains about. Fix.
     177           0 :                         if len(dir) == 1 {
     178           0 :                                 dir = "/"
     179           0 :                         }
     180             :                 }
     181             :         }
     182             : 
     183           0 :         t.db = db
     184           0 :         return nil
     185             : }
     186             : 
     187           0 : func (t *test) isFMV(fmv pebble.FormatMajorVersion) bool {
     188           0 :         return t.db.FormatMajorVersion() >= fmv
     189           0 : }
     190             : 
     191           0 : func (t *test) restartDB() error {
     192           0 :         if !t.testOpts.strictFS {
     193           0 :                 return nil
     194           0 :         }
     195           0 :         t.opts.Cache.Ref()
     196           0 :         // The fs isn't necessarily a MemFS.
     197           0 :         fs, ok := vfs.Root(t.opts.FS).(*vfs.MemFS)
     198           0 :         if ok {
     199           0 :                 fs.SetIgnoreSyncs(true)
     200           0 :         }
     201           0 :         if err := t.db.Close(); err != nil {
     202           0 :                 return err
     203           0 :         }
     204             :         // Release any resources held by custom options. This may be used, for
     205             :         // example, by the encryption-at-rest custom option (within the Cockroach
     206             :         // repository) to close the file registry.
     207           0 :         for i := range t.testOpts.CustomOpts {
     208           0 :                 if err := t.testOpts.CustomOpts[i].Close(t.opts); err != nil {
     209           0 :                         return err
     210           0 :                 }
     211             :         }
     212           0 :         if ok {
     213           0 :                 fs.ResetToSyncedState()
     214           0 :                 fs.SetIgnoreSyncs(false)
     215           0 :         }
     216             : 
     217             :         // TODO(jackson): Audit errorRate and ensure custom options' hooks semantics
     218             :         // are well defined within the context of retries.
     219           0 :         err := withRetries(func() (err error) {
     220           0 :                 // Reacquire any resources required by custom options. This may be used, for
     221           0 :                 // example, by the encryption-at-rest custom option (within the Cockroach
     222           0 :                 // repository) to reopen the file registry.
     223           0 :                 for i := range t.testOpts.CustomOpts {
     224           0 :                         if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
     225           0 :                                 return err
     226           0 :                         }
     227             :                 }
     228           0 :                 t.db, err = pebble.Open(t.dir, t.opts)
     229           0 :                 return err
     230             :         })
     231           0 :         t.opts.Cache.Unref()
     232           0 :         return err
     233             : }
     234             : 
     235             : // If an in-memory FS is being used, save the contents to disk.
     236           0 : func (t *test) maybeSaveData() {
     237           0 :         rootFS := vfs.Root(t.opts.FS)
     238           0 :         if rootFS == vfs.Default {
     239           0 :                 return
     240           0 :         }
     241           0 :         _ = os.RemoveAll(t.dir)
     242           0 :         if _, err := vfs.Clone(rootFS, vfs.Default, t.dir, t.dir); err != nil {
     243           0 :                 t.opts.Logger.Infof("unable to clone: %s: %v", t.dir, err)
     244           0 :         }
     245             : }
     246             : 
     247           0 : func (t *test) step(h *history) bool {
     248           0 :         if t.idx >= len(t.ops) {
     249           0 :                 return false
     250           0 :         }
     251           0 :         t.ops[t.idx].run(t, h.recorder(-1 /* thread */, t.idx))
     252           0 :         t.idx++
     253           0 :         return true
     254             : }
     255             : 
     256           0 : func (t *test) setBatch(id objID, b *pebble.Batch) {
     257           0 :         if id.tag() != batchTag {
     258           0 :                 panic(fmt.Sprintf("invalid batch ID: %s", id))
     259             :         }
     260           0 :         t.batches[id.slot()] = b
     261             : }
     262             : 
     263           0 : func (t *test) setIter(id objID, i *pebble.Iterator) {
     264           0 :         if id.tag() != iterTag {
     265           0 :                 panic(fmt.Sprintf("invalid iter ID: %s", id))
     266             :         }
     267           0 :         t.iters[id.slot()] = &retryableIter{
     268           0 :                 iter:    i,
     269           0 :                 lastKey: nil,
     270           0 :         }
     271             : }
     272             : 
     273             : type readerCloser interface {
     274             :         pebble.Reader
     275             :         io.Closer
     276             : }
     277             : 
     278           0 : func (t *test) setSnapshot(id objID, s readerCloser) {
     279           0 :         if id.tag() != snapTag {
     280           0 :                 panic(fmt.Sprintf("invalid snapshot ID: %s", id))
     281             :         }
     282           0 :         t.snapshots[id.slot()] = s
     283             : }
     284             : 
     285           0 : func (t *test) clearObj(id objID) {
     286           0 :         switch id.tag() {
     287           0 :         case dbTag:
     288           0 :                 t.db = nil
     289           0 :         case batchTag:
     290           0 :                 t.batches[id.slot()] = nil
     291           0 :         case iterTag:
     292           0 :                 t.iters[id.slot()] = nil
     293           0 :         case snapTag:
     294           0 :                 t.snapshots[id.slot()] = nil
     295             :         }
     296             : }
     297             : 
     298           0 : func (t *test) getBatch(id objID) *pebble.Batch {
     299           0 :         if id.tag() != batchTag {
     300           0 :                 panic(fmt.Sprintf("invalid batch ID: %s", id))
     301             :         }
     302           0 :         return t.batches[id.slot()]
     303             : }
     304             : 
     305           0 : func (t *test) getCloser(id objID) io.Closer {
     306           0 :         switch id.tag() {
     307           0 :         case dbTag:
     308           0 :                 return t.db
     309           0 :         case batchTag:
     310           0 :                 return t.batches[id.slot()]
     311           0 :         case iterTag:
     312           0 :                 return t.iters[id.slot()]
     313           0 :         case snapTag:
     314           0 :                 return t.snapshots[id.slot()]
     315             :         }
     316           0 :         panic(fmt.Sprintf("cannot close ID: %s", id))
     317             : }
     318             : 
     319           0 : func (t *test) getIter(id objID) *retryableIter {
     320           0 :         if id.tag() != iterTag {
     321           0 :                 panic(fmt.Sprintf("invalid iter ID: %s", id))
     322             :         }
     323           0 :         return t.iters[id.slot()]
     324             : }
     325             : 
     326           0 : func (t *test) getReader(id objID) pebble.Reader {
     327           0 :         switch id.tag() {
     328           0 :         case dbTag:
     329           0 :                 return t.db
     330           0 :         case batchTag:
     331           0 :                 return t.batches[id.slot()]
     332           0 :         case snapTag:
     333           0 :                 return t.snapshots[id.slot()]
     334             :         }
     335           0 :         panic(fmt.Sprintf("invalid reader ID: %s", id))
     336             : }
     337             : 
     338           0 : func (t *test) getWriter(id objID) pebble.Writer {
     339           0 :         switch id.tag() {
     340           0 :         case dbTag:
     341           0 :                 return t.db
     342           0 :         case batchTag:
     343           0 :                 return t.batches[id.slot()]
     344             :         }
     345           0 :         panic(fmt.Sprintf("invalid writer ID: %s", id))
     346             : }
     347             : 
     348             : // Compute the synchronization points between operations. When operating
     349             : // with more than 1 thread, operations must synchronize access to shared
     350             : // objects. Compute two slices the same length as ops.
     351             : //
     352             : // opsWaitOn: the value v at index i indicates that operation i must wait
     353             : // for the operation at index v to finish before it may run. NB: v < i
     354             : //
     355             : // opsDone: the channel at index i must be closed when the operation at index i
     356             : // completes. This slice is sparse. Operations that are never used as
     357             : // synchronization points may have a nil channel.
     358           0 : func computeSynchronizationPoints(ops []op) (opsWaitOn [][]int, opsDone []chan struct{}) {
     359           0 :         opsDone = make([]chan struct{}, len(ops)) // operation index -> done channel
     360           0 :         opsWaitOn = make([][]int, len(ops))       // operation index -> operation index
     361           0 :         lastOpReference := make(map[objID]int)    // objID -> operation index
     362           0 :         for i, o := range ops {
     363           0 :                 // Find the last operation that involved the same receiver object. We at
     364           0 :                 // least need to wait on that operation.
     365           0 :                 receiver := o.receiver()
     366           0 :                 waitIndex, ok := lastOpReference[receiver]
     367           0 :                 lastOpReference[receiver] = i
     368           0 :                 if !ok {
     369           0 :                         // Only valid for i=0. For all other operations, the receiver should
     370           0 :                         // have been referenced by some other operation before it's used as
     371           0 :                         // a receiver.
     372           0 :                         if i != 0 {
     373           0 :                                 panic(fmt.Sprintf("op %d on receiver %s; first reference of %s", i, receiver, receiver))
     374             :                         }
     375           0 :                         continue
     376             :                 }
     377             : 
     378             :                 // The last operation that referenced `receiver` is the one at index
     379             :                 // `waitIndex`. All operations with the same receiver are performed on
     380             :                 // the same thread. We only need to synchronize on the operation at
     381             :                 // `waitIndex` if `receiver` isn't also the receiver on that operation
     382             :                 // too.
     383           0 :                 if ops[waitIndex].receiver() != receiver {
     384           0 :                         opsWaitOn[i] = append(opsWaitOn[i], waitIndex)
     385           0 :                 }
     386             : 
     387             :                 // In additional to synchronizing on the operation's receiver operation,
     388             :                 // we may need to synchronize on additional objects. For example,
     389             :                 // batch0.Commit() must synchronize its receiver, batch0, but also on
     390             :                 // dbObjID since it mutates database state.
     391           0 :                 for _, syncObjID := range o.syncObjs() {
     392           0 :                         if vi, vok := lastOpReference[syncObjID]; vok {
     393           0 :                                 opsWaitOn[i] = append(opsWaitOn[i], vi)
     394           0 :                         }
     395           0 :                         lastOpReference[syncObjID] = i
     396             :                 }
     397             : 
     398           0 :                 waitIndexes := opsWaitOn[i]
     399           0 :                 sort.Ints(waitIndexes)
     400           0 :                 for _, waitIndex := range waitIndexes {
     401           0 :                         // If this is the first operation that must wait on the operation at
     402           0 :                         // `waitIndex`, then there will be no channel for the operation yet.
     403           0 :                         // Create one.
     404           0 :                         if opsDone[waitIndex] == nil {
     405           0 :                                 opsDone[waitIndex] = make(chan struct{})
     406           0 :                         }
     407             :                 }
     408             :         }
     409           0 :         return opsWaitOn, opsDone
     410             : }

Generated by: LCOV version 1.14