Line data Source code
1 : package errorfs 2 : 3 : import ( 4 : "encoding/binary" 5 : "fmt" 6 : "go/token" 7 : "hash/maphash" 8 : "math/rand/v2" 9 : "strconv" 10 : "sync" 11 : "sync/atomic" 12 : "time" 13 : 14 : "github.com/cockroachdb/errors" 15 : "github.com/cockroachdb/pebble/internal/dsl" 16 : ) 17 : 18 : // RandomLatency constructs an Injector that does not inject errors but instead 19 : // injects random latency into operations that match the provided predicate. The 20 : // amount of latency injected follows an exponential distribution with the 21 : // provided mean. Latency injected is derived from the provided seed and is 22 : // deterministic with respect to each file's path. 23 : // 24 : // If limit is nonzero, total latency injected over the lifetime of the Injector 25 : // is capped to limit. 26 1 : func RandomLatency(pred Predicate, mean time.Duration, seed int64, limit time.Duration) Injector { 27 1 : rl := &randomLatency{ 28 1 : predicate: pred, 29 1 : mean: mean, 30 1 : limit: limit, 31 1 : } 32 1 : rl.keyedPrng.init(seed) 33 1 : return rl 34 1 : } 35 : 36 0 : func parseRandomLatency(p *Parser, s *dsl.Scanner) Injector { 37 0 : dur, err := time.ParseDuration(s.ConsumeString()) 38 0 : if err != nil { 39 0 : panic(errors.Newf("parsing RandomLatency: %s", err)) 40 : } 41 0 : lit := s.Consume(token.INT).Lit 42 0 : seed, err := strconv.ParseInt(lit, 10, 64) 43 0 : if err != nil { 44 0 : panic(err) 45 : } 46 0 : var pred Predicate 47 0 : tok := s.Scan() 48 0 : if tok.Kind == token.LPAREN || tok.Kind == token.IDENT { 49 0 : pred = p.predicates.ParseFromPos(s, tok) 50 0 : tok = s.Scan() 51 0 : } 52 0 : if tok.Kind != token.RPAREN { 53 0 : panic(errors.Errorf("errorfs: unexpected token %s; expected %s", tok.String(), token.RPAREN)) 54 : } 55 0 : return RandomLatency(pred, dur, seed, 0 /* no limit */) 56 : } 57 : 58 : type randomLatency struct { 59 : predicate Predicate 60 : // mean is the mean duration injected each operation. 61 : mean time.Duration 62 : // limit configures a limit on total latency injected over the lifetime of 63 : // the Injector if nonzero. 64 : limit time.Duration 65 : // agg is the aggregate latency injected over the lifetime of the Injector. 66 : agg atomic.Int64 67 : keyedPrng 68 : } 69 : 70 0 : func (rl *randomLatency) String() string { 71 0 : if rl.predicate == nil { 72 0 : return fmt.Sprintf("(RandomLatency %q %d)", rl.mean, rl.rootSeed) 73 0 : } 74 0 : return fmt.Sprintf("(RandomLatency %q %d %s)", rl.mean, rl.rootSeed, rl.predicate) 75 : } 76 : 77 1 : func (rl *randomLatency) MaybeError(op Op) error { 78 1 : if rl.predicate != nil && !rl.predicate.Evaluate(op) { 79 1 : return nil 80 1 : } 81 1 : var dur time.Duration 82 1 : rl.keyedPrng.withKey(op.Path, func(prng *rand.Rand) { 83 1 : // We cap the max latency to 100x: Otherwise, it seems possible 84 1 : // (although very unlikely) ExpFloat64 generates a multiplier high 85 1 : // enough that causes a test timeout. 86 1 : dur = time.Duration(min(prng.ExpFloat64(), 20.0) * float64(rl.mean)) 87 1 : }) 88 : 89 : // Apply a limit on total latency injected over the lifetime of the 90 : // Injector, if one is configured. 91 1 : if rl.limit > 0 { 92 1 : if v := time.Duration(rl.agg.Add(int64(dur))); v-dur > rl.limit { 93 1 : // We'd already exceeded the limit before adding dur. Don't inject 94 1 : // anything. 95 1 : return nil 96 1 : } else if v > rl.limit { 97 1 : // We're about to exceed the limit. Cap the duration. 98 1 : dur -= v - rl.limit 99 1 : } 100 : } 101 : 102 1 : time.Sleep(dur) 103 1 : return nil 104 : } 105 : 106 : // keyedPrng maintains a separate prng per-key that's deterministic with 107 : // respect to the key: its behavior for a particular key is deterministic 108 : // regardless of intervening evaluations for operations on other keys. This can 109 : // be used to ensure determinism despite nondeterministic concurrency if the 110 : // concurrency is constrained to separate keys. 111 : type keyedPrng struct { 112 : rootSeed int64 113 : mu struct { 114 : sync.Mutex 115 : h maphash.Hash 116 : perFilePrng map[string]*rand.Rand 117 : } 118 : } 119 : 120 1 : func (p *keyedPrng) init(rootSeed int64) { 121 1 : p.rootSeed = rootSeed 122 1 : p.mu.perFilePrng = make(map[string]*rand.Rand) 123 1 : } 124 : 125 1 : func (p *keyedPrng) withKey(key string, fn func(*rand.Rand)) { 126 1 : p.mu.Lock() 127 1 : defer p.mu.Unlock() 128 1 : prng, ok := p.mu.perFilePrng[key] 129 1 : if !ok { 130 1 : // This is the first time an operation has been performed on the key. 131 1 : // Initialize the per-key prng by computing a deterministic hash of the 132 1 : // key. 133 1 : p.mu.h.Reset() 134 1 : var b [8]byte 135 1 : binary.LittleEndian.PutUint64(b[:], uint64(p.rootSeed)) 136 1 : if _, err := p.mu.h.Write(b[:]); err != nil { 137 0 : panic(err) 138 : } 139 1 : if _, err := p.mu.h.WriteString(key); err != nil { 140 0 : panic(err) 141 : } 142 1 : seed := p.mu.h.Sum64() 143 1 : prng = rand.New(rand.NewPCG(0, seed)) 144 1 : p.mu.perFilePrng[key] = prng 145 : } 146 1 : fn(prng) 147 : }