Line data Source code
1 : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "sync"
9 : "time"
10 :
11 : "github.com/cockroachdb/errors"
12 : "github.com/cockroachdb/pebble/internal/base"
13 : )
14 :
15 : type CompactionGrantHandle = base.CompactionGrantHandle
16 : type CompactionGrantHandleStats = base.CompactionGrantHandleStats
17 : type CompactionGoroutineKind = base.CompactionGoroutineKind
18 :
19 : const (
20 : CompactionGoroutinePrimary = base.CompactionGoroutinePrimary
21 : CompactionGoroutineSSTableSecondary = base.CompactionGoroutineSSTableSecondary
22 : CompactionGoroutineBlobFileSecondary = base.CompactionGoroutineBlobFileSecondary
23 : )
24 :
25 : // NB: This interface is experimental and subject to change.
26 : //
27 : // For instance, we may incorporate more information in TrySchedule and in the
28 : // return value of Schedule to tell CompactionScheduler of the sub-category of
29 : // compaction so that the scheduler can have more granular estimates. For
30 : // example, the input or output level could affect the write bandwidth if the
31 : // inputs are better cached (say at higher levels).
32 :
33 : // CompactionScheduler is responsible for scheduling both automatic and manual
34 : // compactions. In the case of multiple DB instances on a node (i.e. a
35 : // multi-store configuration), implementations of CompactionScheduler may
36 : // enforce a global maximum compaction concurrency. Additionally,
37 : // implementations of CompactionScheduler may be resource aware and permit
38             : // more compactions than the number "allowed without permission" if
39 : // resources are available.
40 : //
41 : // Locking: CompactionScheduler's mutexes are ordered after DBForCompaction
42 : // mutexes. We need to specify some lock ordering since CompactionScheduler
43 : // and DBForCompaction call into each other. This ordering choice is made to
44 : // simplify the implementation of DBForCompaction. There are three exceptions
45             : // to this: DBForCompaction.GetAllowedWithoutPermission,
46 : // CompactionScheduler.Unregister, CompactionGrantHandle.Done -- see those
47 : // declarations for details.
48 : type CompactionScheduler interface {
49 : // Register is called to register this DB and to specify the number of
50 : // goroutines that consume CPU in each compaction (see the CPU reporting
51 : // interface, CompactionGrantHandle.MeasureCPU). Must be called exactly once
52 : // by this DB if it successfully opens.
53 : Register(numGoroutinesPerCompaction int, db DBForCompaction)
54 : // Unregister is used to unregister the DB. Must be called once when the DB
55 : // is being closed. Unregister waits until all ongoing calls to
56 : // DBForCompaction are finished, so Unregister must not be called while
57 : // holding locks that DBForCompaction acquires in those calls.
58 : Unregister()
59 : // TrySchedule is called by DB when it wants to run a compaction. The bool
60 : // is true iff permission is granted, and in that case the
61 : // CompactionGrantHandle needs to be exercised by the DB.
62 : TrySchedule() (bool, CompactionGrantHandle)
63 : // UpdateGetAllowedWithoutPermission is to inform the scheduler that some
64 : // external behavior may have caused this value to change. It exists because
65 : // flushes are not otherwise visible to the CompactionScheduler, and can
66             : // cause the value to increase. CompactionScheduler implementations should do
67 : // periodic sampling (e.g. as done by
68 : // ConcurrencyLimitScheduler.periodicGranter), but this provides an
69 : // instantaneous opportunity to act.
70 : UpdateGetAllowedWithoutPermission()
71 : }
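// The following is an illustrative sketch (hypothetical, not part of Pebble) of
// a CompactionScheduler that grants permission up to a fixed concurrency limit
// and acts as its own CompactionGrantHandle. It exists only to show the
// protocol described above: the DB calls TrySchedule, and if permission is
// granted, it must exercise the returned handle and eventually call Done,
// which frees up a slot. A real implementation would also honor Unregister's
// waiting semantics and use DBForCompaction to grant deferred permission.
type exampleFixedLimitScheduler struct {
	mu      sync.Mutex
	limit   int
	running int
}

var _ CompactionScheduler = (*exampleFixedLimitScheduler)(nil)
var _ CompactionGrantHandle = (*exampleFixedLimitScheduler)(nil)

func (s *exampleFixedLimitScheduler) Register(numGoroutinesPerCompaction int, db DBForCompaction) {}
func (s *exampleFixedLimitScheduler) Unregister()                        {}
func (s *exampleFixedLimitScheduler) UpdateGetAllowedWithoutPermission() {}

// TrySchedule grants permission iff a slot is free, returning the scheduler
// itself as the grant handle.
func (s *exampleFixedLimitScheduler) TrySchedule() (bool, CompactionGrantHandle) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.running >= s.limit {
		return false, nil
	}
	s.running++
	return true, s
}

func (s *exampleFixedLimitScheduler) Started()                                   {}
func (s *exampleFixedLimitScheduler) MeasureCPU(CompactionGoroutineKind)         {}
func (s *exampleFixedLimitScheduler) CumulativeStats(CompactionGrantHandleStats) {}

// Done releases the slot taken by TrySchedule.
func (s *exampleFixedLimitScheduler) Done() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.running--
}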
72 :
73 : // DBForCompaction is the interface implemented by the DB to interact with the
74 : // CompactionScheduler.
75 : type DBForCompaction interface {
76             : // GetAllowedWithoutPermission returns what is permitted at the DB level
77             : // (there may be further restrictions at the node level, when there are
78             : // multiple DBs on a node, which are not captured by this number). This can
79 : // vary based on compaction backlog or other factors. This method must not
80 : // acquire any mutex in DBForCompaction that is covered by the general mutex
81 : // ordering rule stated earlier.
82 : GetAllowedWithoutPermission() int
83 : // GetWaitingCompaction returns true iff the DB can run a compaction. The
84             : // true return is accompanied by a populated WaitingCompaction that the
85 : // scheduler can use to pick across DBs or other work in the system. This
86 : // method should typically be efficient, in that the DB should try to cache
87 : // some state if its previous call to TrySchedule resulted in a failure to
88 : // get permission. It is ok if it is sometimes slow since all work scheduled
89 : // by CompactionScheduler is long-lived (often executing for multiple
90 : // seconds).
91 : GetWaitingCompaction() (bool, WaitingCompaction)
92 : // Schedule grants the DB permission to run a compaction. The DB returns
93 : // true iff it accepts the grant, in which case it must exercise the
94 : // CompactionGrantHandle.
95 : Schedule(CompactionGrantHandle) bool
96 : }
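// grantLoopSketch is an illustrative sketch (hypothetical, not part of Pebble)
// of how a scheduler with spare capacity hands out grants via DBForCompaction:
// GetWaitingCompaction is only consulted when a grant can actually be made, and
// granting stops as soon as the DB has nothing waiting or declines the grant.
// The real version of this loop is in
// ConcurrencyLimitScheduler.tryGrantLockedAndUnlock below.
func grantLoopSketch(db DBForCompaction, handle CompactionGrantHandle, spareSlots int) {
	for spareSlots > 0 {
		waiting, _ := db.GetWaitingCompaction()
		if !waiting {
			return
		}
		if !db.Schedule(handle) {
			return
		}
		spareSlots--
	}
}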
97 :
98 : // WaitingCompaction captures state for a compaction that can be used to
99 : // prioritize wrt compactions in other DBs or other long-lived work in the
100 : // system.
101 : type WaitingCompaction struct {
102 : // Optional is true for a compaction that isn't necessary for maintaining an
103 : // overall healthy LSM. This value can be compared across compactions and
104 : // other long-lived work.
105 : Optional bool
106 : // Priority is the priority of a compaction. It is only compared across
107 : // compactions, and when the Optional value is the same.
108 : Priority int
109             : // Score is the score of a compaction. It is only compared across
110             : // compactions, and when the Optional and Priority values are the same.
111 : Score float64
112 : }
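// waitingCompactionMoreImportant is an illustrative sketch (hypothetical, not
// part of Pebble) of how a scheduler choosing between multiple DBs might order
// two WaitingCompactions, following the field documentation above: required
// (non-Optional) work first, then higher Priority, then higher Score.
func waitingCompactionMoreImportant(a, b WaitingCompaction) bool {
	if a.Optional != b.Optional {
		return !a.Optional
	}
	if a.Priority != b.Priority {
		return a.Priority > b.Priority
	}
	return a.Score > b.Score
}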
113 :
114             : // Ordering is first by the optional value (false is more important than
115             : // true), and then by priority.
116 : //
117 : // The ordering here must be consistent with the order in which compactions
118 : // are picked in compactionPickerByScore.pickAuto.
119 : type compactionOptionalAndPriority struct {
120 : optional bool
121 : priority int
122 : }
123 :
124 : var scheduledCompactionMap map[compactionKind]compactionOptionalAndPriority
125 : var manualCompactionPriority int
126 :
127 1 : func init() {
128           1 : // Manual compactions have priority just below the score-based
129 1 : // compactions, since DB.pickAnyCompaction first picks score-based
130 1 : // compactions, and then manual compactions.
131 1 : manualCompactionPriority = 70
132 1 : scheduledCompactionMap = map[compactionKind]compactionOptionalAndPriority{}
133           1 : // Score-based compactions have priorities {100, 90, 80}.
134 1 : //
135 1 : // We don't actually know if it is a compactionKindMove or
136 1 : // compactionKindCopy until a compactionKindDefault is turned from a
137 1 : // pickedCompaction into a compaction struct. So we will never see those
138 1 : // values here, but for completeness we include them.
139 1 : scheduledCompactionMap[compactionKindMove] = compactionOptionalAndPriority{priority: 100}
140 1 : scheduledCompactionMap[compactionKindCopy] = compactionOptionalAndPriority{priority: 90}
141 1 : scheduledCompactionMap[compactionKindDefault] = compactionOptionalAndPriority{priority: 80}
142 1 : scheduledCompactionMap[compactionKindTombstoneDensity] =
143 1 : compactionOptionalAndPriority{optional: true, priority: 60}
144 1 : scheduledCompactionMap[compactionKindElisionOnly] =
145 1 : compactionOptionalAndPriority{optional: true, priority: 50}
146 1 : scheduledCompactionMap[compactionKindRead] =
147 1 : compactionOptionalAndPriority{optional: true, priority: 40}
148 1 : scheduledCompactionMap[compactionKindRewrite] =
149 1 : compactionOptionalAndPriority{optional: true, priority: 30}
150 1 : }
151 :
152 1 : func makeWaitingCompaction(manual bool, kind compactionKind, score float64) WaitingCompaction {
153 1 : if manual {
154 1 : return WaitingCompaction{Priority: manualCompactionPriority, Score: score}
155 1 : }
156 1 : entry, ok := scheduledCompactionMap[kind]
157 1 : if !ok {
158 0 : panic(errors.AssertionFailedf("unexpected compactionKind %s", kind))
159 : }
160 1 : return WaitingCompaction{Optional: entry.optional, Priority: entry.priority, Score: score}
161 : }
162 :
163 : // noopGrantHandle is used in cases that don't interact with a CompactionScheduler.
164 : type noopGrantHandle struct{}
165 :
166 : var _ CompactionGrantHandle = noopGrantHandle{}
167 :
168 1 : func (h noopGrantHandle) Started() {}
169 1 : func (h noopGrantHandle) MeasureCPU(CompactionGoroutineKind) {}
170 1 : func (h noopGrantHandle) CumulativeStats(stats base.CompactionGrantHandleStats) {}
171 1 : func (h noopGrantHandle) Done() {}
172 :
173 : // pickedCompactionCache is used to avoid the work of repeatedly picking a
174 : // compaction that then fails to run immediately because TrySchedule returns
175 : // false.
176 : //
177 : // The high-level approach is to construct a pickedCompaction in
178 : // DB.maybeScheduleCompaction if there isn't one in the cache, and if
179 : // TrySchedule returns false, to remember it. Ignoring flushes, the worst-case
180 : // behavior is 1 of 2 pickedCompactions gets to run (so half the picking work
181 : // is wasted). This worst-case happens when the system is running at the limit
182 : // of the long-lived work (including compactions) it can support. In this
183 : // setting, each started compaction invalidates the pickedCompaction in the
184 : // cache when it completes, and the reason the cache has a pickedCompaction
185 : // (that got invalidated) is that the CompactionScheduler called
186 : // GetWaitingCompaction and decided not to run the pickedCompaction (some
187 : // other work won). We consider the CPU overhead of this waste acceptable.
188 : //
189 : // For the default case of a ConcurrencyLimitScheduler, which only considers a
190 : // single DB, the aforementioned worst-case is avoided by not constructing a
191 : // new pickedCompaction in DB.maybeScheduleCompaction when
192 : // pickedCompactionCache.isWaiting is already true (which became true once,
193 : // when a backlog developed). Whenever a compaction completes and a new
194 : // compaction can be started, the call to DBForCompaction.GetWaitingCompaction
195 : // constructs a new pickedCompaction and caches it, and then this immediately
196 : // gets to run when DBForCompaction.Schedule is called.
197 : type pickedCompactionCache struct {
198 : // pc != nil => waiting.
199 : //
200 : // It is acceptable for waiting to be true and pc to be nil, when pc is
201 : // invalidated due to starting a compaction, or completing a
202 : // compaction/flush (since it changes the latest version).
203 : waiting bool
204 : pc *pickedCompaction
205 : }
206 :
207 : // invalidate the cache because a new Version is installed or a compaction is
208 : // started (since a new in-progress compaction affects future compaction
209 : // picking). The value of waiting is not changed.
210 1 : func (c *pickedCompactionCache) invalidate() {
211 1 : c.pc = nil
212 1 : }
213 :
214 : // isWaiting returns the value of waiting.
215 1 : func (c *pickedCompactionCache) isWaiting() bool {
216 1 : return c.waiting
217 1 : }
218 :
219             : // getForRunning returns the pickedCompaction in the cache, if any, and
220             : // removes it from the cache. It may return nil.
221 1 : func (c *pickedCompactionCache) getForRunning() *pickedCompaction {
222 1 : // NB: This does not set c.waiting = false, since there may be more
223 1 : // compactions to run.
224 1 : pc := c.pc
225 1 : c.pc = nil
226 1 : return pc
227 1 : }
228 :
229 : // setNotWaiting sets waiting to false.
230 1 : func (c *pickedCompactionCache) setNotWaiting() {
231 1 : c.waiting = false
232 1 : c.pc = nil
233 1 : }
234 :
235             : // peek returns the pickedCompaction, if any, in the cache.
236 1 : func (c *pickedCompactionCache) peek() *pickedCompaction {
237 1 : return c.pc
238 1 : }
239 :
240 : // add adds a pickedCompaction to the cache and sets waiting to true.
241 1 : func (c *pickedCompactionCache) add(pc *pickedCompaction) {
242 1 : c.waiting = true
243 1 : c.pc = pc
244 1 : }
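// pickedCompactionCacheUsageSketch is an illustrative sketch (hypothetical; the
// real logic lives in DB.maybeScheduleCompaction and the compaction completion
// path) of the flow described in the comment above the type: reuse a cached
// pickedCompaction if present, otherwise pick a new one, and cache the pick
// when TrySchedule denies permission so the picking work is not wasted.
func pickedCompactionCacheUsageSketch(
	c *pickedCompactionCache, s CompactionScheduler, pick func() *pickedCompaction,
) {
	pc := c.getForRunning()
	if pc == nil {
		if pc = pick(); pc == nil {
			// Nothing to compact.
			c.setNotWaiting()
			return
		}
	}
	ok, grant := s.TrySchedule()
	if !ok {
		// Remember the pick; the scheduler will later pull it out via
		// DBForCompaction.GetWaitingCompaction and DBForCompaction.Schedule.
		c.add(pc)
		return
	}
	// Run the compaction described by pc, calling grant.Started before doing
	// work and grant.Done when finished.
	_ = grant
}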
245 :
246 : // ConcurrencyLimitScheduler is the default scheduler used by Pebble. It
247 : // simply uses the concurrency limit retrieved from
248 : // DBForCompaction.GetAllowedWithoutPermission to decide the number of
249 : // compactions to schedule. ConcurrencyLimitScheduler must have its Register
250 : // method called at most once -- i.e., it cannot be reused across DBs.
251 : //
252 : // Since the GetAllowedWithoutPermission value changes over time, the
253 : // scheduler needs to be quite current in its sampling, especially if the
254 : // value is increasing, to prevent lag in scheduling compactions. Calls to
255 : // ConcurrencyLimitScheduler.Done and ConcurrencyLimitScheduler.TrySchedule
256 : // are obvious places this value is sampled. However, since
257 : // ConcurrencyLimitScheduler does not observe flushes (which can increase the
258 : // value), and there can be situations where compactions last 10+ seconds,
259 : // this sampling is not considered sufficient. Note that calls to
260 : // ConcurrencyLimitScheduler.TrySchedule are dampened in
261 : // DB.maybeScheduleCompaction when there is a waiting compaction (to prevent
262 : // wasted computation of pickedCompaction). If DB.maybeScheduleCompaction
263 : // always called ConcurrencyLimitScheduler.TrySchedule we would have no lag as
264 : // DB.maybeScheduleCompaction is called on flush completion. Hence, we resort
265             : // to having a background goroutine in ConcurrencyLimitScheduler sample the value
266 : // every 100ms, plus sample in UpdateGetAllowedWithoutPermission.
267 : type ConcurrencyLimitScheduler struct {
268 : ts schedulerTimeSource
269             : // db is set in Register, and is not protected by mu since Register is
270             : // called strictly before any calls to the other methods.
271 : db DBForCompaction
272 : mu struct {
273 : sync.Mutex
274 : runningCompactions int
275 : // unregistered transitions once from false => true.
276 : unregistered bool
277 : // isGranting is used to (a) serialize granting from Done and
278 : // periodicGranter, (b) ensure that granting is stopped before returning
279 : // from Unregister.
280 : isGranting bool
281 : isGrantingCond *sync.Cond
282 : lastAllowedWithoutPermission int
283 : }
284 : stopPeriodicGranterCh chan struct{}
285 : pokePeriodicGranterCh chan struct{}
286 : // Only non-nil in some tests.
287 : periodicGranterRanChForTesting chan struct{}
288 : }
289 :
290 : var _ CompactionScheduler = &ConcurrencyLimitScheduler{}
291 :
292 1 : func newConcurrencyLimitScheduler(ts schedulerTimeSource) *ConcurrencyLimitScheduler {
293 1 : s := &ConcurrencyLimitScheduler{
294 1 : ts: ts,
295 1 : stopPeriodicGranterCh: make(chan struct{}),
296 1 : pokePeriodicGranterCh: make(chan struct{}, 1),
297 1 : }
298 1 : s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
299 1 : return s
300 1 : }
301 :
302 1 : func NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() *ConcurrencyLimitScheduler {
303 1 : s := &ConcurrencyLimitScheduler{
304 1 : ts: defaultTimeSource{},
305 1 : }
306 1 : s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
307 1 : return s
308 1 : }
309 :
310 1 : func (s *ConcurrencyLimitScheduler) Register(numGoroutinesPerCompaction int, db DBForCompaction) {
311 1 : s.db = db
312 1 : if s.stopPeriodicGranterCh != nil {
313 1 : go s.periodicGranter()
314 1 : }
315 : }
316 :
317 1 : func (s *ConcurrencyLimitScheduler) Unregister() {
318 1 : if s.stopPeriodicGranterCh != nil {
319 1 : s.stopPeriodicGranterCh <- struct{}{}
320 1 : }
321 1 : s.mu.Lock()
322 1 : defer s.mu.Unlock()
323 1 : s.mu.unregistered = true
324 1 : // Wait until isGranting becomes false. Since unregistered has been set to
325 1 : // true, once isGranting becomes false, no more granting will happen.
326 1 : for s.mu.isGranting {
327 0 : s.mu.isGrantingCond.Wait()
328 0 : }
329 : }
330 :
331 1 : func (s *ConcurrencyLimitScheduler) TrySchedule() (bool, CompactionGrantHandle) {
332 1 : s.mu.Lock()
333 1 : defer s.mu.Unlock()
334 1 : if s.mu.unregistered {
335 1 : return false, nil
336 1 : }
337 1 : s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
338 1 : if s.mu.lastAllowedWithoutPermission > s.mu.runningCompactions {
339 1 : s.mu.runningCompactions++
340 1 : return true, s
341 1 : }
342 1 : return false, nil
343 : }
344 :
345 1 : func (s *ConcurrencyLimitScheduler) Started() {}
346 1 : func (s *ConcurrencyLimitScheduler) MeasureCPU(CompactionGoroutineKind) {}
347 1 : func (s *ConcurrencyLimitScheduler) CumulativeStats(stats base.CompactionGrantHandleStats) {}
348 :
349 1 : func (s *ConcurrencyLimitScheduler) Done() {
350 1 : s.mu.Lock()
351 1 : s.mu.runningCompactions--
352 1 : s.tryGrantLockedAndUnlock()
353 1 : }
354 :
355 1 : func (s *ConcurrencyLimitScheduler) UpdateGetAllowedWithoutPermission() {
356 1 : s.mu.Lock()
357 1 : allowedWithoutPermission := s.db.GetAllowedWithoutPermission()
358 1 : tryGrant := allowedWithoutPermission > s.mu.lastAllowedWithoutPermission
359 1 : s.mu.lastAllowedWithoutPermission = allowedWithoutPermission
360 1 : s.mu.Unlock()
361 1 : if tryGrant {
362 1 : select {
363 1 : case s.pokePeriodicGranterCh <- struct{}{}:
364 1 : default:
365 : }
366 : }
367 : }
368 :
369 1 : func (s *ConcurrencyLimitScheduler) tryGrantLockedAndUnlock() {
370 1 : defer s.mu.Unlock()
371 1 : if s.mu.unregistered {
372 1 : return
373 1 : }
374 : // Wait for turn to grant.
375 1 : for s.mu.isGranting {
376 1 : s.mu.isGrantingCond.Wait()
377 1 : }
378 : // INVARIANT: !isGranting.
379 1 : if s.mu.unregistered {
380 1 : return
381 1 : }
382 1 : s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
383 1 : toGrant := s.mu.lastAllowedWithoutPermission - s.mu.runningCompactions
384 1 : if toGrant > 0 {
385 1 : s.mu.isGranting = true
386 1 : } else {
387 1 : return
388 1 : }
389 1 : s.mu.Unlock()
390 1 : // We call GetWaitingCompaction iff we can successfully grant, so that there
391 1 : // is no wasted pickedCompaction.
392 1 : //
393 1 : // INVARIANT: loop exits with s.mu unlocked.
394 1 : for toGrant > 0 {
395 1 : waiting, _ := s.db.GetWaitingCompaction()
396 1 : if !waiting {
397 1 : break
398 : }
399 1 : accepted := s.db.Schedule(s)
400 1 : if !accepted {
401 0 : break
402 : }
403 1 : s.mu.Lock()
404 1 : s.mu.runningCompactions++
405 1 : toGrant--
406 1 : s.mu.Unlock()
407 : }
408 : // Will be unlocked by the defer statement.
409 1 : s.mu.Lock()
410 1 : s.mu.isGranting = false
411 1 : s.mu.isGrantingCond.Broadcast()
412 : }
413 :
414 1 : func (s *ConcurrencyLimitScheduler) periodicGranter() {
415 1 : ticker := s.ts.newTicker(100 * time.Millisecond)
416 1 : for {
417 1 : select {
418 1 : case <-ticker.ch():
419 1 : s.mu.Lock()
420 1 : s.tryGrantLockedAndUnlock()
421 1 : case <-s.pokePeriodicGranterCh:
422 1 : s.mu.Lock()
423 1 : s.tryGrantLockedAndUnlock()
424 1 : case <-s.stopPeriodicGranterCh:
425 1 : ticker.stop()
426 1 : return
427 : }
428 1 : if s.periodicGranterRanChForTesting != nil {
429 1 : s.periodicGranterRanChForTesting <- struct{}{}
430 1 : }
431 : }
432 : }
433 :
434 1 : func (s *ConcurrencyLimitScheduler) adjustRunningCompactionsForTesting(delta int) {
435 1 : s.mu.Lock()
436 1 : s.mu.runningCompactions += delta
437 1 : if delta < 0 {
438 1 : s.tryGrantLockedAndUnlock()
439 1 : } else {
440 1 : s.mu.Unlock()
441 1 : }
442 : }
443 :
444 1 : func (s *ConcurrencyLimitScheduler) isUnregisteredForTesting() bool {
445 1 : s.mu.Lock()
446 1 : defer s.mu.Unlock()
447 1 : return s.mu.unregistered
448 1 : }
449 :
450 : // schedulerTimeSource is used to abstract time.NewTicker for
451 : // ConcurrencyLimitScheduler.
452 : type schedulerTimeSource interface {
453 : newTicker(duration time.Duration) schedulerTicker
454 : }
455 :
456 : // schedulerTicker is used to abstract time.Ticker for
457 : // ConcurrencyLimitScheduler.
458 : type schedulerTicker interface {
459 : stop()
460 : ch() <-chan time.Time
461 : }
462 :
463             : // defaultTimeSource is a schedulerTimeSource using the time package.
464 : type defaultTimeSource struct{}
465 :
466 : var _ schedulerTimeSource = defaultTimeSource{}
467 :
468 1 : func (defaultTimeSource) newTicker(duration time.Duration) schedulerTicker {
469 1 : return (*defaultTicker)(time.NewTicker(duration))
470 1 : }
471 :
472 : // defaultTicker uses time.Ticker.
473 : type defaultTicker time.Ticker
474 :
475 : var _ schedulerTicker = &defaultTicker{}
476 :
477 1 : func (t *defaultTicker) stop() {
478 1 : (*time.Ticker)(t).Stop()
479 1 : }
480 :
481 1 : func (t *defaultTicker) ch() <-chan time.Time {
482 1 : return (*time.Ticker)(t).C
483 1 : }