LCOV - code coverage report
Current view: top level - pebble - compaction_scheduler.go (source / functions) Coverage Total Hit
Test: 2025-09-07 08:17Z d86f6dab - tests + meta.lcov Lines: 99.5 % 191 190
Test Date: 2025-09-07 08:25:14 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "sync"
       9              :         "time"
      10              : 
      11              :         "github.com/cockroachdb/pebble/internal/base"
      12              : )
      13              : 
// CompactionGrantHandle is an alias for base.CompactionGrantHandle.
type CompactionGrantHandle = base.CompactionGrantHandle

// CompactionGrantHandleStats is an alias for base.CompactionGrantHandleStats.
type CompactionGrantHandleStats = base.CompactionGrantHandleStats

// CompactionGoroutineKind is an alias for base.CompactionGoroutineKind.
type CompactionGoroutineKind = base.CompactionGoroutineKind

// Re-exports of the base.CompactionGoroutine* constants under the same names.
const (
	CompactionGoroutinePrimary           = base.CompactionGoroutinePrimary
	CompactionGoroutineSSTableSecondary  = base.CompactionGoroutineSSTableSecondary
	CompactionGoroutineBlobFileSecondary = base.CompactionGoroutineBlobFileSecondary
)
      23              : 
      24              : // NB: This interface is experimental and subject to change.
      25              : //
      26              : // For instance, we may incorporate more information in TrySchedule and in the
      27              : // return value of Schedule to tell CompactionScheduler of the sub-category of
      28              : // compaction so that the scheduler can have more granular estimates. For
      29              : // example, the input or output level could affect the write bandwidth if the
      30              : // inputs are better cached (say at higher levels).
      31              : 
// CompactionScheduler is responsible for scheduling both automatic and manual
// compactions. In the case of multiple DB instances on a node (i.e. a
// multi-store configuration), implementations of CompactionScheduler may
// enforce a global maximum compaction concurrency. Additionally,
// implementations of CompactionScheduler may be resource aware and permit
// more than the compactions that are "allowed without permission" if
// resources are available.
//
// Locking: CompactionScheduler's mutexes are ordered after DBForCompaction
// mutexes. We need to specify some lock ordering since CompactionScheduler
// and DBForCompaction call into each other. This ordering choice is made to
// simplify the implementation of DBForCompaction. There are three exceptions
// to this ordering: DBForCompaction.GetAllowedWithoutPermission,
// CompactionScheduler.Unregister, and CompactionGrantHandle.Done -- see those
// declarations for details.
type CompactionScheduler interface {
	// Register is called to register this DB and to specify the number of
	// goroutines that consume CPU in each compaction (see the CPU reporting
	// interface, CompactionGrantHandle.MeasureCPU). Must be called exactly once
	// by this DB if it successfully opens.
	Register(numGoroutinesPerCompaction int, db DBForCompaction)
	// Unregister is used to unregister the DB. Must be called once when the DB
	// is being closed. Unregister waits until all ongoing calls to
	// DBForCompaction are finished, so Unregister must not be called while
	// holding locks that DBForCompaction acquires in those calls.
	Unregister()
	// TrySchedule is called by DB when it wants to run a compaction. The bool
	// is true iff permission is granted, and in that case the
	// CompactionGrantHandle needs to be exercised by the DB.
	TrySchedule() (bool, CompactionGrantHandle)
	// UpdateGetAllowedWithoutPermission informs the scheduler that some
	// external behavior may have caused the value returned by
	// DBForCompaction.GetAllowedWithoutPermission to change. It exists because
	// flushes are not otherwise visible to the CompactionScheduler, and can
	// cause the value to increase. CompactionScheduler implementations should
	// do periodic sampling (e.g. as done by
	// ConcurrencyLimitScheduler.periodicGranter), but this provides an
	// instantaneous opportunity to act.
	UpdateGetAllowedWithoutPermission()
}
      71              : 
