LCOV - code coverage report
Current view: top level - pebble/vfs/errorfs - latency.go (source / functions) Hit Total Coverage
Test: 2024-12-29 08:16Z 87a5141c - tests only.lcov Lines: 67 76 88.2 %
Date: 2024-12-29 08:17:52 Functions: 0 0 -

          Line data    Source code
       1             : package errorfs
       2             : 
import (
	"encoding/binary"
	"fmt"
	"go/token"
	"hash/fnv"
	"hash/maphash"
	"math/rand/v2"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/internal/dsl"
)
      17             : 
      18             : // RandomLatency constructs an Injector that does not inject errors but instead
      19             : // injects random latency into operations that match the provided predicate. The
      20             : // amount of latency injected follows an exponential distribution with the
      21             : // provided mean. Latency injected is derived from the provided seed and is
      22             : // deterministic with respect to each file's path.
      23             : //
      24             : // If limit is nonzero, total latency injected over the lifetime of the Injector
      25             : // is capped to limit.
      26           1 : func RandomLatency(pred Predicate, mean time.Duration, seed int64, limit time.Duration) Injector {
      27           1 :         rl := &randomLatency{
      28           1 :                 predicate: pred,
      29           1 :                 mean:      mean,
      30           1 :                 limit:     limit,
      31           1 :         }
      32           1 :         rl.keyedPrng.init(seed)
      33           1 :         return rl
      34           1 : }
      35             : 
      36           1 : func parseRandomLatency(p *Parser, s *dsl.Scanner) Injector {
      37           1 :         dur, err := time.ParseDuration(s.ConsumeString())
      38           1 :         if err != nil {
      39           1 :                 panic(errors.Newf("parsing RandomLatency: %s", err))
      40             :         }
      41           1 :         lit := s.Consume(token.INT).Lit
      42           1 :         seed, err := strconv.ParseInt(lit, 10, 64)
      43           1 :         if err != nil {
      44           0 :                 panic(err)
      45             :         }
      46           1 :         var pred Predicate
      47           1 :         tok := s.Scan()
      48           1 :         if tok.Kind == token.LPAREN || tok.Kind == token.IDENT {
      49           1 :                 pred = p.predicates.ParseFromPos(s, tok)
      50           1 :                 tok = s.Scan()
      51           1 :         }
      52           1 :         if tok.Kind != token.RPAREN {
      53           1 :                 panic(errors.Errorf("errorfs: unexpected token %s; expected %s", tok.String(), token.RPAREN))
      54             :         }
      55           1 :         return RandomLatency(pred, dur, seed, 0 /* no limit */)
      56             : }
      57             : 
// randomLatency is the Injector implementation returned by RandomLatency. It
// delays matching operations rather than failing them.
type randomLatency struct {
	// predicate selects which operations are delayed. A nil predicate matches
	// every operation.
	predicate Predicate
	// mean is the mean duration injected each operation.
	mean time.Duration
	// limit configures a limit on total latency injected over the lifetime of
	// the Injector if nonzero.
	limit time.Duration
	// agg is the aggregate latency injected over the lifetime of the Injector.
	// It is stored as nanoseconds (a time.Duration converted to int64) and is
	// only consulted when limit is positive.
	agg atomic.Int64
	// keyedPrng provides the per-path pseudorandom streams from which
	// latencies are sampled, and holds the root seed reported by String.
	keyedPrng
}
      69             : 
      70           1 : func (rl *randomLatency) String() string {
      71           1 :         if rl.predicate == nil {
      72           1 :                 return fmt.Sprintf("(RandomLatency %q %d)", rl.mean, rl.rootSeed)
      73           1 :         }
      74           1 :         return fmt.Sprintf("(RandomLatency %q %d %s)", rl.mean, rl.rootSeed, rl.predicate)
      75             : }
      76             : 
      77           1 : func (rl *randomLatency) MaybeError(op Op) error {
      78           1 :         if rl.predicate != nil && !rl.predicate.Evaluate(op) {
      79           1 :                 return nil
      80           1 :         }
      81           1 :         var dur time.Duration
      82           1 :         rl.keyedPrng.withKey(op.Path, func(prng *rand.Rand) {
      83           1 :                 // We cap the max latency to 100x: Otherwise, it seems possible
      84           1 :                 // (although very unlikely) ExpFloat64 generates a multiplier high
      85           1 :                 // enough that causes a test timeout.
      86           1 :                 dur = time.Duration(min(prng.ExpFloat64(), 20.0) * float64(rl.mean))
      87           1 :         })
      88             : 
      89             :         // Apply a limit on total latency injected over the lifetime of the
      90             :         // Injector, if one is configured.
      91           1 :         if rl.limit > 0 {
      92           1 :                 if v := time.Duration(rl.agg.Add(int64(dur))); v-dur > rl.limit {
      93           0 :                         // We'd already exceeded the limit before adding dur. Don't inject
      94           0 :                         // anything.
      95           0 :                         return nil
      96           1 :                 } else if v > rl.limit {
      97           0 :                         // We're about to exceed the limit. Cap the duration.
      98           0 :                         dur -= v - rl.limit
      99           0 :                 }
     100             :         }
     101             : 
     102           1 :         time.Sleep(dur)
     103           1 :         return nil
     104             : }
     105             : 
     106             : // keyedPrng maintains a separate prng per-key that's deterministic with
     107             : // respect to the key: its behavior for a particular key is deterministic
     108             : // regardless of intervening evaluations for operations on other keys. This can
     109             : // be used to ensure determinism despite nondeterministic concurrency if the
     110             : // concurrency is constrained to separate keys.
     111             : type keyedPrng struct {
     112             :         rootSeed int64
     113             :         mu       struct {
     114             :                 sync.Mutex
     115             :                 h           maphash.Hash
     116             :                 perFilePrng map[string]*rand.Rand
     117             :         }
     118             : }
     119             : 
     120           1 : func (p *keyedPrng) init(rootSeed int64) {
     121           1 :         p.rootSeed = rootSeed
     122           1 :         p.mu.perFilePrng = make(map[string]*rand.Rand)
     123           1 : }
     124             : 
     125           1 : func (p *keyedPrng) withKey(key string, fn func(*rand.Rand)) {
     126           1 :         p.mu.Lock()
     127           1 :         defer p.mu.Unlock()
     128           1 :         prng, ok := p.mu.perFilePrng[key]
     129           1 :         if !ok {
     130           1 :                 // This is the first time an operation has been performed on the key.
     131           1 :                 // Initialize the per-key prng by computing a deterministic hash of the
     132           1 :                 // key.
     133           1 :                 p.mu.h.Reset()
     134           1 :                 var b [8]byte
     135           1 :                 binary.LittleEndian.PutUint64(b[:], uint64(p.rootSeed))
     136           1 :                 if _, err := p.mu.h.Write(b[:]); err != nil {
     137           0 :                         panic(err)
     138             :                 }
     139           1 :                 if _, err := p.mu.h.WriteString(key); err != nil {
     140           0 :                         panic(err)
     141             :                 }
     142           1 :                 seed := p.mu.h.Sum64()
     143           1 :                 prng = rand.New(rand.NewPCG(0, seed))
     144           1 :                 p.mu.perFilePrng[key] = prng
     145             :         }
     146           1 :         fn(prng)
     147             : }

Generated by: LCOV version 1.14