Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use 2 : // of this source code is governed by a BSD-style license that can be found in 3 : // the LICENSE file. 4 : 5 : package main 6 : 7 : import ( 8 : "fmt" 9 : "math" 10 : "sync" 11 : "sync/atomic" 12 : "time" 13 : 14 : "github.com/cockroachdb/pebble/internal/rate" 15 : "golang.org/x/exp/rand" 16 : ) 17 : 18 : const ( 19 : // Max rate for all compactions. This is intentionally set low enough that 20 : // user writes will have to be delayed. 21 : maxCompactionRate = 80 << 20 // 80 MB/s 22 : 23 : memtableSize = 64 << 20 // 64 MB 24 : memtableStopThreshold = 2 * memtableSize 25 : maxWriteRate = 30 << 20 // 30 MB/s 26 : startingWriteRate = 30 << 20 // 30 MB/s 27 : 28 : l0SlowdownThreshold = 4 29 : l0CompactionThreshold = 1 30 : 31 : levelRatio = 10 32 : numLevels = 7 33 : 34 : // Slowdown threshold is set at the compaction debt incurred by the largest 35 : // possible compaction. 36 : compactionDebtSlowdownThreshold = memtableSize * (numLevels - 2) 37 : ) 38 : 39 : type compactionPacer struct { 40 : level atomic.Int64 41 : drainer *rate.Limiter 42 : } 43 : 44 0 : func newCompactionPacer() *compactionPacer { 45 0 : p := &compactionPacer{ 46 0 : drainer: rate.NewLimiter(maxCompactionRate, maxCompactionRate), 47 0 : } 48 0 : return p 49 0 : } 50 : 51 0 : func (p *compactionPacer) fill(n int64) { 52 0 : p.level.Add(n) 53 0 : } 54 : 55 0 : func (p *compactionPacer) drain(n int64) { 56 0 : p.drainer.Wait(float64(n)) 57 0 : 58 0 : p.level.Add(-n) 59 0 : } 60 : 61 : type flushPacer struct { 62 : level atomic.Int64 63 : memtableStopThreshold float64 64 : fillCond sync.Cond 65 : } 66 : 67 0 : func newFlushPacer(mu *sync.Mutex) *flushPacer { 68 0 : p := &flushPacer{ 69 0 : memtableStopThreshold: memtableStopThreshold, 70 0 : } 71 0 : p.fillCond.L = mu 72 0 : return p 73 0 : } 74 : 75 0 : func (p *flushPacer) fill(n int64) { 76 0 : for float64(p.level.Load()) >= p.memtableStopThreshold { 77 0 : p.fillCond.Wait() 78 0 : } 79 0 : p.level.Add(n) 80 0 : p.fillCond.Signal() 81 : } 82 : 83 0 : func (p *flushPacer) drain(n int64) { 84 0 : p.level.Add(-n) 85 0 : } 86 : 87 : // DB models a RocksDB DB. 88 : type DB struct { 89 : mu sync.Mutex 90 : flushPacer *flushPacer 91 : flushCond sync.Cond 92 : memtables []*int64 93 : fill atomic.Int64 94 : drain atomic.Int64 95 : 96 : compactionMu sync.Mutex 97 : compactionPacer *compactionPacer 98 : // L0 is represented as an array of integers whereas every other level 99 : // is represented as a single integer. 100 : L0 []*int64 101 : // Non-L0 sstables. sstables[0] == L1. 102 : sstables []atomic.Int64 103 : maxSSTableSizes []int64 104 : compactionFlushCond sync.Cond 105 : prevCompactionDebt float64 106 : previouslyInDebt bool 107 : 108 : writeLimiter *rate.Limiter 109 : } 110 : 111 0 : func newDB() *DB { 112 0 : db := &DB{} 113 0 : db.flushPacer = newFlushPacer(&db.mu) 114 0 : db.flushCond.L = &db.mu 115 0 : db.memtables = append(db.memtables, new(int64)) 116 0 : 117 0 : db.compactionFlushCond.L = &db.compactionMu 118 0 : db.L0 = append(db.L0, new(int64)) 119 0 : db.compactionPacer = newCompactionPacer() 120 0 : 121 0 : db.maxSSTableSizes = make([]int64, numLevels-1) 122 0 : db.sstables = make([]atomic.Int64, numLevels-1) 123 0 : base := int64(levelRatio) 124 0 : for i := uint64(0); i < numLevels-2; i++ { 125 0 : // Each level is 10 times larger than the one above it. 126 0 : db.maxSSTableSizes[i] = memtableSize * l0CompactionThreshold * base 127 0 : base *= levelRatio 128 0 : 129 0 : // Begin with each level full. 130 0 : newLevel := db.maxSSTableSizes[i] 131 0 : 132 0 : db.sstables[i].Store(newLevel) 133 0 : } 134 0 : db.sstables[numLevels-2].Store(0) 135 0 : db.maxSSTableSizes[numLevels-2] = math.MaxInt64 136 0 : 137 0 : db.writeLimiter = rate.NewLimiter(startingWriteRate, startingWriteRate) 138 0 : 139 0 : go db.drainMemtable() 140 0 : go db.drainCompaction() 141 0 : 142 0 : return db 143 : } 144 : 145 : // drainCompaction simulates background compactions. 146 0 : func (db *DB) drainCompaction() { 147 0 : rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) 148 0 : 149 0 : for { 150 0 : db.compactionMu.Lock() 151 0 : 152 0 : for len(db.L0) <= l0CompactionThreshold { 153 0 : db.compactionFlushCond.Wait() 154 0 : } 155 0 : l0Table := db.L0[0] 156 0 : db.compactionMu.Unlock() 157 0 : 158 0 : for i, size := int64(0), int64(0); i < *l0Table; i += size { 159 0 : size = 10000 + rng.Int63n(500) 160 0 : if size > (*l0Table - i) { 161 0 : size = *l0Table - i 162 0 : } 163 0 : db.compactionPacer.drain(size) 164 : } 165 : 166 0 : db.compactionMu.Lock() 167 0 : db.L0 = db.L0[1:] 168 0 : db.compactionMu.Unlock() 169 0 : 170 0 : singleTableSize := int64(memtableSize) 171 0 : tablesToCompact := 0 172 0 : for i := range db.sstables { 173 0 : newSSTableSize := db.sstables[i].Add(singleTableSize) 174 0 : if newSSTableSize > db.maxSSTableSizes[i] { 175 0 : db.sstables[i].Add(-singleTableSize) 176 0 : tablesToCompact++ 177 0 : } else { 178 0 : // Lower levels do not need compaction if level above it did not 179 0 : // need compaction. 180 0 : break 181 : } 182 : } 183 : 184 0 : totalCompactionBytes := int64(tablesToCompact * memtableSize) 185 0 : db.compactionPacer.fill(totalCompactionBytes) 186 0 : 187 0 : for t := 0; t < tablesToCompact; t++ { 188 0 : for i, size := int64(0), int64(0); i < memtableSize; i += size { 189 0 : size = 10000 + rng.Int63n(500) 190 0 : if size > (totalCompactionBytes - i) { 191 0 : size = totalCompactionBytes - i 192 0 : } 193 0 : db.compactionPacer.drain(size) 194 : } 195 : 196 0 : db.delayUserWrites() 197 : } 198 : } 199 : } 200 : 201 : // fillCompaction fills L0 sstables. 202 0 : func (db *DB) fillCompaction(size int64) { 203 0 : db.compactionMu.Lock() 204 0 : 205 0 : db.compactionPacer.fill(size) 206 0 : 207 0 : last := db.L0[len(db.L0)-1] 208 0 : if *last+size > memtableSize { 209 0 : last = new(int64) 210 0 : db.L0 = append(db.L0, last) 211 0 : db.compactionFlushCond.Signal() 212 0 : } 213 0 : *last += size 214 0 : 215 0 : db.compactionMu.Unlock() 216 : } 217 : 218 : // drainMemtable simulates memtable flushing. 219 0 : func (db *DB) drainMemtable() { 220 0 : rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) 221 0 : 222 0 : for { 223 0 : db.mu.Lock() 224 0 : for len(db.memtables) <= 1 { 225 0 : db.flushCond.Wait() 226 0 : } 227 0 : memtable := db.memtables[0] 228 0 : db.mu.Unlock() 229 0 : 230 0 : for i, size := int64(0), int64(0); i < *memtable; i += size { 231 0 : size = 1000 + rng.Int63n(50) 232 0 : if size > (*memtable - i) { 233 0 : size = *memtable - i 234 0 : } 235 0 : db.flushPacer.drain(size) 236 0 : db.drain.Add(size) 237 0 : 238 0 : db.fillCompaction(size) 239 : } 240 : 241 0 : db.delayUserWrites() 242 0 : 243 0 : db.mu.Lock() 244 0 : db.memtables = db.memtables[1:] 245 0 : db.mu.Unlock() 246 : } 247 : } 248 : 249 : // delayUserWrites applies write delays depending on compaction debt. 250 0 : func (db *DB) delayUserWrites() { 251 0 : totalCompactionBytes := db.compactionPacer.level.Load() 252 0 : compactionDebt := math.Max(float64(totalCompactionBytes)-l0CompactionThreshold*memtableSize, 0.0) 253 0 : 254 0 : db.mu.Lock() 255 0 : if len(db.L0) > l0SlowdownThreshold || compactionDebt > compactionDebtSlowdownThreshold { 256 0 : db.previouslyInDebt = true 257 0 : if compactionDebt > db.prevCompactionDebt { 258 0 : // Debt is growing. 259 0 : drainLimit := db.writeLimiter.Rate() * 0.8 260 0 : if drainLimit > 0 { 261 0 : db.writeLimiter.SetRate(drainLimit) 262 0 : } 263 0 : } else { 264 0 : // Debt is shrinking. 265 0 : drainLimit := db.writeLimiter.Rate() * 1 / 0.8 266 0 : if drainLimit <= maxWriteRate { 267 0 : db.writeLimiter.SetRate(drainLimit) 268 0 : } 269 : } 270 0 : } else if db.previouslyInDebt { 271 0 : // If compaction was previously delayed and has recovered, RocksDB 272 0 : // "rewards" the rate by double the slowdown ratio. 273 0 : 274 0 : // From RocksDB: 275 0 : // If the DB recovers from delay conditions, we reward with reducing 276 0 : // double the slowdown ratio. This is to balance the long term slowdown 277 0 : // increase signal. 278 0 : drainLimit := db.writeLimiter.Rate() * 1.4 279 0 : if drainLimit <= maxWriteRate { 280 0 : db.writeLimiter.SetRate(drainLimit) 281 0 : } 282 0 : db.previouslyInDebt = false 283 : } 284 : 285 0 : db.prevCompactionDebt = compactionDebt 286 0 : db.mu.Unlock() 287 : } 288 : 289 : // fillMemtable simulates memtable filling. 290 0 : func (db *DB) fillMemtable(size int64) { 291 0 : db.mu.Lock() 292 0 : 293 0 : db.flushPacer.fill(size) 294 0 : db.fill.Add(size) 295 0 : 296 0 : last := db.memtables[len(db.memtables)-1] 297 0 : if *last+size > memtableSize { 298 0 : last = new(int64) 299 0 : db.memtables = append(db.memtables, last) 300 0 : db.flushCond.Signal() 301 0 : } 302 0 : *last += size 303 0 : 304 0 : db.mu.Unlock() 305 : } 306 : 307 : // simulateWrite simulates user writes. 308 0 : func simulateWrite(db *DB) { 309 0 : limiter := rate.NewLimiter(10<<20, 10<<20) // 10 MB/s 310 0 : fmt.Printf("filling at 10 MB/sec\n") 311 0 : 312 0 : setRate := func(mb int) { 313 0 : fmt.Printf("filling at %d MB/sec\n", mb) 314 0 : limiter.SetRate(float64(mb << 20)) 315 0 : } 316 : 317 0 : go func() { 318 0 : rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) 319 0 : for { 320 0 : secs := 5 + rng.Intn(5) 321 0 : time.Sleep(time.Duration(secs) * time.Second) 322 0 : mb := 11 + rng.Intn(20) 323 0 : setRate(mb) 324 0 : } 325 : }() 326 : 327 0 : rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) 328 0 : 329 0 : for { 330 0 : size := 1000 + rng.Int63n(50) 331 0 : limiter.Wait(float64(size)) 332 0 : db.writeLimiter.Wait(float64(size)) 333 0 : db.fillMemtable(size) 334 0 : } 335 : } 336 : 337 0 : func main() { 338 0 : db := newDB() 339 0 : 340 0 : go simulateWrite(db) 341 0 : 342 0 : tick := time.NewTicker(time.Second) 343 0 : start := time.Now() 344 0 : lastNow := start 345 0 : var lastFill, lastDrain int64 346 0 : 347 0 : for i := 0; ; i++ { 348 0 : <-tick.C 349 0 : if (i % 20) == 0 { 350 0 : fmt.Printf("_elapsed___memtbs____dirty_____fill____drain____cdebt__l0count___max-w-rate\n") 351 0 : } 352 : 353 0 : db.mu.Lock() 354 0 : memtableCount := len(db.memtables) 355 0 : db.mu.Unlock() 356 0 : dirty := db.flushPacer.level.Load() 357 0 : fill := db.fill.Load() 358 0 : drain := db.drain.Load() 359 0 : 360 0 : db.compactionMu.Lock() 361 0 : compactionL0 := len(db.L0) 362 0 : db.compactionMu.Unlock() 363 0 : totalCompactionBytes := db.compactionPacer.level.Load() 364 0 : compactionDebt := math.Max(float64(totalCompactionBytes)-l0CompactionThreshold*memtableSize, 0.0) 365 0 : maxWriteRate := db.writeLimiter.Rate() 366 0 : 367 0 : now := time.Now() 368 0 : elapsed := now.Sub(lastNow).Seconds() 369 0 : fmt.Printf("%8s %8d %8.1f %8.1f %8.1f %8.1f %8d %12.1f\n", 370 0 : time.Duration(now.Sub(start).Seconds()+0.5)*time.Second, 371 0 : memtableCount, 372 0 : float64(dirty)/(1024.0*1024.0), 373 0 : float64(fill-lastFill)/(1024.0*1024.0*elapsed), 374 0 : float64(drain-lastDrain)/(1024.0*1024.0*elapsed), 375 0 : compactionDebt/(1024.0*1024.0), 376 0 : compactionL0, 377 0 : maxWriteRate/(1024.0*1024.0)) 378 0 : 379 0 : lastNow = now 380 0 : lastFill = fill 381 0 : lastDrain = drain 382 : } 383 : }