// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"sync"
	"time"

	"github.com/cockroachdb/pebble/internal/base"
)

type CompactionGrantHandle = base.CompactionGrantHandle
type CompactionGrantHandleStats = base.CompactionGrantHandleStats
type CompactionGoroutineKind = base.CompactionGoroutineKind

const (
	CompactionGoroutinePrimary           = base.CompactionGoroutinePrimary
	CompactionGoroutineSSTableSecondary  = base.CompactionGoroutineSSTableSecondary
	CompactionGoroutineBlobFileSecondary = base.CompactionGoroutineBlobFileSecondary
)

// NB: This interface is experimental and subject to change.
//
// For instance, we may incorporate more information in TrySchedule and in the
// return value of Schedule to tell the CompactionScheduler the sub-category of
// compaction, so that the scheduler can have more granular estimates. For
// example, the input or output level could affect the write bandwidth if the
// inputs are better cached (say at higher levels).

// CompactionScheduler is responsible for scheduling both automatic and manual
// compactions. In the case of multiple DB instances on a node (i.e. a
// multi-store configuration), implementations of CompactionScheduler may
// enforce a global maximum compaction concurrency. Additionally,
// implementations of CompactionScheduler may be resource aware and permit
// more compactions than are "allowed without permission" if resources are
// available.
//
// Locking: CompactionScheduler's mutexes are ordered after DBForCompaction's
// mutexes. We need to specify some lock ordering since CompactionScheduler
// and DBForCompaction call into each other. This ordering choice is made to
// simplify the implementation of DBForCompaction. There are three exceptions
// to this: DBForCompaction.GetAllowedWithoutPermission,
// CompactionScheduler.Unregister, and CompactionGrantHandle.Done -- see those
// declarations for details.
type CompactionScheduler interface {
	// Register is called to register this DB and to specify the number of
	// goroutines that consume CPU in each compaction (see the CPU reporting
	// interface, CompactionGrantHandle.MeasureCPU). Must be called exactly
	// once by this DB if it successfully opens.
	Register(numGoroutinesPerCompaction int, db DBForCompaction)
	// Unregister is used to unregister the DB. Must be called once when the DB
	// is being closed. Unregister waits until all ongoing calls to
	// DBForCompaction are finished, so Unregister must not be called while
	// holding locks that DBForCompaction acquires in those calls.
	Unregister()
	// TrySchedule is called by the DB when it wants to run a compaction. The
	// bool is true iff permission is granted, and in that case the
	// CompactionGrantHandle needs to be exercised by the DB.
	TrySchedule() (bool, CompactionGrantHandle)
	// UpdateGetAllowedWithoutPermission informs the scheduler that some
	// external behavior may have caused this value to change. It exists
	// because flushes are not otherwise visible to the CompactionScheduler,
	// and can cause the value to increase. CompactionScheduler implementations
	// should do periodic sampling (e.g. as done by
	// ConcurrencyLimitScheduler.periodicGranter), but this provides an
	// instantaneous opportunity to act.
	UpdateGetAllowedWithoutPermission()
}
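
// The following is an illustrative sketch (not part of the API) of how a DB
// might drive a CompactionScheduler; the value 2 for
// numGoroutinesPerCompaction is a placeholder:
//
//	scheduler.Register(2, db)
//	// ... whenever the DB wants to start a compaction:
//	if ok, grant := scheduler.TrySchedule(); ok {
//		grant.Started()
//		// Run the compaction, reporting via grant.MeasureCPU and
//		// grant.CumulativeStats from the compaction goroutines.
//		grant.Done()
//	}
//	// If ok is false, the DB waits for the scheduler to call
//	// DBForCompaction.Schedule when it has capacity.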

// DBForCompaction is the interface implemented by the DB to interact with the
// CompactionScheduler.
type DBForCompaction interface {
	// GetAllowedWithoutPermission returns what is permitted at the DB level
	// (there may be further restrictions at the node level, when there are
	// multiple DBs at a node, which are not captured by this number). This can
	// vary based on compaction backlog or other factors. This method must not
	// acquire any mutex in DBForCompaction that is covered by the general
	// mutex ordering rule stated earlier.
	GetAllowedWithoutPermission() int
	// GetWaitingCompaction returns true iff the DB can run a compaction. A
	// true return is accompanied by a populated WaitingCompaction, which the
	// scheduler can use to pick across DBs or other work in the system. This
	// method should typically be efficient, in that the DB should try to cache
	// some state if its previous call to TrySchedule resulted in a failure to
	// get permission. It is ok if it is sometimes slow, since all work
	// scheduled by the CompactionScheduler is long-lived (often executing for
	// multiple seconds).
	GetWaitingCompaction() (bool, WaitingCompaction)
	// Schedule grants the DB permission to run a compaction. The DB returns
	// true iff it accepts the grant, in which case it must exercise the
	// CompactionGrantHandle.
	Schedule(CompactionGrantHandle) bool
}
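
// An illustrative sketch (not part of the API; grantHandle and running are
// placeholders maintained by the scheduler) of how a scheduler implementation
// might use DBForCompaction when capacity frees up; compare with
// ConcurrencyLimitScheduler.tryGrantLockedAndUnlock below:
//
//	if db.GetAllowedWithoutPermission() > running {
//		if waiting, _ := db.GetWaitingCompaction(); waiting {
//			if db.Schedule(grantHandle) {
//				running++
//			}
//		}
//	}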

// WaitingCompaction captures state for a compaction that can be used to
// prioritize it relative to compactions in other DBs or other long-lived work
// in the system.
type WaitingCompaction struct {
	// Optional is true for a compaction that isn't necessary for maintaining an
	// overall healthy LSM. This value can be compared across compactions and
	// other long-lived work.
	Optional bool
	// Priority is the priority of a compaction. It is only compared across
	// compactions, and only when the Optional values are the same.
	Priority int
	// Score is only compared across compactions, and only when both Optional
	// and Priority are the same.
	Score float64
}
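
// An illustrative sketch (moreImportant is not part of this package, and the
// handling of Score assumes that a larger Score indicates more urgent work)
// of how a scheduler could order two WaitingCompaction values per the field
// comments above:
//
//	func moreImportant(a, b WaitingCompaction) bool {
//		if a.Optional != b.Optional {
//			return !a.Optional // non-optional work wins
//		}
//		if a.Priority != b.Priority {
//			return a.Priority > b.Priority
//		}
//		return a.Score > b.Score
//	}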

// compactionOptionalAndPriority is ordered by priority, except that if the
// optional values differ, optional=false is more important than
// optional=true.
//
// The ordering here must be consistent with the order in which compactions
// are picked in compactionPickerByScore.pickAuto.
type compactionOptionalAndPriority struct {
	optional bool
	priority int
}

var scheduledCompactionMap map[compactionKind]compactionOptionalAndPriority
var manualCompactionPriority int

func init() {
	// Manual compactions have priority just below the score-based
	// compactions, since DB.pickAnyCompaction first picks score-based
	// compactions, and then manual compactions.
	manualCompactionPriority = 70
	scheduledCompactionMap = map[compactionKind]compactionOptionalAndPriority{}
	// Score-based compactions have priorities {100, 90, 80}.
	//
	// We don't actually know if it is a compactionKindMove or
	// compactionKindCopy until a compactionKindDefault is turned from a
	// pickedCompaction into a compaction struct. So we will never see those
	// values here, but for completeness we include them.
	scheduledCompactionMap[compactionKindMove] = compactionOptionalAndPriority{priority: 100}
	scheduledCompactionMap[compactionKindCopy] = compactionOptionalAndPriority{priority: 90}
	scheduledCompactionMap[compactionKindDefault] = compactionOptionalAndPriority{priority: 80}
	scheduledCompactionMap[compactionKindTombstoneDensity] =
		compactionOptionalAndPriority{optional: true, priority: 60}
	scheduledCompactionMap[compactionKindElisionOnly] =
		compactionOptionalAndPriority{optional: true, priority: 50}
	scheduledCompactionMap[compactionKindBlobFileRewrite] =
		compactionOptionalAndPriority{optional: true, priority: 40}
	scheduledCompactionMap[compactionKindRead] =
		compactionOptionalAndPriority{optional: true, priority: 30}
	scheduledCompactionMap[compactionKindRewrite] =
		compactionOptionalAndPriority{optional: true, priority: 20}
}
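
// For illustration (a sketch, not code from the compaction picker; score is a
// placeholder for whatever value the picker computed): a caller translating a
// picked compaction of a given kind into a WaitingCompaction might do:
//
//	info := scheduledCompactionMap[compactionKindDefault]
//	wc := WaitingCompaction{Optional: info.optional, Priority: info.priority, Score: score}
//
// A manual compaction would use manualCompactionPriority instead of a map
// entry.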

// noopGrantHandle is used in cases that don't interact with a CompactionScheduler.
type noopGrantHandle struct{}

var _ CompactionGrantHandle = noopGrantHandle{}

func (h noopGrantHandle) Started()                                              {}
func (h noopGrantHandle) MeasureCPU(CompactionGoroutineKind)                    {}
func (h noopGrantHandle) CumulativeStats(stats base.CompactionGrantHandleStats) {}
func (h noopGrantHandle) Done()                                                 {}

// pickedCompactionCache is used to avoid the work of repeatedly picking a
// compaction that then fails to run immediately because TrySchedule returns
// false.
//
// The high-level approach is to construct a pickedCompaction in
// DB.maybeScheduleCompaction if there isn't one in the cache, and if
// TrySchedule returns false, to remember it. Ignoring flushes, the worst-case
// behavior is that 1 of 2 pickedCompactions gets to run (so half the picking
// work is wasted). This worst case happens when the system is running at the
// limit of the long-lived work (including compactions) it can support. In
// this setting, each started compaction invalidates the pickedCompaction in
// the cache when it completes, and the reason the cache has a
// pickedCompaction (that got invalidated) is that the CompactionScheduler
// called GetWaitingCompaction and decided not to run the pickedCompaction
// (some other work won). We consider the CPU overhead of this waste
// acceptable.
//
// For the default case of a ConcurrencyLimitScheduler, which only considers a
// single DB, the aforementioned worst case is avoided by not constructing a
// new pickedCompaction in DB.maybeScheduleCompaction when
// pickedCompactionCache.isWaiting is already true (which became true once,
// when a backlog developed). Whenever a compaction completes and a new
// compaction can be started, the call to DBForCompaction.GetWaitingCompaction
// constructs a new pickedCompaction and caches it, and then this immediately
// gets to run when DBForCompaction.Schedule is called.
type pickedCompactionCache struct {
	// pc != nil => waiting.
	//
	// It is acceptable for waiting to be true and pc to be nil, when pc is
	// invalidated due to starting a compaction, or due to completing a
	// compaction/flush (since that changes the latest version).
	waiting bool
	pc      pickedCompaction
}

// invalidate clears the cached pickedCompaction because a new Version is
// installed or a compaction is started (since a new in-progress compaction
// affects future compaction picking). The value of waiting is not changed.
func (c *pickedCompactionCache) invalidate() {
	c.pc = nil
}

// isWaiting returns the value of waiting.
func (c *pickedCompactionCache) isWaiting() bool {
	return c.waiting
}

// getForRunning returns the pickedCompaction in the cache, if any, and clears
// it from the cache. It may return nil.
func (c *pickedCompactionCache) getForRunning() pickedCompaction {
	// NB: This does not set c.waiting = false, since there may be more
	// compactions to run.
	pc := c.pc
	c.pc = nil
	return pc
}

// setNotWaiting sets waiting to false.
func (c *pickedCompactionCache) setNotWaiting() {
	c.waiting = false
	c.pc = nil
}

// peek returns the pickedCompaction, if any, in the cache.
func (c *pickedCompactionCache) peek() pickedCompaction {
	return c.pc
}

// add adds a pickedCompaction to the cache and sets waiting to true.
func (c *pickedCompactionCache) add(pc pickedCompaction) {
	c.waiting = true
	c.pc = pc
}
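
// An illustrative sketch (not the actual DB.maybeScheduleCompaction logic;
// pickCompaction and runCompaction are hypothetical stand-ins) of the
// intended flow around the cache:
//
//	pc := cache.peek()
//	if pc == nil {
//		if pc = pickCompaction(); pc == nil {
//			cache.setNotWaiting()
//			return
//		}
//		cache.add(pc)
//	}
//	if ok, grant := scheduler.TrySchedule(); ok {
//		runCompaction(cache.getForRunning(), grant)
//	}
//	// If TrySchedule returns false, the pickedCompaction stays cached for a
//	// later attempt; the cache is invalidated when a new version is
//	// installed or another compaction starts.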

// ConcurrencyLimitScheduler is the default scheduler used by Pebble. It
// simply uses the concurrency limit retrieved from
// DBForCompaction.GetAllowedWithoutPermission to decide the number of
// compactions to schedule. ConcurrencyLimitScheduler must have its Register
// method called at most once -- i.e., it cannot be reused across DBs.
//
// Since the GetAllowedWithoutPermission value changes over time, the
// scheduler needs to be quite current in its sampling, especially if the
// value is increasing, to prevent lag in scheduling compactions. Calls to
// ConcurrencyLimitScheduler.Done and ConcurrencyLimitScheduler.TrySchedule
// are obvious places this value is sampled. However, since
// ConcurrencyLimitScheduler does not observe flushes (which can increase the
// value), and there can be situations where compactions last 10+ seconds,
// this sampling is not considered sufficient. Note that calls to
// ConcurrencyLimitScheduler.TrySchedule are dampened in
// DB.maybeScheduleCompaction when there is a waiting compaction (to prevent
// wasted computation of pickedCompactions). If DB.maybeScheduleCompaction
// always called ConcurrencyLimitScheduler.TrySchedule, we would have no lag,
// since DB.maybeScheduleCompaction is called on flush completion. Hence, we
// resort to having a background goroutine in ConcurrencyLimitScheduler sample
// the value every 100ms, and additionally sample it in
// UpdateGetAllowedWithoutPermission.
type ConcurrencyLimitScheduler struct {
	ts schedulerTimeSource
	// db is set in Register, but not protected by mu since it is set strictly
	// before any calls to the other methods.
	db DBForCompaction
	mu struct {
		sync.Mutex
		runningCompactions int
		// unregistered transitions once from false => true.
		unregistered bool
		// isGranting is used to (a) serialize granting from Done and
		// periodicGranter, and (b) ensure that granting is stopped before
		// returning from Unregister.
		isGranting                   bool
		isGrantingCond               *sync.Cond
		lastAllowedWithoutPermission int
	}
	stopPeriodicGranterCh chan struct{}
	pokePeriodicGranterCh chan struct{}
	// Only non-nil in some tests.
	periodicGranterRanChForTesting chan struct{}
}

var _ CompactionScheduler = &ConcurrencyLimitScheduler{}

func newConcurrencyLimitScheduler(ts schedulerTimeSource) *ConcurrencyLimitScheduler {
	s := &ConcurrencyLimitScheduler{
		ts:                    ts,
		stopPeriodicGranterCh: make(chan struct{}),
		pokePeriodicGranterCh: make(chan struct{}, 1),
	}
	s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
	return s
}

// NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest returns a
// ConcurrencyLimitScheduler that does not run the periodic granter goroutine,
// for use in tests.
func NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() *ConcurrencyLimitScheduler {
	s := &ConcurrencyLimitScheduler{
		ts: defaultTimeSource{},
	}
	s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
	return s
}

func (s *ConcurrencyLimitScheduler) Register(numGoroutinesPerCompaction int, db DBForCompaction) {
	s.db = db
	if s.stopPeriodicGranterCh != nil {
		go s.periodicGranter()
	}
}

func (s *ConcurrencyLimitScheduler) Unregister() {
	if s.stopPeriodicGranterCh != nil {
		s.stopPeriodicGranterCh <- struct{}{}
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.mu.unregistered = true
	// Wait until isGranting becomes false. Since unregistered has been set to
	// true, once isGranting becomes false, no more granting will happen.
	for s.mu.isGranting {
		s.mu.isGrantingCond.Wait()
	}
}

func (s *ConcurrencyLimitScheduler) TrySchedule() (bool, CompactionGrantHandle) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.mu.unregistered {
		return false, nil
	}
	s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
	if s.mu.lastAllowedWithoutPermission > s.mu.runningCompactions {
		s.mu.runningCompactions++
		return true, s
	}
	return false, nil
}

func (s *ConcurrencyLimitScheduler) Started()                                              {}
func (s *ConcurrencyLimitScheduler) MeasureCPU(CompactionGoroutineKind)                    {}
func (s *ConcurrencyLimitScheduler) CumulativeStats(stats base.CompactionGrantHandleStats) {}

func (s *ConcurrencyLimitScheduler) Done() {
	s.mu.Lock()
	s.mu.runningCompactions--
	s.tryGrantLockedAndUnlock()
}

func (s *ConcurrencyLimitScheduler) UpdateGetAllowedWithoutPermission() {
	s.mu.Lock()
	allowedWithoutPermission := s.db.GetAllowedWithoutPermission()
	tryGrant := allowedWithoutPermission > s.mu.lastAllowedWithoutPermission
	s.mu.lastAllowedWithoutPermission = allowedWithoutPermission
	s.mu.Unlock()
	if tryGrant {
		select {
		case s.pokePeriodicGranterCh <- struct{}{}:
		default:
		}
	}
}

func (s *ConcurrencyLimitScheduler) tryGrantLockedAndUnlock() {
	defer s.mu.Unlock()
	if s.mu.unregistered {
		return
	}
	// Wait for turn to grant.
	for s.mu.isGranting {
		s.mu.isGrantingCond.Wait()
	}
	// INVARIANT: !isGranting.
	if s.mu.unregistered {
		return
	}
	s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
	toGrant := s.mu.lastAllowedWithoutPermission - s.mu.runningCompactions
	if toGrant > 0 {
		s.mu.isGranting = true
	} else {
		return
	}
	s.mu.Unlock()
	// We call GetWaitingCompaction iff we can successfully grant, so that there
	// is no wasted pickedCompaction.
	//
	// INVARIANT: loop exits with s.mu unlocked.
	for toGrant > 0 {
		waiting, _ := s.db.GetWaitingCompaction()
		if !waiting {
			break
		}
		accepted := s.db.Schedule(s)
		if !accepted {
			break
		}
		s.mu.Lock()
		s.mu.runningCompactions++
		toGrant--
		s.mu.Unlock()
	}
	// Will be unlocked by the defer statement.
	s.mu.Lock()
	s.mu.isGranting = false
	s.mu.isGrantingCond.Broadcast()
}

func (s *ConcurrencyLimitScheduler) periodicGranter() {
	ticker := s.ts.newTicker(100 * time.Millisecond)
	for {
		select {
		case <-ticker.ch():
			s.mu.Lock()
			s.tryGrantLockedAndUnlock()
		case <-s.pokePeriodicGranterCh:
			s.mu.Lock()
			s.tryGrantLockedAndUnlock()
		case <-s.stopPeriodicGranterCh:
			ticker.stop()
			return
		}
		if s.periodicGranterRanChForTesting != nil {
			s.periodicGranterRanChForTesting <- struct{}{}
		}
	}
}

func (s *ConcurrencyLimitScheduler) adjustRunningCompactionsForTesting(delta int) {
	s.mu.Lock()
	s.mu.runningCompactions += delta
	if delta < 0 {
		s.tryGrantLockedAndUnlock()
	} else {
		s.mu.Unlock()
	}
}

func (s *ConcurrencyLimitScheduler) isUnregisteredForTesting() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.mu.unregistered
}

// schedulerTimeSource is used to abstract time.NewTicker for
// ConcurrencyLimitScheduler.
type schedulerTimeSource interface {
	newTicker(duration time.Duration) schedulerTicker
}

// schedulerTicker is used to abstract time.Ticker for
// ConcurrencyLimitScheduler.
type schedulerTicker interface {
	stop()
	ch() <-chan time.Time
}
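
// An illustrative sketch (not part of this package; the names are
// hypothetical) of a manual time source that a test could use in place of
// defaultTimeSource to drive periodicGranter deterministically:
//
//	type manualTicker struct{ c chan time.Time }
//
//	func (t *manualTicker) stop()                {}
//	func (t *manualTicker) ch() <-chan time.Time { return t.c }
//
//	type manualTimeSource struct{ ticker *manualTicker }
//
//	func (ts manualTimeSource) newTicker(time.Duration) schedulerTicker { return ts.ticker }
//
// A test would then send on the ticker's channel to simulate a tick.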

// defaultTimeSource is a schedulerTimeSource using the time package.
type defaultTimeSource struct{}

var _ schedulerTimeSource = defaultTimeSource{}

func (defaultTimeSource) newTicker(duration time.Duration) schedulerTicker {
	return (*defaultTicker)(time.NewTicker(duration))
}

// defaultTicker uses time.Ticker.
type defaultTicker time.Ticker

var _ schedulerTicker = &defaultTicker{}

func (t *defaultTicker) stop() {
	(*time.Ticker)(t).Stop()
}

func (t *defaultTicker) ch() <-chan time.Time {
	return (*time.Ticker)(t).C
}