// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"sync"
	"time"

	"github.com/cockroachdb/pebble/internal/base"
)

type CompactionGrantHandle = base.CompactionGrantHandle
type CompactionGrantHandleStats = base.CompactionGrantHandleStats
type CompactionGoroutineKind = base.CompactionGoroutineKind

const (
	CompactionGoroutinePrimary           = base.CompactionGoroutinePrimary
	CompactionGoroutineSSTableSecondary  = base.CompactionGoroutineSSTableSecondary
	CompactionGoroutineBlobFileSecondary = base.CompactionGoroutineBlobFileSecondary
)

// NB: This interface is experimental and subject to change.
//
// For instance, we may incorporate more information in TrySchedule and in the
// return value of Schedule to tell CompactionScheduler of the sub-category of
// compaction so that the scheduler can have more granular estimates. For
// example, the input or output level could affect the write bandwidth if the
// inputs are better cached (say at higher levels).

// CompactionScheduler is responsible for scheduling both automatic and manual
// compactions. In the case of multiple DB instances on a node (i.e. a
// multi-store configuration), implementations of CompactionScheduler may
// enforce a global maximum compaction concurrency. Additionally,
// implementations of CompactionScheduler may be resource aware and permit
// more compactions than are "allowed without permission" if resources are
// available.
//
// Locking: CompactionScheduler's mutexes are ordered after DBForCompaction
// mutexes. We need to specify some lock ordering since CompactionScheduler
// and DBForCompaction call into each other. This ordering choice is made to
// simplify the implementation of DBForCompaction. There are three exceptions
// to this ordering: DBForCompaction.GetAllowedWithoutPermission,
// CompactionScheduler.Unregister, and CompactionGrantHandle.Done -- see those
// declarations for details.
type CompactionScheduler interface {
	// Register is called to register this DB and to specify the number of
	// goroutines that consume CPU in each compaction (see the CPU reporting
	// interface, CompactionGrantHandle.MeasureCPU). Must be called exactly once
	// by this DB if it successfully opens.
	Register(numGoroutinesPerCompaction int, db DBForCompaction)
	// Unregister is used to unregister the DB. Must be called once when the DB
	// is being closed. Unregister waits until all ongoing calls to
	// DBForCompaction are finished, so Unregister must not be called while
	// holding locks that DBForCompaction acquires in those calls.
	Unregister()
	// TrySchedule is called by DB when it wants to run a compaction. The bool
	// is true iff permission is granted, and in that case the
	// CompactionGrantHandle needs to be exercised by the DB.
	TrySchedule() (bool, CompactionGrantHandle)
	// UpdateGetAllowedWithoutPermission informs the scheduler that some
	// external behavior may have changed the value returned by
	// DBForCompaction.GetAllowedWithoutPermission. It exists because flushes
	// are not otherwise visible to the CompactionScheduler, and can cause the
	// value to increase. CompactionScheduler implementations should do
	// periodic sampling (e.g. as done by
	// ConcurrencyLimitScheduler.periodicGranter), but this provides an
	// instantaneous opportunity to act.
	UpdateGetAllowedWithoutPermission()
}
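
// The following is an illustrative sketch, not part of the API contract, of
// the expected call flow between a DB and a CompactionScheduler. The helper
// name runCompaction is hypothetical:
//
//	scheduler.Register(numGoroutinesPerCompaction, db) // once, when the DB opens
//	// When the DB wants to run a compaction:
//	if ok, grant := scheduler.TrySchedule(); ok {
//		grant.Started()
//		runCompaction(grant) // hypothetical; reports via grant.MeasureCPU
//		                     // and grant.CumulativeStats while running
//		grant.Done()
//	}
//	// Alternatively, the scheduler may later call db.GetWaitingCompaction
//	// followed by db.Schedule(grant) to grant permission on its own.
//	scheduler.Unregister() // once, when the DB closes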

// DBForCompaction is the interface implemented by the DB to interact with the
// CompactionScheduler.
type DBForCompaction interface {
	// GetAllowedWithoutPermission returns what is permitted at the DB-level
	// (there may be further restrictions at the node level, when there are
	// multiple DBs at a node, which is not captured by this number). This can
	// vary based on compaction backlog or other factors. This method must not
	// acquire any mutex in DBForCompaction that is covered by the general mutex
	// ordering rule stated earlier.
	GetAllowedWithoutPermission() int
	// GetWaitingCompaction returns true iff the DB can run a compaction. The
	// true return is accompanied by a populated WaitingCompaction, which the
	// scheduler can use to pick across DBs or other work in the system. This
	// method should typically be efficient, in that the DB should try to cache
	// some state if its previous call to TrySchedule resulted in a failure to
	// get permission. It is ok if it is sometimes slow, since all work scheduled
	// by CompactionScheduler is long-lived (often executing for multiple
	// seconds).
	GetWaitingCompaction() (bool, WaitingCompaction)
	// Schedule grants the DB permission to run a compaction. The DB returns
	// true iff it accepts the grant, in which case it must exercise the
	// CompactionGrantHandle.
	Schedule(CompactionGrantHandle) bool
}

// WaitingCompaction captures state for a compaction that can be used to
// prioritize wrt compactions in other DBs or other long-lived work in the
// system.
type WaitingCompaction struct {
	// Optional is true for a compaction that isn't necessary for maintaining an
	// overall healthy LSM. This value can be compared across compactions and
	// other long-lived work.
	Optional bool
	// Priority is the priority of a compaction. It is only compared across
	// compactions, and when the Optional value is the same.
	Priority int
	// Score is only compared across compactions, and only when the Optional
	// and Priority values are the same.
	Score float64
}
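
// waitingCompactionMoreImportant is a hypothetical helper, not part of this
// package's actual code, sketching the comparison semantics documented on
// WaitingCompaction. It assumes that higher Priority and higher Score mean
// more important, consistent with the priorities assigned in init below.
func waitingCompactionMoreImportant(a, b WaitingCompaction) bool {
	if a.Optional != b.Optional {
		// A compaction needed for LSM health beats an optional one.
		return !a.Optional
	}
	if a.Priority != b.Priority {
		return a.Priority > b.Priority
	}
	return a.Score > b.Score
}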

// Ordering is first by the optional value (false is more important than
// true), and then by priority.
//
// The ordering here must be consistent with the order in which compactions
// are picked in compactionPickerByScore.pickAuto.
type compactionOptionalAndPriority struct {
	optional bool
	priority int
}

var scheduledCompactionMap map[compactionKind]compactionOptionalAndPriority
var manualCompactionPriority int

func init() {
	// Manual compactions have priority just below the score-based
	// compactions, since DB.pickAnyCompaction first picks score-based
	// compactions, and then manual compactions.
	manualCompactionPriority = 70
	scheduledCompactionMap = map[compactionKind]compactionOptionalAndPriority{}
	// Score-based compactions have priorities {100, 90, 80}.
	//
	// We don't actually know if it is a compactionKindMove or
	// compactionKindCopy until a compactionKindDefault is turned from a
	// pickedCompaction into a compaction struct. So we will never see those
	// values here, but for completeness we include them.
	scheduledCompactionMap[compactionKindMove] = compactionOptionalAndPriority{priority: 100}
	scheduledCompactionMap[compactionKindCopy] = compactionOptionalAndPriority{priority: 90}
	scheduledCompactionMap[compactionKindDefault] = compactionOptionalAndPriority{priority: 80}
	scheduledCompactionMap[compactionKindTombstoneDensity] =
		compactionOptionalAndPriority{optional: true, priority: 60}
	scheduledCompactionMap[compactionKindElisionOnly] =
		compactionOptionalAndPriority{optional: true, priority: 50}
	scheduledCompactionMap[compactionKindBlobFileRewrite] =
		compactionOptionalAndPriority{optional: true, priority: 40}
	scheduledCompactionMap[compactionKindRead] =
		compactionOptionalAndPriority{optional: true, priority: 30}
	scheduledCompactionMap[compactionKindRewrite] =
		compactionOptionalAndPriority{optional: true, priority: 20}
}
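
// As a hypothetical illustration (not the actual DB code), the map above
// could be consulted to populate a WaitingCompaction for a picked compaction:
//
//	func makeWaitingCompaction(kind compactionKind, score float64) WaitingCompaction {
//		e := scheduledCompactionMap[kind]
//		return WaitingCompaction{Optional: e.optional, Priority: e.priority, Score: score}
//	}
//
// with manualCompactionPriority used as the Priority for manual compactions.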

// noopGrantHandle is used in cases that don't interact with a CompactionScheduler.
type noopGrantHandle struct{}

var _ CompactionGrantHandle = noopGrantHandle{}

func (h noopGrantHandle) Started()                                              {}
func (h noopGrantHandle) MeasureCPU(CompactionGoroutineKind)                    {}
func (h noopGrantHandle) CumulativeStats(stats base.CompactionGrantHandleStats) {}
func (h noopGrantHandle) Done()                                                 {}

// pickedCompactionCache is used to avoid the work of repeatedly picking a
// compaction that then fails to run immediately because TrySchedule returns
// false.
//
// The high-level approach is to construct a pickedCompaction in
// DB.maybeScheduleCompaction if there isn't one in the cache, and if
// TrySchedule returns false, to remember it. Ignoring flushes, the worst-case
// behavior is 1 of 2 pickedCompactions gets to run (so half the picking work
// is wasted). This worst-case happens when the system is running at the limit
// of the long-lived work (including compactions) it can support. In this
// setting, each started compaction invalidates the pickedCompaction in the
// cache when it completes, and the reason the cache has a pickedCompaction
// (that got invalidated) is that the CompactionScheduler called
// GetWaitingCompaction and decided not to run the pickedCompaction (some
// other work won). We consider the CPU overhead of this waste acceptable.
//
// For the default case of a ConcurrencyLimitScheduler, which only considers a
// single DB, the aforementioned worst-case is avoided by not constructing a
// new pickedCompaction in DB.maybeScheduleCompaction when
// pickedCompactionCache.isWaiting is already true (which became true once,
// when a backlog developed). Whenever a compaction completes and a new
// compaction can be started, the call to DBForCompaction.GetWaitingCompaction
// constructs a new pickedCompaction and caches it, and then this immediately
// gets to run when DBForCompaction.Schedule is called.
type pickedCompactionCache struct {
	// pc != nil => waiting.
	//
	// It is acceptable for waiting to be true and pc to be nil, when pc is
	// invalidated due to starting a compaction, or completing a
	// compaction/flush (since it changes the latest version).
	waiting bool
	pc      pickedCompaction
}

// invalidate the cache because a new Version is installed or a compaction is
// started (since a new in-progress compaction affects future compaction
// picking). The value of waiting is not changed.
func (c *pickedCompactionCache) invalidate() {
	c.pc = nil
}

// isWaiting returns the value of waiting.
func (c *pickedCompactionCache) isWaiting() bool {
	return c.waiting
}

// getForRunning returns the pickedCompaction, if any, in the cache, and
// clears the cache. It may return nil.
func (c *pickedCompactionCache) getForRunning() pickedCompaction {
	// NB: This does not set c.waiting = false, since there may be more
	// compactions to run.
	pc := c.pc
	c.pc = nil
	return pc
}

// setNotWaiting sets waiting to false.
func (c *pickedCompactionCache) setNotWaiting() {
	c.waiting = false
	c.pc = nil
}

// peek returns the pickedCompaction, if any, in the cache.
func (c *pickedCompactionCache) peek() pickedCompaction {
	return c.pc
}

// add adds a pickedCompaction to the cache and sets waiting to true.
func (c *pickedCompactionCache) add(pc pickedCompaction) {
	c.waiting = true
	c.pc = pc
}
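
// A minimal sketch, simplified from the description on pickedCompactionCache
// and not the actual DB code, of how DB.maybeScheduleCompaction is intended
// to use the cache (pickCompaction and runCompaction are hypothetical):
//
//	pc := cache.getForRunning()
//	if pc == nil && cache.isWaiting() {
//		// Default scheduler case: a pick is pending; avoid re-picking and
//		// let GetWaitingCompaction/Schedule construct and run the next one.
//		return
//	}
//	if pc == nil {
//		pc = pickCompaction() // hypothetical picking step
//	}
//	if ok, grant := scheduler.TrySchedule(); ok {
//		runCompaction(pc, grant) // hypothetical
//	} else {
//		cache.add(pc) // remember the pick for a later grant
//	}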

// ConcurrencyLimitScheduler is the default scheduler used by Pebble. It
// simply uses the concurrency limit retrieved from
// DBForCompaction.GetAllowedWithoutPermission to decide the number of
// compactions to schedule. ConcurrencyLimitScheduler must have its Register
// method called at most once -- i.e., it cannot be reused across DBs.
//
// Since the GetAllowedWithoutPermission value changes over time, the
// scheduler needs to be quite current in its sampling, especially if the
// value is increasing, to prevent lag in scheduling compactions. Calls to
// ConcurrencyLimitScheduler.Done and ConcurrencyLimitScheduler.TrySchedule
// are obvious places this value is sampled. However, since
// ConcurrencyLimitScheduler does not observe flushes (which can increase the
// value), and there can be situations where compactions last 10+ seconds,
// this sampling is not considered sufficient. Note that calls to
// ConcurrencyLimitScheduler.TrySchedule are dampened in
// DB.maybeScheduleCompaction when there is a waiting compaction (to prevent
// wasted computation of pickedCompaction). If DB.maybeScheduleCompaction
// always called ConcurrencyLimitScheduler.TrySchedule we would have no lag,
// as DB.maybeScheduleCompaction is called on flush completion. Hence, we
// resort to having a background thread in ConcurrencyLimitScheduler sample
// the value every 100ms, and to sampling it in
// UpdateGetAllowedWithoutPermission as well.
type ConcurrencyLimitScheduler struct {
	ts schedulerTimeSource
	// db is set in Register, but not protected by mu since it is strictly
	// before any calls to the other methods.
	db DBForCompaction
	mu struct {
		sync.Mutex
		runningCompactions int
		// unregistered transitions once from false => true.
		unregistered bool
		// isGranting is used to (a) serialize granting from Done and
		// periodicGranter, (b) ensure that granting is stopped before returning
		// from Unregister.
		isGranting                   bool
		isGrantingCond               *sync.Cond
		lastAllowedWithoutPermission int
	}
	stopPeriodicGranterCh chan struct{}
	pokePeriodicGranterCh chan struct{}
	// Only non-nil in some tests.
	periodicGranterRanChForTesting chan struct{}
}

var _ CompactionScheduler = &ConcurrencyLimitScheduler{}

func newConcurrencyLimitScheduler(ts schedulerTimeSource) *ConcurrencyLimitScheduler {
	s := &ConcurrencyLimitScheduler{
		ts:                    ts,
		stopPeriodicGranterCh: make(chan struct{}),
		pokePeriodicGranterCh: make(chan struct{}, 1),
	}
	s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
	return s
}

func NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() *ConcurrencyLimitScheduler {
	s := &ConcurrencyLimitScheduler{
		ts: defaultTimeSource{},
	}
	s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
	return s
}

func (s *ConcurrencyLimitScheduler) Register(numGoroutinesPerCompaction int, db DBForCompaction) {
	s.db = db
	if s.stopPeriodicGranterCh != nil {
		go s.periodicGranter()
	}
}

func (s *ConcurrencyLimitScheduler) Unregister() {
	if s.stopPeriodicGranterCh != nil {
		s.stopPeriodicGranterCh <- struct{}{}
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.mu.unregistered = true
	// Wait until isGranting becomes false. Since unregistered has been set to
	// true, once isGranting becomes false, no more granting will happen.
	for s.mu.isGranting {
		s.mu.isGrantingCond.Wait()
	}
}

func (s *ConcurrencyLimitScheduler) TrySchedule() (bool, CompactionGrantHandle) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.mu.unregistered {
		return false, nil
	}
	s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
	if s.mu.lastAllowedWithoutPermission > s.mu.runningCompactions {
		s.mu.runningCompactions++
		return true, s
	}
	return false, nil
}

func (s *ConcurrencyLimitScheduler) Started()                                              {}
func (s *ConcurrencyLimitScheduler) MeasureCPU(CompactionGoroutineKind)                    {}
func (s *ConcurrencyLimitScheduler) CumulativeStats(stats base.CompactionGrantHandleStats) {}

func (s *ConcurrencyLimitScheduler) Done() {
	s.mu.Lock()
	s.mu.runningCompactions--
	s.tryGrantLockedAndUnlock()
}

func (s *ConcurrencyLimitScheduler) UpdateGetAllowedWithoutPermission() {
	s.mu.Lock()
	allowedWithoutPermission := s.db.GetAllowedWithoutPermission()
	tryGrant := allowedWithoutPermission > s.mu.lastAllowedWithoutPermission
	s.mu.lastAllowedWithoutPermission = allowedWithoutPermission
	s.mu.Unlock()
	if tryGrant {
		select {
		case s.pokePeriodicGranterCh <- struct{}{}:
		default:
		}
	}
}

func (s *ConcurrencyLimitScheduler) tryGrantLockedAndUnlock() {
	defer s.mu.Unlock()
	if s.mu.unregistered {
		return
	}
	// Wait for turn to grant.
	for s.mu.isGranting {
		s.mu.isGrantingCond.Wait()
	}
	// INVARIANT: !isGranting.
	if s.mu.unregistered {
		return
	}
	s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
	toGrant := s.mu.lastAllowedWithoutPermission - s.mu.runningCompactions
	if toGrant > 0 {
		s.mu.isGranting = true
	} else {
		return
	}
	s.mu.Unlock()
	// We call GetWaitingCompaction iff we can successfully grant, so that there
	// is no wasted pickedCompaction.
	//
	// INVARIANT: loop exits with s.mu unlocked.
	for toGrant > 0 {
		waiting, _ := s.db.GetWaitingCompaction()
		if !waiting {
			break
		}
		accepted := s.db.Schedule(s)
		if !accepted {
			break
		}
		s.mu.Lock()
		s.mu.runningCompactions++
		toGrant--
		s.mu.Unlock()
	}
	// Will be unlocked by the defer statement.
	s.mu.Lock()
	s.mu.isGranting = false
	s.mu.isGrantingCond.Broadcast()
}

func (s *ConcurrencyLimitScheduler) periodicGranter() {
	ticker := s.ts.newTicker(100 * time.Millisecond)
	for {
		select {
		case <-ticker.ch():
			s.mu.Lock()
			s.tryGrantLockedAndUnlock()
		case <-s.pokePeriodicGranterCh:
			s.mu.Lock()
			s.tryGrantLockedAndUnlock()
		case <-s.stopPeriodicGranterCh:
			ticker.stop()
			return
		}
		if s.periodicGranterRanChForTesting != nil {
			s.periodicGranterRanChForTesting <- struct{}{}
		}
	}
}

func (s *ConcurrencyLimitScheduler) adjustRunningCompactionsForTesting(delta int) {
	s.mu.Lock()
	s.mu.runningCompactions += delta
	if delta < 0 {
		s.tryGrantLockedAndUnlock()
	} else {
		s.mu.Unlock()
	}
}

func (s *ConcurrencyLimitScheduler) isUnregisteredForTesting() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.mu.unregistered
}

// schedulerTimeSource is used to abstract time.NewTicker for
// ConcurrencyLimitScheduler.
type schedulerTimeSource interface {
	newTicker(duration time.Duration) schedulerTicker
}

// schedulerTicker is used to abstract time.Ticker for
// ConcurrencyLimitScheduler.
type schedulerTicker interface {
	stop()
	ch() <-chan time.Time
}
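
// manualTimeSource is a hypothetical sketch, not part of this package, of a
// test implementation of schedulerTimeSource: ticks are delivered only when
// the test calls tick(), making periodicGranter deterministic.
type manualTimeSource struct {
	c chan time.Time
}

var _ schedulerTimeSource = &manualTimeSource{}

func (m *manualTimeSource) newTicker(duration time.Duration) schedulerTicker {
	// The duration is ignored; ticks are driven manually by the test.
	m.c = make(chan time.Time, 1)
	return manualTicker{c: m.c}
}

// tick delivers one tick, as if the ticker's interval had elapsed.
func (m *manualTimeSource) tick() { m.c <- time.Time{} }

type manualTicker struct {
	c chan time.Time
}

func (t manualTicker) stop()                {}
func (t manualTicker) ch() <-chan time.Time { return t.c }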

// defaultTimeSource is a schedulerTimeSource using the time package.
type defaultTimeSource struct{}

var _ schedulerTimeSource = defaultTimeSource{}

func (defaultTimeSource) newTicker(duration time.Duration) schedulerTicker {
	return (*defaultTicker)(time.NewTicker(duration))
}

// defaultTicker uses time.Ticker.
type defaultTicker time.Ticker

var _ schedulerTicker = &defaultTicker{}

func (t *defaultTicker) stop() {
	(*time.Ticker)(t).Stop()
}

func (t *defaultTicker) ch() <-chan time.Time {
	return (*time.Ticker)(t).C
}