LCOV - code coverage report
Current view: top level - pebble/internal/pacertoy/rocksdb - main.go (source / functions) Hit Total Coverage
Test: 2024-03-13 08:16Z d938cdc6 - tests + meta.lcov Lines: 0 262 0.0 %
Date: 2024-03-13 08:16:56 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package main
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "math"
      10             :         "sync"
      11             :         "sync/atomic"
      12             :         "time"
      13             : 
      14             :         "github.com/cockroachdb/pebble/internal/rate"
      15             :         "golang.org/x/exp/rand"
      16             : )
      17             : 
      18             : const (
      19             :         // Max rate for all compactions. This is intentionally set low enough that
      20             :         // user writes will have to be delayed.
      21             :         maxCompactionRate = 80 << 20 // 80 MB/s
      22             : 
      23             :         memtableSize          = 64 << 20 // 64 MB
      24             :         memtableStopThreshold = 2 * memtableSize
      25             :         maxWriteRate          = 30 << 20 // 30 MB/s
      26             :         startingWriteRate     = 30 << 20 // 30 MB/s
      27             : 
      28             :         l0SlowdownThreshold   = 4
      29             :         l0CompactionThreshold = 1
      30             : 
      31             :         levelRatio = 10
      32             :         numLevels  = 7
      33             : 
      34             :         // Slowdown threshold is set at the compaction debt incurred by the largest
      35             :         // possible compaction.
      36             :         compactionDebtSlowdownThreshold = memtableSize * (numLevels - 2)
      37             : )
      38             : 
      39             : type compactionPacer struct {
      40             :         level   atomic.Int64
      41             :         drainer *rate.Limiter
      42             : }
      43             : 
      44           0 : func newCompactionPacer() *compactionPacer {
      45           0 :         p := &compactionPacer{
      46           0 :                 drainer: rate.NewLimiter(maxCompactionRate, maxCompactionRate),
      47           0 :         }
      48           0 :         return p
      49           0 : }
      50             : 
      51           0 : func (p *compactionPacer) fill(n int64) {
      52           0 :         p.level.Add(n)
      53           0 : }
      54             : 
      55           0 : func (p *compactionPacer) drain(n int64) {
      56           0 :         p.drainer.Wait(float64(n))
      57           0 : 
      58           0 :         p.level.Add(-n)
      59           0 : }
      60             : 
      61             : type flushPacer struct {
      62             :         level                 atomic.Int64
      63             :         memtableStopThreshold float64
      64             :         fillCond              sync.Cond
      65             : }
      66             : 
      67           0 : func newFlushPacer(mu *sync.Mutex) *flushPacer {
      68           0 :         p := &flushPacer{
      69           0 :                 memtableStopThreshold: memtableStopThreshold,
      70           0 :         }
      71           0 :         p.fillCond.L = mu
      72           0 :         return p
      73           0 : }
      74             : 
      75           0 : func (p *flushPacer) fill(n int64) {
      76           0 :         for float64(p.level.Load()) >= p.memtableStopThreshold {
      77           0 :                 p.fillCond.Wait()
      78           0 :         }
      79           0 :         p.level.Add(n)
      80           0 :         p.fillCond.Signal()
      81             : }
      82             : 
      83           0 : func (p *flushPacer) drain(n int64) {
      84           0 :         p.level.Add(-n)
      85           0 : }
      86             : 
      87             : // DB models a RocksDB DB.
      88             : type DB struct {
      89             :         mu         sync.Mutex
      90             :         flushPacer *flushPacer
      91             :         flushCond  sync.Cond
      92             :         memtables  []*int64
      93             :         fill       atomic.Int64
      94             :         drain      atomic.Int64
      95             : 
      96             :         compactionMu    sync.Mutex
      97             :         compactionPacer *compactionPacer
      98             :         // L0 is represented as an array of integers whereas every other level
      99             :         // is represented as a single integer.
     100             :         L0 []*int64
     101             :         // Non-L0 sstables. sstables[0] == L1.
     102             :         sstables            []atomic.Int64
     103             :         maxSSTableSizes     []int64
     104             :         compactionFlushCond sync.Cond
     105             :         prevCompactionDebt  float64
     106             :         previouslyInDebt    bool
     107             : 
     108             :         writeLimiter *rate.Limiter
     109             : }
     110             : 
     111           0 : func newDB() *DB {
     112           0 :         db := &DB{}
     113           0 :         db.flushPacer = newFlushPacer(&db.mu)
     114           0 :         db.flushCond.L = &db.mu
     115           0 :         db.memtables = append(db.memtables, new(int64))
     116           0 : 
     117           0 :         db.compactionFlushCond.L = &db.compactionMu
     118           0 :         db.L0 = append(db.L0, new(int64))
     119           0 :         db.compactionPacer = newCompactionPacer()
     120           0 : 
     121           0 :         db.maxSSTableSizes = make([]int64, numLevels-1)
     122           0 :         db.sstables = make([]atomic.Int64, numLevels-1)
     123           0 :         base := int64(levelRatio)
     124           0 :         for i := uint64(0); i < numLevels-2; i++ {
     125           0 :                 // Each level is 10 times larger than the one above it.
     126           0 :                 db.maxSSTableSizes[i] = memtableSize * l0CompactionThreshold * base
     127           0 :                 base *= levelRatio
     128           0 : 
     129           0 :                 // Begin with each level full.
     130           0 :                 newLevel := db.maxSSTableSizes[i]
     131           0 : 
     132           0 :                 db.sstables[i].Store(newLevel)
     133           0 :         }
     134           0 :         db.sstables[numLevels-2].Store(0)
     135           0 :         db.maxSSTableSizes[numLevels-2] = math.MaxInt64
     136           0 : 
     137           0 :         db.writeLimiter = rate.NewLimiter(startingWriteRate, startingWriteRate)
     138           0 : 
     139           0 :         go db.drainMemtable()
     140           0 :         go db.drainCompaction()
     141           0 : 
     142           0 :         return db
     143             : }
     144             : 
     145             : // drainCompaction simulates background compactions.
     146           0 : func (db *DB) drainCompaction() {
     147           0 :         rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
     148           0 : 
     149           0 :         for {
     150           0 :                 db.compactionMu.Lock()
     151           0 : 
     152           0 :                 for len(db.L0) <= l0CompactionThreshold {
     153           0 :                         db.compactionFlushCond.Wait()
     154           0 :                 }
     155           0 :                 l0Table := db.L0[0]
     156           0 :                 db.compactionMu.Unlock()
     157           0 : 
     158           0 :                 for i, size := int64(0), int64(0); i < *l0Table; i += size {
     159           0 :                         size = 10000 + rng.Int63n(500)
     160           0 :                         if size > (*l0Table - i) {
     161           0 :                                 size = *l0Table - i
     162           0 :                         }
     163           0 :                         db.compactionPacer.drain(size)
     164             :                 }
     165             : 
     166           0 :                 db.compactionMu.Lock()
     167           0 :                 db.L0 = db.L0[1:]
     168           0 :                 db.compactionMu.Unlock()
     169           0 : 
     170           0 :                 singleTableSize := int64(memtableSize)
     171           0 :                 tablesToCompact := 0
     172           0 :                 for i := range db.sstables {
     173           0 :                         newSSTableSize := db.sstables[i].Add(singleTableSize)
     174           0 :                         if newSSTableSize > db.maxSSTableSizes[i] {
     175           0 :                                 db.sstables[i].Add(-singleTableSize)
     176           0 :                                 tablesToCompact++
     177           0 :                         } else {
     178           0 :                                 // Lower levels do not need compaction if level above it did not
     179           0 :                                 // need compaction.
     180           0 :                                 break
     181             :                         }
     182             :                 }
     183             : 
     184           0 :                 totalCompactionBytes := int64(tablesToCompact * memtableSize)
     185           0 :                 db.compactionPacer.fill(totalCompactionBytes)
     186           0 : 
     187           0 :                 for t := 0; t < tablesToCompact; t++ {
     188           0 :                         for i, size := int64(0), int64(0); i < memtableSize; i += size {
     189           0 :                                 size = 10000 + rng.Int63n(500)
     190           0 :                                 if size > (totalCompactionBytes - i) {
     191           0 :                                         size = totalCompactionBytes - i
     192           0 :                                 }
     193           0 :                                 db.compactionPacer.drain(size)
     194             :                         }
     195             : 
     196           0 :                         db.delayUserWrites()
     197             :                 }
     198             :         }
     199             : }
     200             : 
     201             : // fillCompaction fills L0 sstables.
     202           0 : func (db *DB) fillCompaction(size int64) {
     203           0 :         db.compactionMu.Lock()
     204           0 : 
     205           0 :         db.compactionPacer.fill(size)
     206           0 : 
     207           0 :         last := db.L0[len(db.L0)-1]
     208           0 :         if *last+size > memtableSize {
     209           0 :                 last = new(int64)
     210           0 :                 db.L0 = append(db.L0, last)
     211           0 :                 db.compactionFlushCond.Signal()
     212           0 :         }
     213           0 :         *last += size
     214           0 : 
     215           0 :         db.compactionMu.Unlock()
     216             : }
     217             : 
     218             : // drainMemtable simulates memtable flushing.
     219           0 : func (db *DB) drainMemtable() {
     220           0 :         rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
     221           0 : 
     222           0 :         for {
     223           0 :                 db.mu.Lock()
     224           0 :                 for len(db.memtables) <= 1 {
     225           0 :                         db.flushCond.Wait()
     226           0 :                 }
     227           0 :                 memtable := db.memtables[0]
     228           0 :                 db.mu.Unlock()
     229           0 : 
     230           0 :                 for i, size := int64(0), int64(0); i < *memtable; i += size {
     231           0 :                         size = 1000 + rng.Int63n(50)
     232           0 :                         if size > (*memtable - i) {
     233           0 :                                 size = *memtable - i
     234           0 :                         }
     235           0 :                         db.flushPacer.drain(size)
     236           0 :                         db.drain.Add(size)
     237           0 : 
     238           0 :                         db.fillCompaction(size)
     239             :                 }
     240             : 
     241           0 :                 db.delayUserWrites()
     242           0 : 
     243           0 :                 db.mu.Lock()
     244           0 :                 db.memtables = db.memtables[1:]
     245           0 :                 db.mu.Unlock()
     246             :         }
     247             : }
     248             : 
     249             : // delayUserWrites applies write delays depending on compaction debt.
     250           0 : func (db *DB) delayUserWrites() {
     251           0 :         totalCompactionBytes := db.compactionPacer.level.Load()
     252           0 :         compactionDebt := math.Max(float64(totalCompactionBytes)-l0CompactionThreshold*memtableSize, 0.0)
     253           0 : 
     254           0 :         db.mu.Lock()
     255           0 :         if len(db.L0) > l0SlowdownThreshold || compactionDebt > compactionDebtSlowdownThreshold {
     256           0 :                 db.previouslyInDebt = true
     257           0 :                 if compactionDebt > db.prevCompactionDebt {
     258           0 :                         // Debt is growing.
     259           0 :                         drainLimit := db.writeLimiter.Rate() * 0.8
     260           0 :                         if drainLimit > 0 {
     261           0 :                                 db.writeLimiter.SetRate(drainLimit)
     262           0 :                         }
     263           0 :                 } else {
     264           0 :                         // Debt is shrinking.
     265           0 :                         drainLimit := db.writeLimiter.Rate() * 1 / 0.8
     266           0 :                         if drainLimit <= maxWriteRate {
     267           0 :                                 db.writeLimiter.SetRate(drainLimit)
     268           0 :                         }
     269             :                 }
     270           0 :         } else if db.previouslyInDebt {
     271           0 :                 // If compaction was previously delayed and has recovered, RocksDB
     272           0 :                 // "rewards" the rate by double the slowdown ratio.
     273           0 : 
     274           0 :                 // From RocksDB:
     275           0 :                 // If the DB recovers from delay conditions, we reward with reducing
     276           0 :                 // double the slowdown ratio. This is to balance the long term slowdown
     277           0 :                 // increase signal.
     278           0 :                 drainLimit := db.writeLimiter.Rate() * 1.4
     279           0 :                 if drainLimit <= maxWriteRate {
     280           0 :                         db.writeLimiter.SetRate(drainLimit)
     281           0 :                 }
     282           0 :                 db.previouslyInDebt = false
     283             :         }
     284             : 
     285           0 :         db.prevCompactionDebt = compactionDebt
     286           0 :         db.mu.Unlock()
     287             : }
     288             : 
     289             : // fillMemtable simulates memtable filling.
     290           0 : func (db *DB) fillMemtable(size int64) {
     291           0 :         db.mu.Lock()
     292           0 : 
     293           0 :         db.flushPacer.fill(size)
     294           0 :         db.fill.Add(size)
     295           0 : 
     296           0 :         last := db.memtables[len(db.memtables)-1]
     297           0 :         if *last+size > memtableSize {
     298           0 :                 last = new(int64)
     299           0 :                 db.memtables = append(db.memtables, last)
     300           0 :                 db.flushCond.Signal()
     301           0 :         }
     302           0 :         *last += size
     303           0 : 
     304           0 :         db.mu.Unlock()
     305             : }
     306             : 
     307             : // simulateWrite simulates user writes.
     308           0 : func simulateWrite(db *DB) {
     309           0 :         limiter := rate.NewLimiter(10<<20, 10<<20) // 10 MB/s
     310           0 :         fmt.Printf("filling at 10 MB/sec\n")
     311           0 : 
     312           0 :         setRate := func(mb int) {
     313           0 :                 fmt.Printf("filling at %d MB/sec\n", mb)
     314           0 :                 limiter.SetRate(float64(mb << 20))
     315           0 :         }
     316             : 
     317           0 :         go func() {
     318           0 :                 rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
     319           0 :                 for {
     320           0 :                         secs := 5 + rng.Intn(5)
     321           0 :                         time.Sleep(time.Duration(secs) * time.Second)
     322           0 :                         mb := 11 + rng.Intn(20)
     323           0 :                         setRate(mb)
     324           0 :                 }
     325             :         }()
     326             : 
     327           0 :         rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
     328           0 : 
     329           0 :         for {
     330           0 :                 size := 1000 + rng.Int63n(50)
     331           0 :                 limiter.Wait(float64(size))
     332           0 :                 db.writeLimiter.Wait(float64(size))
     333           0 :                 db.fillMemtable(size)
     334           0 :         }
     335             : }
     336             : 
     337           0 : func main() {
     338           0 :         db := newDB()
     339           0 : 
     340           0 :         go simulateWrite(db)
     341           0 : 
     342           0 :         tick := time.NewTicker(time.Second)
     343           0 :         start := time.Now()
     344           0 :         lastNow := start
     345           0 :         var lastFill, lastDrain int64
     346           0 : 
     347           0 :         for i := 0; ; i++ {
     348           0 :                 <-tick.C
     349           0 :                 if (i % 20) == 0 {
     350           0 :                         fmt.Printf("_elapsed___memtbs____dirty_____fill____drain____cdebt__l0count___max-w-rate\n")
     351           0 :                 }
     352             : 
     353           0 :                 db.mu.Lock()
     354           0 :                 memtableCount := len(db.memtables)
     355           0 :                 db.mu.Unlock()
     356           0 :                 dirty := db.flushPacer.level.Load()
     357           0 :                 fill := db.fill.Load()
     358           0 :                 drain := db.drain.Load()
     359           0 : 
     360           0 :                 db.compactionMu.Lock()
     361           0 :                 compactionL0 := len(db.L0)
     362           0 :                 db.compactionMu.Unlock()
     363           0 :                 totalCompactionBytes := db.compactionPacer.level.Load()
     364           0 :                 compactionDebt := math.Max(float64(totalCompactionBytes)-l0CompactionThreshold*memtableSize, 0.0)
     365           0 :                 maxWriteRate := db.writeLimiter.Rate()
     366           0 : 
     367           0 :                 now := time.Now()
     368           0 :                 elapsed := now.Sub(lastNow).Seconds()
     369           0 :                 fmt.Printf("%8s %8d %8.1f %8.1f %8.1f %8.1f %8d %12.1f\n",
     370           0 :                         time.Duration(now.Sub(start).Seconds()+0.5)*time.Second,
     371           0 :                         memtableCount,
     372           0 :                         float64(dirty)/(1024.0*1024.0),
     373           0 :                         float64(fill-lastFill)/(1024.0*1024.0*elapsed),
     374           0 :                         float64(drain-lastDrain)/(1024.0*1024.0*elapsed),
     375           0 :                         compactionDebt/(1024.0*1024.0),
     376           0 :                         compactionL0,
     377           0 :                         maxWriteRate/(1024.0*1024.0))
     378           0 : 
     379           0 :                 lastNow = now
     380           0 :                 lastFill = fill
     381           0 :                 lastDrain = drain
     382             :         }
     383             : }

Generated by: LCOV version 1.14