// DBForCompaction is the interface implemented by the DB to interact with the
// CompactionScheduler.
type DBForCompaction interface {
	// GetAllowedWithoutPermission returns what is permitted at the DB-level
	// (there may be further restrictions at the node level, when there are
	// multiple DBs at a node, which is not captured by this number). This can
	// vary based on compaction backlog or other factors. This method must not
	// acquire any mutex in DBForCompaction that is covered by the general mutex
	// ordering rule stated earlier.
	GetAllowedWithoutPermission() int
	// GetWaitingCompaction returns true iff the DB can run a compaction. The
	// true return is accompanied by a populated WaitingCompaction, that the
	// scheduler can use to pick across DBs or other work in the system. This
	// method should typically be efficient, in that the DB should try to cache
	// some state if its previous call to TrySchedule resulted in a failure to
	// get permission. It is ok if it is sometimes slow since all work scheduled
	// by CompactionScheduler is long-lived (often executing for multiple
	// seconds).
	GetWaitingCompaction() (bool, WaitingCompaction)
	// Schedule grants the DB permission to run a compaction. The DB returns
	// true iff it accepts the grant, in which case it must exercise the
	// CompactionGrantHandle.
	Schedule(CompactionGrantHandle) bool
}
      96              : 
// WaitingCompaction captures state for a compaction that can be used to
// prioritize wrt compactions in other DBs or other long-lived work in the
// system.
type WaitingCompaction struct {
	// Optional is true for a compaction that isn't necessary for maintaining an
	// overall healthy LSM. This value can be compared across compactions and
	// other long-lived work.
	Optional bool
	// Priority is the priority of a compaction. It is only compared across
	// compactions, and when the Optional value is the same.
	Priority int
	// Score is only compared across compactions, and only when the Optional
	// and Priority values are the same.
	Score float64
}
     112              : 
