// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
        "sync"
        "time"

        "github.com/cockroachdb/pebble/internal/base"
)

type CompactionGrantHandle = base.CompactionGrantHandle
type CompactionGrantHandleStats = base.CompactionGrantHandleStats
type CompactionGoroutineKind = base.CompactionGoroutineKind

const (
        CompactionGoroutinePrimary           = base.CompactionGoroutinePrimary
        CompactionGoroutineSSTableSecondary  = base.CompactionGoroutineSSTableSecondary
        CompactionGoroutineBlobFileSecondary = base.CompactionGoroutineBlobFileSecondary
)

// NB: This interface is experimental and subject to change.
//
// For instance, we may incorporate more information in TrySchedule and in the
// return value of Schedule to tell CompactionScheduler of the sub-category of
// compaction so that the scheduler can have more granular estimates. For
// example, the input or output level could affect the write bandwidth if the
// inputs are better cached (say at higher levels).

// CompactionScheduler is responsible for scheduling both automatic and manual
// compactions. In the case of multiple DB instances on a node (i.e. a
// multi-store configuration), implementations of CompactionScheduler may
// enforce a global maximum compaction concurrency. Additionally,
// implementations of CompactionScheduler may be resource aware and permit
// more than the compactions that are "allowed without permission" if
// resources are available.
//
// Locking: CompactionScheduler's mutexes are ordered after DBForCompaction
// mutexes. We need to specify some lock ordering since CompactionScheduler
// and DBForCompaction call into each other. This ordering choice is made to
// simplify the implementation of DBForCompaction. There are three exceptions
// to this ordering: DBForCompaction.GetAllowedWithoutPermission,
// CompactionScheduler.Unregister, and CompactionGrantHandle.Done -- see those
// declarations for details.
type CompactionScheduler interface {
        // Register is called to register this DB and to specify the number of
        // goroutines that consume CPU in each compaction (see the CPU reporting
        // interface, CompactionGrantHandle.MeasureCPU). Must be called exactly once
        // by this DB if it successfully opens.
        Register(numGoroutinesPerCompaction int, db DBForCompaction)
        // Unregister is used to unregister the DB. Must be called once when the DB
        // is being closed. Unregister waits until all ongoing calls to
        // DBForCompaction are finished, so Unregister must not be called while
        // holding locks that DBForCompaction acquires in those calls.
        Unregister()
        // TrySchedule is called by DB when it wants to run a compaction. The bool
        // is true iff permission is granted, and in that case the
        // CompactionGrantHandle needs to be exercised by the DB.
        TrySchedule() (bool, CompactionGrantHandle)
        // UpdateGetAllowedWithoutPermission informs the scheduler that some
        // external behavior may have caused the GetAllowedWithoutPermission value
        // to change. It exists because flushes are not otherwise visible to the
        // CompactionScheduler, and can cause the value to increase.
        // CompactionScheduler implementations should do periodic sampling (e.g. as
        // done by ConcurrencyLimitScheduler.periodicGranter), but this provides an
        // instantaneous opportunity to act.
        UpdateGetAllowedWithoutPermission()
}

// DBForCompaction is the interface implemented by the DB to interact with the
// CompactionScheduler.
type DBForCompaction interface {
        // GetAllowedWithoutPermission returns what is permitted at the DB-level
        // (there may be further restrictions at the node level, when there are
        // multiple DBs at a node, which is not captured by this number). This can
        // vary based on compaction backlog or other factors. This method must not
        // acquire any mutex in DBForCompaction that is covered by the general mutex
        // ordering rule stated earlier.
        GetAllowedWithoutPermission() int
        // GetWaitingCompaction returns true iff the DB can run a compaction. The
        // true return is accompanied by a populated WaitingCompaction, which the
        // scheduler can use to pick across DBs or other work in the system. This
        // method should typically be efficient, in that the DB should try to cache
        // some state if its previous call to TrySchedule resulted in a failure to
        // get permission. It is ok if it is sometimes slow since all work scheduled
        // by CompactionScheduler is long-lived (often executing for multiple
        // seconds).
        GetWaitingCompaction() (bool, WaitingCompaction)
        // Schedule grants the DB permission to run a compaction. The DB returns
        // true iff it accepts the grant, in which case it must exercise the
        // CompactionGrantHandle.
        Schedule(CompactionGrantHandle) bool
}
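
// The following is an illustrative sketch, not part of Pebble: a trivial
// DBForCompaction that always permits up to two concurrent compactions and
// never has a compaction waiting. The type name constantDBForCompaction is
// hypothetical; a real DB derives these answers from its current LSM state.
type constantDBForCompaction struct{}

func (constantDBForCompaction) GetAllowedWithoutPermission() int { return 2 }

func (constantDBForCompaction) GetWaitingCompaction() (bool, WaitingCompaction) {
        // No compaction is waiting, so there is no WaitingCompaction state to report.
        return false, WaitingCompaction{}
}

func (constantDBForCompaction) Schedule(CompactionGrantHandle) bool {
        // Decline the grant; a real DB would start the compaction and return true.
        return false
}

var _ DBForCompaction = constantDBForCompaction{}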

// WaitingCompaction captures state for a compaction that can be used to
// prioritize it relative to compactions in other DBs or other long-lived work
// in the system.
type WaitingCompaction struct {
        // Optional is true for a compaction that isn't necessary for maintaining an
        // overall healthy LSM. This value can be compared across compactions and
        // other long-lived work.
        Optional bool
        // Priority is the priority of a compaction. It is only compared across
        // compactions, and when the Optional value is the same.
        Priority int
        // Score is only compared across compactions, and only when the Optional and
        // Priority values are the same.
        Score float64
}
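
// waitingCompactionMoreImportant is an illustrative sketch, not part of
// Pebble, of the comparison implied by the field comments above: Optional is
// compared first (non-optional work is more important), then Priority, then
// Score. A scheduler arbitrating across DBs could sort candidates this way.
func waitingCompactionMoreImportant(a, b WaitingCompaction) bool {
        if a.Optional != b.Optional {
                // A compaction needed for LSM health beats an optional one.
                return !a.Optional
        }
        if a.Priority != b.Priority {
                return a.Priority > b.Priority
        }
        return a.Score > b.Score
}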

// Ordering is by priority; when the optional values differ, false is more
// important than true.
//
// The ordering here must be consistent with the order in which compactions
// are picked in compactionPickerByScore.pickAuto.
type compactionOptionalAndPriority struct {
        optional bool
        priority int
}

var scheduledCompactionMap map[compactionKind]compactionOptionalAndPriority
var manualCompactionPriority int

func init() {
        // Manual compactions have priority just below the score-based
        // compactions, since DB.pickAnyCompaction first picks score-based
        // compactions, and then manual compactions.
        manualCompactionPriority = 70
        scheduledCompactionMap = map[compactionKind]compactionOptionalAndPriority{}
        // Score-based compactions have priorities {100, 90, 80}.
        //
        // We don't actually know if it is a compactionKindMove or
        // compactionKindCopy until a compactionKindDefault is turned from a
        // pickedCompaction into a compaction struct. So we will never see those
        // values here, but for completeness we include them.
        scheduledCompactionMap[compactionKindMove] = compactionOptionalAndPriority{priority: 100}
        scheduledCompactionMap[compactionKindCopy] = compactionOptionalAndPriority{priority: 90}
        scheduledCompactionMap[compactionKindDefault] = compactionOptionalAndPriority{priority: 80}
        scheduledCompactionMap[compactionKindTombstoneDensity] =
                compactionOptionalAndPriority{optional: true, priority: 60}
        scheduledCompactionMap[compactionKindElisionOnly] =
                compactionOptionalAndPriority{optional: true, priority: 50}
        scheduledCompactionMap[compactionKindVirtualRewrite] =
                compactionOptionalAndPriority{optional: true, priority: 40}
        scheduledCompactionMap[compactionKindBlobFileRewrite] =
                compactionOptionalAndPriority{optional: true, priority: 30}
        scheduledCompactionMap[compactionKindRead] =
                compactionOptionalAndPriority{optional: true, priority: 20}
        scheduledCompactionMap[compactionKindRewrite] =
                compactionOptionalAndPriority{optional: true, priority: 10}
}

// noopGrantHandle is used in cases that don't interact with a CompactionScheduler.
type noopGrantHandle struct{}

var _ CompactionGrantHandle = noopGrantHandle{}

func (h noopGrantHandle) Started()                                              {}
func (h noopGrantHandle) MeasureCPU(CompactionGoroutineKind)                    {}
func (h noopGrantHandle) CumulativeStats(stats base.CompactionGrantHandleStats) {}
func (h noopGrantHandle) Done()                                                 {}

// pickedCompactionCache is used to avoid the work of repeatedly picking a
// compaction that then fails to run immediately because TrySchedule returns
// false.
//
// The high-level approach is to construct a pickedCompaction in
// DB.maybeScheduleCompaction if there isn't one in the cache, and if
// TrySchedule returns false, to remember it. Ignoring flushes, the worst-case
// behavior is that one of every two pickedCompactions gets to run (so half
// the picking work is wasted). This worst case happens when the system is
// running at the limit of the long-lived work (including compactions) it can
// support. In this setting, each started compaction invalidates the
// pickedCompaction in the cache when it completes, and the reason the cache
// has a pickedCompaction (that got invalidated) is that the
// CompactionScheduler called GetWaitingCompaction and decided not to run the
// pickedCompaction (some other work won). We consider the CPU overhead of
// this waste acceptable.
//
// For the default case of a ConcurrencyLimitScheduler, which only considers a
// single DB, the aforementioned worst case is avoided by not constructing a
// new pickedCompaction in DB.maybeScheduleCompaction when
// pickedCompactionCache.isWaiting is already true (which became true once,
// when a backlog developed). Whenever a compaction completes and a new
// compaction can be started, the call to DBForCompaction.GetWaitingCompaction
// constructs a new pickedCompaction and caches it, and then this immediately
// gets to run when DBForCompaction.Schedule is called.
type pickedCompactionCache struct {
        // pc != nil => waiting.
        //
        // It is acceptable for waiting to be true and pc to be nil, when pc is
        // invalidated due to starting a compaction, or completing a
        // compaction/flush (since it changes the latest version).
        waiting bool
        pc      pickedCompaction
}

// invalidate the cache because a new Version is installed or a compaction is
// started (since a new in-progress compaction affects future compaction
// picking). The value of waiting is not changed.
func (c *pickedCompactionCache) invalidate() {
        c.pc = nil
}

// isWaiting returns the value of waiting.
func (c *pickedCompactionCache) isWaiting() bool {
        return c.waiting
}

// getForRunning returns the pickedCompaction in the cache, if any, and
// removes it from the cache. It may return nil.
func (c *pickedCompactionCache) getForRunning() pickedCompaction {
        // NB: This does not set c.waiting = false, since there may be more
        // compactions to run.
        pc := c.pc
        c.pc = nil
        return pc
}

// setNotWaiting sets waiting to false.
func (c *pickedCompactionCache) setNotWaiting() {
        c.waiting = false
        c.pc = nil
}

// peek returns the pickedCompaction, if any, in the cache.
func (c *pickedCompactionCache) peek() pickedCompaction {
        return c.pc
}

// add adds a pickedCompaction to the cache and sets waiting to true.
func (c *pickedCompactionCache) add(pc pickedCompaction) {
        c.waiting = true
        c.pc = pc
}
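
// The following sketch is illustrative only and not part of Pebble. It
// approximates the picking loop described in the pickedCompactionCache
// comment, assuming hypothetical pick and run callbacks: reuse a cached pick
// when present, otherwise pick anew, and remember the pick if TrySchedule
// denies permission.
func maybeScheduleSketch(
        cache *pickedCompactionCache,
        scheduler CompactionScheduler,
        pick func() pickedCompaction,
        run func(pickedCompaction, CompactionGrantHandle),
) {
        pc := cache.getForRunning()
        if pc == nil {
                if cache.isWaiting() {
                        // A pick is already pending; for ConcurrencyLimitScheduler,
                        // GetWaitingCompaction repopulates the cache when a slot opens.
                        return
                }
                if pc = pick(); pc == nil {
                        cache.setNotWaiting()
                        return
                }
        }
        if ok, handle := scheduler.TrySchedule(); ok {
                // Starting a compaction invalidates the cached pick.
                cache.invalidate()
                run(pc, handle)
                return
        }
        // Permission denied: remember the pick so the work isn't wasted.
        cache.add(pc)
}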

// ConcurrencyLimitScheduler is the default scheduler used by Pebble. It
// simply uses the concurrency limit retrieved from
// DBForCompaction.GetAllowedWithoutPermission to decide the number of
// compactions to schedule. ConcurrencyLimitScheduler must have its Register
// method called at most once -- i.e., it cannot be reused across DBs.
//
// Since the GetAllowedWithoutPermission value changes over time, the
// scheduler needs to be quite current in its sampling, especially if the
// value is increasing, to prevent lag in scheduling compactions. Calls to
// ConcurrencyLimitScheduler.Done and ConcurrencyLimitScheduler.TrySchedule
// are obvious places this value is sampled. However, since
// ConcurrencyLimitScheduler does not observe flushes (which can increase the
// value), and there can be situations where compactions last 10+ seconds,
// this sampling is not considered sufficient. Note that calls to
// ConcurrencyLimitScheduler.TrySchedule are dampened in
// DB.maybeScheduleCompaction when there is a waiting compaction (to prevent
// wasted computation of pickedCompactions). If DB.maybeScheduleCompaction
// always called ConcurrencyLimitScheduler.TrySchedule, we would have no lag,
// since DB.maybeScheduleCompaction is called on flush completion. Hence, we
// resort to having a background goroutine in ConcurrencyLimitScheduler
// sample the value every 100ms, plus sampling in
// UpdateGetAllowedWithoutPermission.
type ConcurrencyLimitScheduler struct {
        ts schedulerTimeSource
        // db is set in Register, but not protected by mu since it is strictly
        // before any calls to the other methods.
        db DBForCompaction
        mu struct {
                sync.Mutex
                runningCompactions int
                // unregistered transitions once from false => true.
                unregistered bool
                // isGranting is used to (a) serialize granting from Done and
                // periodicGranter, (b) ensure that granting is stopped before returning
                // from Unregister.
                isGranting                   bool
                isGrantingCond               *sync.Cond
                lastAllowedWithoutPermission int
        }
        stopPeriodicGranterCh chan struct{}
        pokePeriodicGranterCh chan struct{}
        // Only non-nil in some tests.
        periodicGranterRanChForTesting chan struct{}
}

var _ CompactionScheduler = &ConcurrencyLimitScheduler{}

func newConcurrencyLimitScheduler(ts schedulerTimeSource) *ConcurrencyLimitScheduler {
        s := &ConcurrencyLimitScheduler{
                ts:                    ts,
                stopPeriodicGranterCh: make(chan struct{}),
                pokePeriodicGranterCh: make(chan struct{}, 1),
        }
        s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
        return s
}

func NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() *ConcurrencyLimitScheduler {
        s := &ConcurrencyLimitScheduler{
                ts: defaultTimeSource{},
        }
        s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
        return s
}
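
// schedulerLifecycleSketch is illustrative only and not part of Pebble. It
// walks the Register/TrySchedule/Done/Unregister lifecycle using the test
// constructor above and the hypothetical constantDBForCompaction sketched
// earlier.
func schedulerLifecycleSketch() {
        s := NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest()
        // Register must be called exactly once per DB.
        s.Register(2 /* numGoroutinesPerCompaction */, constantDBForCompaction{})
        if ok, handle := s.TrySchedule(); ok {
                // Permission granted: the DB must exercise the grant handle.
                handle.Started()
                // ... run the compaction, reporting via MeasureCPU/CumulativeStats ...
                handle.Done()
        }
        // Unregister must be called once when the DB is being closed.
        s.Unregister()
}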

func (s *ConcurrencyLimitScheduler) Register(numGoroutinesPerCompaction int, db DBForCompaction) {
        s.db = db
        if s.stopPeriodicGranterCh != nil {
                go s.periodicGranter()
        }
        s.mu.Lock()
        defer s.mu.Unlock()
        if s.mu.unregistered {
                panic("cannot reuse ConcurrencyLimitScheduler")
        }
}

func (s *ConcurrencyLimitScheduler) Unregister() {
        if s.stopPeriodicGranterCh != nil {
                s.stopPeriodicGranterCh <- struct{}{}
        }
        s.mu.Lock()
        defer s.mu.Unlock()
        s.mu.unregistered = true
        // Wait until isGranting becomes false. Since unregistered has been set to
        // true, once isGranting becomes false, no more granting will happen.
        for s.mu.isGranting {
                s.mu.isGrantingCond.Wait()
        }
}

func (s *ConcurrencyLimitScheduler) TrySchedule() (bool, CompactionGrantHandle) {
        s.mu.Lock()
        defer s.mu.Unlock()
        if s.mu.unregistered {
                return false, nil
        }
        s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
        if s.mu.lastAllowedWithoutPermission > s.mu.runningCompactions {
                s.mu.runningCompactions++
                return true, s
        }
        return false, nil
}

func (s *ConcurrencyLimitScheduler) Started()                                              {}
func (s *ConcurrencyLimitScheduler) MeasureCPU(CompactionGoroutineKind)                    {}
func (s *ConcurrencyLimitScheduler) CumulativeStats(stats base.CompactionGrantHandleStats) {}

func (s *ConcurrencyLimitScheduler) Done() {
        s.mu.Lock()
        s.mu.runningCompactions--
        s.tryGrantLockedAndUnlock()
}

func (s *ConcurrencyLimitScheduler) UpdateGetAllowedWithoutPermission() {
        s.mu.Lock()
        allowedWithoutPermission := s.db.GetAllowedWithoutPermission()
        tryGrant := allowedWithoutPermission > s.mu.lastAllowedWithoutPermission
        s.mu.lastAllowedWithoutPermission = allowedWithoutPermission
        s.mu.Unlock()
        if tryGrant {
                select {
                case s.pokePeriodicGranterCh <- struct{}{}:
                default:
                }
        }
}

func (s *ConcurrencyLimitScheduler) tryGrantLockedAndUnlock() {
        defer s.mu.Unlock()
        if s.mu.unregistered {
                return
        }
        // Wait for turn to grant.
        for s.mu.isGranting {
                s.mu.isGrantingCond.Wait()
        }
        // INVARIANT: !isGranting.
        if s.mu.unregistered {
                return
        }
        s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
        toGrant := s.mu.lastAllowedWithoutPermission - s.mu.runningCompactions
        if toGrant > 0 {
                s.mu.isGranting = true
        } else {
                return
        }
        s.mu.Unlock()
        // We call GetWaitingCompaction iff we can successfully grant, so that there
        // is no wasted pickedCompaction.
        //
        // INVARIANT: loop exits with s.mu unlocked.
        for toGrant > 0 {
                waiting, _ := s.db.GetWaitingCompaction()
                if !waiting {
                        break
                }
                accepted := s.db.Schedule(s)
                if !accepted {
                        break
                }
                s.mu.Lock()
                s.mu.runningCompactions++
                toGrant--
                s.mu.Unlock()
        }
        // Will be unlocked by the defer statement.
        s.mu.Lock()
        s.mu.isGranting = false
        s.mu.isGrantingCond.Broadcast()
}

func (s *ConcurrencyLimitScheduler) periodicGranter() {
        ticker := s.ts.newTicker(100 * time.Millisecond)
        for {
                select {
                case <-ticker.ch():
                        s.mu.Lock()
                        s.tryGrantLockedAndUnlock()
                case <-s.pokePeriodicGranterCh:
                        s.mu.Lock()
                        s.tryGrantLockedAndUnlock()
                case <-s.stopPeriodicGranterCh:
                        ticker.stop()
                        return
                }
                if s.periodicGranterRanChForTesting != nil {
                        s.periodicGranterRanChForTesting <- struct{}{}
                }
        }
}

func (s *ConcurrencyLimitScheduler) adjustRunningCompactionsForTesting(delta int) {
        s.mu.Lock()
        s.mu.runningCompactions += delta
        if delta < 0 {
                s.tryGrantLockedAndUnlock()
        } else {
                s.mu.Unlock()
        }
}

// schedulerTimeSource is used to abstract time.NewTicker for
// ConcurrencyLimitScheduler.
type schedulerTimeSource interface {
        newTicker(duration time.Duration) schedulerTicker
}

// schedulerTicker is used to abstract time.Ticker for
// ConcurrencyLimitScheduler.
type schedulerTicker interface {
        stop()
        ch() <-chan time.Time
}

// defaultTimeSource is a schedulerTimeSource using the time package.
type defaultTimeSource struct{}

var _ schedulerTimeSource = defaultTimeSource{}

func (defaultTimeSource) newTicker(duration time.Duration) schedulerTicker {
        return (*defaultTicker)(time.NewTicker(duration))
}

// defaultTicker uses time.Ticker.
type defaultTicker time.Ticker

var _ schedulerTicker = &defaultTicker{}

func (t *defaultTicker) stop() {
        (*time.Ticker)(t).Stop()
}

func (t *defaultTicker) ch() <-chan time.Time {
        return (*time.Ticker)(t).C
}
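
// The following is an illustrative sketch, not part of Pebble: a manually
// driven time source that a test could pass to newConcurrencyLimitScheduler
// to fire periodicGranter ticks deterministically instead of every 100ms.
// The names manualTimeSource and manualTicker are hypothetical.
type manualTimeSource struct {
        tickCh chan time.Time
}

func (m manualTimeSource) newTicker(duration time.Duration) schedulerTicker {
        // The requested duration is ignored; ticks arrive only when the test
        // sends on tickCh.
        return manualTicker{tickCh: m.tickCh}
}

type manualTicker struct {
        tickCh chan time.Time
}

func (t manualTicker) stop()                 {}
func (t manualTicker) ch() <-chan time.Time  { return t.tickCh }

var _ schedulerTimeSource = manualTimeSource{}

// A test would construct the scheduler with
// newConcurrencyLimitScheduler(manualTimeSource{tickCh: make(chan time.Time)})
// and send on tickCh to force a grant pass.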
        
