Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use 2 : // of this source code is governed by a BSD-style license that can be found in 3 : // the LICENSE file. 4 : 5 : // Package rate provides a rate limiter. 6 : package rate // import "github.com/cockroachdb/pebble/internal/rate" 7 : 8 : import ( 9 : "sync" 10 : "time" 11 : 12 : "github.com/cockroachdb/tokenbucket" 13 : ) 14 : 15 : // A Limiter controls how frequently events are allowed to happen. 16 : // It implements a "token bucket" of size b, initially full and refilled 17 : // at rate r tokens per second. 18 : // 19 : // Informally, in any large enough time interval, the Limiter limits the 20 : // rate to r tokens per second, with a maximum burst size of b events. 21 : // 22 : // Limiter is thread-safe. 23 : type Limiter struct { 24 : mu struct { 25 : sync.Mutex 26 : tb tokenbucket.TokenBucket 27 : rate float64 28 : burst float64 29 : } 30 : sleepFn func(d time.Duration) 31 : } 32 : 33 : // NewLimiter returns a new Limiter that allows events up to rate r and permits 34 : // bursts of at most b tokens. 35 0 : func NewLimiter(r float64, b float64) *Limiter { 36 0 : l := &Limiter{} 37 0 : l.mu.tb.Init(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(b)) 38 0 : l.mu.rate = r 39 0 : l.mu.burst = b 40 0 : return l 41 0 : } 42 : 43 : // NewLimiterWithCustomTime returns a new Limiter that allows events up to rate 44 : // r and permits bursts of at most b tokens. The limiter uses the given 45 : // functions to retrieve the current time and to sleep (useful for testing). 46 : func NewLimiterWithCustomTime( 47 : r float64, b float64, nowFn func() time.Time, sleepFn func(d time.Duration), 48 0 : ) *Limiter { 49 0 : l := &Limiter{} 50 0 : l.mu.tb.InitWithNowFn(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(b), nowFn) 51 0 : l.mu.rate = r 52 0 : l.mu.burst = b 53 0 : l.sleepFn = sleepFn 54 0 : return l 55 0 : } 56 : 57 : // Wait sleeps until enough tokens are available. If n is more than the burst, 58 : // the token bucket will go into debt, delaying future operations. 59 0 : func (l *Limiter) Wait(n float64) { 60 0 : for { 61 0 : l.mu.Lock() 62 0 : ok, d := l.mu.tb.TryToFulfill(tokenbucket.Tokens(n)) 63 0 : l.mu.Unlock() 64 0 : if ok { 65 0 : return 66 0 : } 67 0 : if l.sleepFn != nil { 68 0 : l.sleepFn(d) 69 0 : } else { 70 0 : time.Sleep(d) 71 0 : } 72 : } 73 : } 74 : 75 : // Remove removes tokens for an operation that bypassed any waiting; it can put 76 : // the token bucket into debt, delaying future operations. 77 0 : func (l *Limiter) Remove(n float64) { 78 0 : l.mu.Lock() 79 0 : defer l.mu.Unlock() 80 0 : l.mu.tb.Adjust(-tokenbucket.Tokens(n)) 81 0 : } 82 : 83 : // Rate returns the current rate limit. 84 0 : func (l *Limiter) Rate() float64 { 85 0 : l.mu.Lock() 86 0 : defer l.mu.Unlock() 87 0 : return l.mu.rate 88 0 : } 89 : 90 : // SetRate updates the rate limit. 91 0 : func (l *Limiter) SetRate(r float64) { 92 0 : l.mu.Lock() 93 0 : defer l.mu.Unlock() 94 0 : l.mu.tb.UpdateConfig(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(l.mu.burst)) 95 0 : l.mu.rate = r 96 0 : }