// compactionOptionalAndPriority is the value type of scheduledCompactionMap:
// the optional bit and the priority associated with a compactionKind.
//
// Ordering is by priority and if the optional value is different, false is
// more important than true.
//
// The ordering here must be consistent with the order in which compactions
// are picked in compactionPickerByScore.pickAuto.
type compactionOptionalAndPriority struct {
	optional bool
	priority int
}
     122              : 
     123              : var scheduledCompactionMap map[compactionKind]compactionOptionalAndPriority
     124              : var manualCompactionPriority int
     125              : 
     126            2 : func init() {
     127            2 :         // Manual compactions have priority just below the score-rebased
     128            2 :         // compactions, since DB.pickAnyCompaction first picks score-based
     129            2 :         // compactions, and then manual compactions.
     130            2 :         manualCompactionPriority = 70
     131            2 :         scheduledCompactionMap = map[compactionKind]compactionOptionalAndPriority{}
     132            2 :         // Score-based-compactions have priorities {100, 90, 80}.
     133            2 :         //
     134            2 :         // We don't actually know if it is a compactionKindMove or
     135            2 :         // compactionKindCopy until a compactionKindDefault is turned from a
     136            2 :         // pickedCompaction into a compaction struct. So we will never see those
     137            2 :         // values here, but for completeness we include them.
     138            2 :         scheduledCompactionMap[compactionKindMove] = compactionOptionalAndPriority{priority: 100}
     139            2 :         scheduledCompactionMap[compactionKindCopy] = compactionOptionalAndPriority{priority: 90}
     140            2 :         scheduledCompactionMap[compactionKindDefault] = compactionOptionalAndPriority{priority: 80}
     141            2 :         scheduledCompactionMap[compactionKindTombstoneDensity] =
     142            2 :                 compactionOptionalAndPriority{optional: true, priority: 60}
     143            2 :         scheduledCompactionMap[compactionKindElisionOnly] =
     144            2 :                 compactionOptionalAndPriority{optional: true, priority: 50}
     145            2 :         scheduledCompactionMap[compactionKindBlobFileRewrite] =
     146            2 :                 compactionOptionalAndPriority{optional: true, priority: 40}
     147            2 :         scheduledCompactionMap[compactionKindRead] =
     148            2 :                 compactionOptionalAndPriority{optional: true, priority: 30}
     149            2 :         scheduledCompactionMap[compactionKindRewrite] =
     150            2 :                 compactionOptionalAndPriority{optional: true, priority: 20}
     151            2 : }
     152              : 
     153              : // noopGrantHandle is used in cases that don't interact with a CompactionScheduler.
     154              : type noopGrantHandle struct{}
     155              : 
     156              : var _ CompactionGrantHandle = noopGrantHandle{}
     157              : 
     158            2 : func (h noopGrantHandle) Started()                                              {}
     159            2 : func (h noopGrantHandle) MeasureCPU(CompactionGoroutineKind)                    {}
     160            2 : func (h noopGrantHandle) CumulativeStats(stats base.CompactionGrantHandleStats) {}
     161            2 : func (h noopGrantHandle) Done()                                                 {}
     162              : 
     163              : // pickedCompactionCache is used to avoid the work of repeatedly picking a
     164              : // compaction that then fails to run immediately because TrySchedule returns
     165              : // false.
     166              : //
     167              : // The high-level approach is to construct a pickedCompaction in
     168              : // DB.maybeScheduleCompaction if there isn't one in the cache, and if
     169              : // TrySchedule returns false, to remember it. Ignoring flushes, the worst-case
     170              : // behavior is 1 of 2 pickedCompactions gets to run (so half the picking work
     171              : // is wasted). This worst-case happens when the system is running at the limit
     172              : // of the long-lived work (including compactions) it can support. In this
     173              : // setting, each started compaction invalidates the pickedCompaction in the
     174              : // cache when it completes, and the reason the cache has a pickedCompaction
     175              : // (that got invalidated) is that the CompactionScheduler called
     176              : // GetWaitingCompaction and decided not to run the pickedCompaction (some
     177              : // other work won). We consider the CPU overhead of this waste acceptable.
     178              : //
     179              : // For the default case of a ConcurrencyLimitScheduler, which only considers a
     180              : // single DB, the aforementioned worst-case is avoided by not constructing a
     181              : // new pickedCompaction in DB.maybeScheduleCompaction when
     182              : // pickedCompactionCache.isWaiting is already true (which became true once,
     183              : // when a backlog developed). Whenever a compaction completes and a new
     184              : // compaction can be started, the call to DBForCompaction.GetWaitingCompaction
     185              : // constructs a new pickedCompaction and caches it, and then this immediately
     186              : // gets to run when DBForCompaction.Schedule is called.
     187              : type pickedCompactionCache struct {
     188              :         // pc != nil => waiting.
     189              :         //
     190              :         // It is acceptable for waiting to be true and pc to be nil, when pc is
     191              :         // invalidated due to starting a compaction, or completing a
     192              :         // compaction/flush (since it changes the latest version).
     193              :         waiting bool
     194              :         pc      pickedCompaction
     195              : }
     196              : 
     197              : // invalidate the cache because a new Version is installed or a compaction is
     198              : // started (since a new in-progress compaction affects future compaction
     199              : // picking). The value of waiting is not changed.
     200            2 : func (c *pickedCompactionCache) invalidate() {
     201            2 :         c.pc = nil
     202            2 : }
     203              : 
     204              : // isWaiting returns the value of waiting.
     205            2 : func (c *pickedCompactionCache) isWaiting() bool {
     206            2 :         return c.waiting
     207            2 : }
     208              : 
     209              : // getForRunning returns a pickedCompaction if in the cache. The cache is
     210              : // cleared. It may return nil.
     211            2 : func (c *pickedCompactionCache) getForRunning() pickedCompaction {
     212            2 :         // NB: This does not set c.waiting = false, since there may be more
     213            2 :         // compactions to run.
     214            2 :         pc := c.pc
     215            2 :         c.pc = nil
     216            2 :         return pc
     217            2 : }
     218              : 
     219              : // setNotWaiting sets waiting to false.
     220            2 : func (c *pickedCompactionCache) setNotWaiting() {
     221            2 :         c.waiting = false
     222            2 :         c.pc = nil
     223            2 : }
     224              : 
     225              : // peek return the pickedCompaction, if any, in the cache.
     226            2 : func (c *pickedCompactionCache) peek() pickedCompaction {
     227            2 :         return c.pc
     228            2 : }
     229              : 
     230              : // add adds a pickedCompaction to the cache and sets waiting to true.
     231            2 : func (c *pickedCompactionCache) add(pc pickedCompaction) {
     232            2 :         c.waiting = true
     233            2 :         c.pc = pc
     234            2 : }
     235              : 
