LCOV - code coverage report
Current view: top level - pebble/internal/rate - rate.go (source / functions) Hit Total Coverage
Test: 2023-11-11 08:11Z b224e8b9 - tests + meta.lcov Lines: 0 44 0.0 %
Date: 2023-11-11 08:12:06 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : // Package rate provides a rate limiter.
       6             : package rate // import "github.com/cockroachdb/pebble/internal/rate"
       7             : 
       8             : import (
       9             :         "sync"
      10             :         "time"
      11             : 
      12             :         "github.com/cockroachdb/tokenbucket"
      13             : )
      14             : 
      15             : // A Limiter controls how frequently events are allowed to happen.
      16             : // It implements a "token bucket" of size b, initially full and refilled
      17             : // at rate r tokens per second.
      18             : //
      19             : // Informally, in any large enough time interval, the Limiter limits the
      20             : // rate to r tokens per second, with a maximum burst size of b events.
      21             : //
      22             : // Limiter is thread-safe.
      23             : type Limiter struct {
      24             :         mu struct {
      25             :                 sync.Mutex
      26             :                 tb    tokenbucket.TokenBucket
      27             :                 rate  float64
      28             :                 burst float64
      29             :         }
      30             :         sleepFn func(d time.Duration)
      31             : }
      32             : 
      33             : // NewLimiter returns a new Limiter that allows events up to rate r and permits
      34             : // bursts of at most b tokens.
      35           0 : func NewLimiter(r float64, b float64) *Limiter {
      36           0 :         l := &Limiter{}
      37           0 :         l.mu.tb.Init(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(b))
      38           0 :         l.mu.rate = r
      39           0 :         l.mu.burst = b
      40           0 :         return l
      41           0 : }
      42             : 
      43             : // NewLimiterWithCustomTime returns a new Limiter that allows events up to rate
      44             : // r and permits bursts of at most b tokens. The limiter uses the given
      45             : // functions to retrieve the current time and to sleep (useful for testing).
      46             : func NewLimiterWithCustomTime(
      47             :         r float64, b float64, nowFn func() time.Time, sleepFn func(d time.Duration),
      48           0 : ) *Limiter {
      49           0 :         l := &Limiter{}
      50           0 :         l.mu.tb.InitWithNowFn(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(b), nowFn)
      51           0 :         l.mu.rate = r
      52           0 :         l.mu.burst = b
      53           0 :         l.sleepFn = sleepFn
      54           0 :         return l
      55           0 : }
      56             : 
      57             : // Wait sleeps until enough tokens are available. If n is more than the burst,
      58             : // the token bucket will go into debt, delaying future operations.
      59           0 : func (l *Limiter) Wait(n float64) {
      60           0 :         for {
      61           0 :                 l.mu.Lock()
      62           0 :                 ok, d := l.mu.tb.TryToFulfill(tokenbucket.Tokens(n))
      63           0 :                 l.mu.Unlock()
      64           0 :                 if ok {
      65           0 :                         return
      66           0 :                 }
      67           0 :                 if l.sleepFn != nil {
      68           0 :                         l.sleepFn(d)
      69           0 :                 } else {
      70           0 :                         time.Sleep(d)
      71           0 :                 }
      72             :         }
      73             : }
      74             : 
      75             : // Remove removes tokens for an operation that bypassed any waiting; it can put
      76             : // the token bucket into debt, delaying future operations.
      77           0 : func (l *Limiter) Remove(n float64) {
      78           0 :         l.mu.Lock()
      79           0 :         defer l.mu.Unlock()
      80           0 :         l.mu.tb.Adjust(-tokenbucket.Tokens(n))
      81           0 : }
      82             : 
      83             : // Rate returns the current rate limit.
      84           0 : func (l *Limiter) Rate() float64 {
      85           0 :         l.mu.Lock()
      86           0 :         defer l.mu.Unlock()
      87           0 :         return l.mu.rate
      88           0 : }
      89             : 
      90             : // SetRate updates the rate limit.
      91           0 : func (l *Limiter) SetRate(r float64) {
      92           0 :         l.mu.Lock()
      93           0 :         defer l.mu.Unlock()
      94           0 :         l.mu.tb.UpdateConfig(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(l.mu.burst))
      95           0 :         l.mu.rate = r
      96           0 : }

Generated by: LCOV version 1.14