// ConcurrencyLimitScheduler is the default scheduler used by Pebble. It
// simply uses the concurrency limit retrieved from
// DBForCompaction.GetAllowedWithoutPermission to decide the number of
// compactions to schedule. ConcurrencyLimitScheduler must have its Register
// method called at most once -- i.e., it cannot be reused across DBs.
//
// Since the GetAllowedWithoutPermission value changes over time, the
// scheduler needs to be quite current in its sampling, especially if the
// value is increasing, to prevent lag in scheduling compactions. Calls to
// ConcurrencyLimitScheduler.Done and ConcurrencyLimitScheduler.TrySchedule
// are obvious places this value is sampled. However, since
// ConcurrencyLimitScheduler does not observe flushes (which can increase the
// value), and there can be situations where compactions last 10+ seconds,
// this sampling is not considered sufficient. Note that calls to
// ConcurrencyLimitScheduler.TrySchedule are dampened in
// DB.maybeScheduleCompaction when there is a waiting compaction (to prevent
// wasted computation of pickedCompaction). If DB.maybeScheduleCompaction
// always called ConcurrencyLimitScheduler.TrySchedule we would have no lag as
// DB.maybeScheduleCompaction is called on flush completion. Hence, we resort
// to having a background thread in ConcurrencyLimitScheduler sample the value
// every 100ms, plus sample in UpdateGetAllowedWithoutPermission.
type ConcurrencyLimitScheduler struct {
	// ts creates the ticker that drives periodicGranter.
	ts schedulerTimeSource
	// db is set in Register, but not protected by mu since it is strictly
	// before any calls to the other methods.
	db DBForCompaction
	mu struct {
		sync.Mutex
		// runningCompactions is incremented for every grant (in TrySchedule
		// and in tryGrantLockedAndUnlock) and decremented in Done.
		runningCompactions int
		// unregistered transitions once from false => true.
		unregistered bool
		// isGranting is used to (a) serialize granting from Done and
		// periodicGranter, (b) ensure that granting is stopped before returning
		// from Unregister.
		isGranting bool
		// isGrantingCond is broadcast whenever isGranting is reset to false.
		isGrantingCond *sync.Cond
		// lastAllowedWithoutPermission is the most recently sampled value of
		// db.GetAllowedWithoutPermission.
		lastAllowedWithoutPermission int
	}
	// stopPeriodicGranterCh is sent on by Unregister to stop periodicGranter.
	// It is nil when periodic granting is disabled (see
	// NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest).
	stopPeriodicGranterCh chan struct{}
	// pokePeriodicGranterCh receives a non-blocking send from
	// UpdateGetAllowedWithoutPermission to wake periodicGranter between ticks.
	pokePeriodicGranterCh chan struct{}
	// Only non-nil in some tests.
	periodicGranterRanChForTesting chan struct{}
}

var _ CompactionScheduler = &ConcurrencyLimitScheduler{}
     281              : 
     282            1 : func newConcurrencyLimitScheduler(ts schedulerTimeSource) *ConcurrencyLimitScheduler {
     283            1 :         s := &ConcurrencyLimitScheduler{
     284            1 :                 ts:                    ts,
     285            1 :                 stopPeriodicGranterCh: make(chan struct{}),
     286            1 :                 pokePeriodicGranterCh: make(chan struct{}, 1),
     287            1 :         }
     288            1 :         s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
     289            1 :         return s
     290            1 : }
     291              : 
     292            2 : func NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() *ConcurrencyLimitScheduler {
     293            2 :         s := &ConcurrencyLimitScheduler{
     294            2 :                 ts: defaultTimeSource{},
     295            2 :         }
     296            2 :         s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
     297            2 :         return s
     298            2 : }
     299              : 
     300            2 : func (s *ConcurrencyLimitScheduler) Register(numGoroutinesPerCompaction int, db DBForCompaction) {
     301            2 :         s.db = db
     302            2 :         if s.stopPeriodicGranterCh != nil {
     303            1 :                 go s.periodicGranter()
     304            1 :         }
     305            2 :         s.mu.Lock()
     306            2 :         defer s.mu.Unlock()
     307            2 :         if s.mu.unregistered {
     308            0 :                 panic("cannot reuse ConcurrencyLimitScheduler")
     309              :         }
     310              : }
     311              : 
     312            2 : func (s *ConcurrencyLimitScheduler) Unregister() {
     313            2 :         if s.stopPeriodicGranterCh != nil {
     314            1 :                 s.stopPeriodicGranterCh <- struct{}{}
     315            1 :         }
     316            2 :         s.mu.Lock()
     317            2 :         defer s.mu.Unlock()
     318            2 :         s.mu.unregistered = true
     319            2 :         // Wait until isGranting becomes false. Since unregistered has been set to
     320            2 :         // true, once isGranting becomes false, no more granting will happen.
     321            2 :         for s.mu.isGranting {
     322            2 :                 s.mu.isGrantingCond.Wait()
     323            2 :         }
     324              : }
     325              : 
     326            2 : func (s *ConcurrencyLimitScheduler) TrySchedule() (bool, CompactionGrantHandle) {
     327            2 :         s.mu.Lock()
     328            2 :         defer s.mu.Unlock()
     329            2 :         if s.mu.unregistered {
     330            2 :                 return false, nil
     331            2 :         }
     332            2 :         s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
     333            2 :         if s.mu.lastAllowedWithoutPermission > s.mu.runningCompactions {
     334            2 :                 s.mu.runningCompactions++
     335            2 :                 return true, s
     336            2 :         }
     337            2 :         return false, nil
     338              : }
     339              : 
     340            2 : func (s *ConcurrencyLimitScheduler) Started()                                              {}
     341            2 : func (s *ConcurrencyLimitScheduler) MeasureCPU(CompactionGoroutineKind)                    {}
     342            2 : func (s *ConcurrencyLimitScheduler) CumulativeStats(stats base.CompactionGrantHandleStats) {}
     343              : 
// Done decrements runningCompactions and then tries to grant to waiting
// compactions. The scheduler hands itself out as the grant handle (see
// TrySchedule and tryGrantLockedAndUnlock), so this is the method a DB calls
// on that handle.
func (s *ConcurrencyLimitScheduler) Done() {
	s.mu.Lock()
	s.mu.runningCompactions--
	// tryGrantLockedAndUnlock releases s.mu, hence no Unlock here.
	s.tryGrantLockedAndUnlock()
}
     349              : 
     350            2 : func (s *ConcurrencyLimitScheduler) UpdateGetAllowedWithoutPermission() {
     351            2 :         s.mu.Lock()
     352            2 :         allowedWithoutPermission := s.db.GetAllowedWithoutPermission()
     353            2 :         tryGrant := allowedWithoutPermission > s.mu.lastAllowedWithoutPermission
     354            2 :         s.mu.lastAllowedWithoutPermission = allowedWithoutPermission
     355            2 :         s.mu.Unlock()
     356            2 :         if tryGrant {
     357            2 :                 select {
     358            1 :                 case s.pokePeriodicGranterCh <- struct{}{}:
     359            2 :                 default:
     360              :                 }
     361              :         }
     362              : }
     363              : 
// tryGrantLockedAndUnlock must be called with s.mu held; s.mu is released by
// the time it returns (via the deferred Unlock).
//
// It waits for its turn to be the sole granter, samples
// db.GetAllowedWithoutPermission, and then grants up to
// (lastAllowedWithoutPermission - runningCompactions) compactions by calling
// db.GetWaitingCompaction and db.Schedule. Those calls into the DB are made
// with s.mu released; isGranting keeps other granters and Unregister out in
// the meantime. It does nothing if the scheduler is unregistered.
//
// NOTE(review): if GetWaitingCompaction or Schedule panics, the deferred
// Unlock runs while s.mu is not held, and isGranting stays true -- confirm
// that the DB callbacks cannot panic, or that this is acceptable.
func (s *ConcurrencyLimitScheduler) tryGrantLockedAndUnlock() {
	defer s.mu.Unlock()
	if s.mu.unregistered {
		return
	}
	// Wait for turn to grant.
	for s.mu.isGranting {
		s.mu.isGrantingCond.Wait()
	}
	// INVARIANT: !isGranting.
	// Re-check: Unregister may have run while we were waiting above.
	if s.mu.unregistered {
		return
	}
	s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
	toGrant := s.mu.lastAllowedWithoutPermission - s.mu.runningCompactions
	if toGrant > 0 {
		s.mu.isGranting = true
	} else {
		return
	}
	s.mu.Unlock()
	// We call GetWaitingCompaction iff we can successfully grant, so that there
	// is no wasted pickedCompaction.
	//
	// INVARIANT: loop exits with s.mu unlocked.
	for toGrant > 0 {
		waiting, _ := s.db.GetWaitingCompaction()
		if !waiting {
			break
		}
		// The scheduler itself is the grant handle passed to the DB.
		accepted := s.db.Schedule(s)
		if !accepted {
			break
		}
		s.mu.Lock()
		s.mu.runningCompactions++
		toGrant--
		s.mu.Unlock()
	}
	// Will be unlocked by the defer statement.
	s.mu.Lock()
	s.mu.isGranting = false
	// Wake every waiter: other granters and a pending Unregister.
	s.mu.isGrantingCond.Broadcast()
}
     408              : 
     409            1 : func (s *ConcurrencyLimitScheduler) periodicGranter() {
     410            1 :         ticker := s.ts.newTicker(100 * time.Millisecond)
     411            1 :         for {
     412            1 :                 select {
     413            1 :                 case <-ticker.ch():
     414            1 :                         s.mu.Lock()
     415            1 :                         s.tryGrantLockedAndUnlock()
     416            1 :                 case <-s.pokePeriodicGranterCh:
     417            1 :                         s.mu.Lock()
     418            1 :                         s.tryGrantLockedAndUnlock()
     419            1 :                 case <-s.stopPeriodicGranterCh:
     420            1 :                         ticker.stop()
     421            1 :                         return
     422              :                 }
     423            1 :                 if s.periodicGranterRanChForTesting != nil {
     424            1 :                         s.periodicGranterRanChForTesting <- struct{}{}
     425            1 :                 }
     426              :         }
     427              : }
     428              : 
     429            1 : func (s *ConcurrencyLimitScheduler) adjustRunningCompactionsForTesting(delta int) {
     430            1 :         s.mu.Lock()
     431            1 :         s.mu.runningCompactions += delta
     432            1 :         if delta < 0 {
     433            1 :                 s.tryGrantLockedAndUnlock()
     434            1 :         } else {
     435            1 :                 s.mu.Unlock()
     436            1 :         }
     437              : }
     438              : 
     439              : // schedulerTimeSource is used to abstract time.NewTicker for
     440              : // ConcurrencyLimitScheduler.
     441              : type schedulerTimeSource interface {
     442              :         newTicker(duration time.Duration) schedulerTicker
     443              : }
     444              : 
     445              : // schedulerTicker is used to abstract time.Ticker for
     446              : // ConcurrencyLimitScheduler.
     447              : type schedulerTicker interface {
     448              :         stop()
     449              :         ch() <-chan time.Time
     450              : }
     451              : 
     452              : // defaultTime is a schedulerTimeSource using the time package.
     453              : type defaultTimeSource struct{}
     454              : 
     455              : var _ schedulerTimeSource = defaultTimeSource{}
     456              : 
     457            1 : func (defaultTimeSource) newTicker(duration time.Duration) schedulerTicker {
     458            1 :         return (*defaultTicker)(time.NewTicker(duration))
     459            1 : }
     460              : 
     461              : // defaultTicker uses time.Ticker.
     462              : type defaultTicker time.Ticker
     463              : 
     464              : var _ schedulerTicker = &defaultTicker{}
     465              : 
     466            1 : func (t *defaultTicker) stop() {
     467            1 :         (*time.Ticker)(t).Stop()
     468            1 : }
     469              : 
     470            1 : func (t *defaultTicker) ch() <-chan time.Time {
     471            1 :         return (*time.Ticker)(t).C
     472            1 : }
        

Generated by: LCOV version 2.0-1