1 : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "sync"
9 : "time"
10 :
11 : "github.com/cockroachdb/pebble/internal/base"
12 : )
13 :
14 : type CompactionGrantHandle = base.CompactionGrantHandle
15 : type CompactionGrantHandleStats = base.CompactionGrantHandleStats
16 : type CompactionGoroutineKind = base.CompactionGoroutineKind
17 :
18 : const (
19 : CompactionGoroutinePrimary = base.CompactionGoroutinePrimary
20 : CompactionGoroutineSSTableSecondary = base.CompactionGoroutineSSTableSecondary
21 : CompactionGoroutineBlobFileSecondary = base.CompactionGoroutineBlobFileSecondary
22 : )
23 :
24 : // NB: This interface is experimental and subject to change.
25 : //
26 : // For instance, we may incorporate more information in TrySchedule and in the
27 : // return value of Schedule to tell CompactionScheduler of the sub-category of
28 : // compaction so that the scheduler can have more granular estimates. For
29 : // example, the input or output level could affect the write bandwidth if the
30 : // inputs are better cached (say at higher levels).
31 :
32 : // CompactionScheduler is responsible for scheduling both automatic and manual
33 : // compactions. In the case of multiple DB instances on a node (i.e. a
34 : // multi-store configuration), implementations of CompactionScheduler may
35 : // enforce a global maximum compaction concurrency. Additionally,
36 : // implementations of CompactionScheduler may be resource aware and permit
37 : // more compactions than are "allowed without permission" if
38 : // resources are available.
39 : //
40 : // Locking: CompactionScheduler's mutexes are ordered after DBForCompaction
41 : // mutexes. We need to specify some lock ordering since CompactionScheduler
42 : // and DBForCompaction call into each other. This ordering choice is made to
43 : // simplify the implementation of DBForCompaction. There are three exceptions
44 : // to this: DBForCompaction.GetAllowedWithoutPermission,
45 : // CompactionScheduler.Unregister, and CompactionGrantHandle.Done -- see those
46 : // declarations for details.
47 : type CompactionScheduler interface {
48 : // Register is called to register this DB and to specify the number of
49 : // goroutines that consume CPU in each compaction (see the CPU reporting
50 : // interface, CompactionGrantHandle.MeasureCPU). Must be called exactly once
51 : // by this DB if it successfully opens.
52 : Register(numGoroutinesPerCompaction int, db DBForCompaction)
53 : // Unregister is used to unregister the DB. Must be called once when the DB
54 : // is being closed. Unregister waits until all ongoing calls to
55 : // DBForCompaction are finished, so Unregister must not be called while
56 : // holding locks that DBForCompaction acquires in those calls.
57 : Unregister()
58 : // TrySchedule is called by the DB when it wants to run a compaction. The bool
59 : // is true iff permission is granted, and in that case the
60 : // CompactionGrantHandle needs to be exercised by the DB.
61 : TrySchedule() (bool, CompactionGrantHandle)
62 : // UpdateGetAllowedWithoutPermission informs the scheduler that some
63 : // external behavior may have caused the GetAllowedWithoutPermission value
64 : // to change. It exists because flushes are not otherwise visible to the
65 : // CompactionScheduler, and can cause the value to increase.
66 : // CompactionScheduler implementations should do periodic sampling (e.g. as
67 : // done by ConcurrencyLimitScheduler.periodicGranter), but this provides an
68 : // instantaneous opportunity to act.
69 : UpdateGetAllowedWithoutPermission()
70 : }
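
// The following is an illustrative sketch, not part of Pebble: a trivial
// CompactionScheduler that enforces no global limit and always grants
// permission. It shows the minimal obligations of an implementation --
// remember the registered DB, hand back a CompactionGrantHandle from
// TrySchedule, and return cleanly from Unregister. The name
// alwaysGrantScheduler is hypothetical.
type alwaysGrantScheduler struct {
	db DBForCompaction
}

var _ CompactionScheduler = (*alwaysGrantScheduler)(nil)

func (s *alwaysGrantScheduler) Register(numGoroutinesPerCompaction int, db DBForCompaction) {
	// Remember the DB; a real scheduler would use it to call
	// GetWaitingCompaction and Schedule when capacity frees up.
	s.db = db
}

func (s *alwaysGrantScheduler) Unregister() {}

func (s *alwaysGrantScheduler) TrySchedule() (bool, CompactionGrantHandle) {
	// Permission is always granted; the DB must exercise the returned handle
	// (Started, MeasureCPU, CumulativeStats, Done).
	return true, noopGrantHandle{}
}

func (s *alwaysGrantScheduler) UpdateGetAllowedWithoutPermission() {}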
71 :
72 : // DBForCompaction is the interface implemented by the DB to interact with the
73 : // CompactionScheduler.
74 : type DBForCompaction interface {
75 : // GetAllowedWithoutPermission returns what is permitted at the DB-level
76 : // (there may be further restrictions at the node level, when there are
77 : // multiple DBs at a node, which are not captured by this number). This can
78 : // vary based on compaction backlog or other factors. This method must not
79 : // acquire any mutex in DBForCompaction that is covered by the general mutex
80 : // ordering rule stated earlier.
81 : GetAllowedWithoutPermission() int
82 : // GetWaitingCompaction returns true iff the DB can run a compaction. The
83 : // true return is accompanied by a populated WaitingCompaction, which the
84 : // scheduler can use to pick across DBs or other work in the system. This
85 : // method should typically be efficient, in that the DB should try to cache
86 : // some state if its previous call to TrySchedule resulted in a failure to
87 : // get permission. It is ok if it is sometimes slow since all work scheduled
88 : // by CompactionScheduler is long-lived (often executing for multiple
89 : // seconds).
90 : GetWaitingCompaction() (bool, WaitingCompaction)
91 : // Schedule grants the DB permission to run a compaction. The DB returns
92 : // true iff it accepts the grant, in which case it must exercise the
93 : // CompactionGrantHandle.
94 : Schedule(CompactionGrantHandle) bool
95 : }
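
// Illustrative sketch, not Pebble's actual code: the handshake from the DB
// side. The helper name tryStartCompaction and the run callback are
// hypothetical; they only show the expected sequence around TrySchedule.
func tryStartCompaction(s CompactionScheduler, run func(CompactionGrantHandle)) bool {
	ok, grant := s.TrySchedule()
	if !ok {
		// Permission denied. The scheduler will later poll
		// DBForCompaction.GetWaitingCompaction and, if this DB wins, grant
		// permission via DBForCompaction.Schedule.
		return false
	}
	// Permission granted: the grant handle must be exercised (Started, then
	// periodic MeasureCPU/CumulativeStats, and finally Done).
	go run(grant)
	return true
}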
96 :
97 : // WaitingCompaction captures state for a compaction that can be used to
98 : // prioritize wrt compactions in other DBs or other long-lived work in the
99 : // system.
100 : type WaitingCompaction struct {
101 : // Optional is true for a compaction that isn't necessary for maintaining an
102 : // overall healthy LSM. This value can be compared across compactions and
103 : // other long-lived work.
104 : Optional bool
105 : // Priority is the priority of a compaction. It is only compared across
106 : // compactions, and when the Optional value is the same.
107 : Priority int
108 : // Score is the score of a compaction. It is only compared across
109 : // compactions, and when the Optional and Priority values are the same.
110 : Score float64
111 : }
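
// Illustrative sketch, not part of Pebble: how a scheduler might order two
// WaitingCompaction values per the field comments above -- Optional first
// (false is more important), then Priority, then Score. That larger Priority
// and Score values are more important is our assumption for this example.
func moreImportant(a, b WaitingCompaction) bool {
	if a.Optional != b.Optional {
		return !a.Optional
	}
	if a.Priority != b.Priority {
		return a.Priority > b.Priority
	}
	return a.Score > b.Score
}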
112 :
113 : // Ordering is first by the optional value (false is more important than
114 : // true), and then by priority.
115 : //
116 : // The ordering here must be consistent with the order in which compactions
117 : // are picked in compactionPickerByScore.pickAuto.
118 : type compactionOptionalAndPriority struct {
119 : optional bool
120 : priority int
121 : }
122 :
123 : var scheduledCompactionMap map[compactionKind]compactionOptionalAndPriority
124 : var manualCompactionPriority int
125 :
126 2 : func init() {
127 2 : // Manual compactions have priority just below the score-based
128 2 : // compactions, since DB.pickAnyCompaction first picks score-based
129 2 : // compactions, and then manual compactions.
130 2 : manualCompactionPriority = 70
131 2 : scheduledCompactionMap = map[compactionKind]compactionOptionalAndPriority{}
132 2 : // Score-based compactions have priorities {100, 90, 80}.
133 2 : //
134 2 : // We don't actually know if it is a compactionKindMove or
135 2 : // compactionKindCopy until a compactionKindDefault is turned from a
136 2 : // pickedCompaction into a compaction struct. So we will never see those
137 2 : // values here, but for completeness we include them.
138 2 : scheduledCompactionMap[compactionKindMove] = compactionOptionalAndPriority{priority: 100}
139 2 : scheduledCompactionMap[compactionKindCopy] = compactionOptionalAndPriority{priority: 90}
140 2 : scheduledCompactionMap[compactionKindDefault] = compactionOptionalAndPriority{priority: 80}
141 2 : scheduledCompactionMap[compactionKindTombstoneDensity] =
142 2 : compactionOptionalAndPriority{optional: true, priority: 60}
143 2 : scheduledCompactionMap[compactionKindElisionOnly] =
144 2 : compactionOptionalAndPriority{optional: true, priority: 50}
145 2 : scheduledCompactionMap[compactionKindVirtualRewrite] =
146 2 : compactionOptionalAndPriority{optional: true, priority: 40}
147 2 : scheduledCompactionMap[compactionKindBlobFileRewrite] =
148 2 : compactionOptionalAndPriority{optional: true, priority: 30}
149 2 : scheduledCompactionMap[compactionKindRead] =
150 2 : compactionOptionalAndPriority{optional: true, priority: 20}
151 2 : scheduledCompactionMap[compactionKindRewrite] =
152 2 : compactionOptionalAndPriority{optional: true, priority: 10}
153 2 : }
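
// Illustrative sketch, not Pebble's actual helper: translating a compaction
// into the WaitingCompaction reported via GetWaitingCompaction, using the
// tables initialized above. The function name, the manual flag, and the
// score parameter are assumptions for the example.
func waitingCompactionFor(manual bool, kind compactionKind, score float64) WaitingCompaction {
	if manual {
		return WaitingCompaction{Priority: manualCompactionPriority, Score: score}
	}
	entry := scheduledCompactionMap[kind]
	return WaitingCompaction{Optional: entry.optional, Priority: entry.priority, Score: score}
}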
154 :
155 : // noopGrantHandle is used in cases that don't interact with a CompactionScheduler.
156 : type noopGrantHandle struct{}
157 :
158 : var _ CompactionGrantHandle = noopGrantHandle{}
159 :
160 2 : func (h noopGrantHandle) Started() {}
161 2 : func (h noopGrantHandle) MeasureCPU(CompactionGoroutineKind) {}
162 2 : func (h noopGrantHandle) CumulativeStats(stats base.CompactionGrantHandleStats) {}
163 2 : func (h noopGrantHandle) Done() {}
164 :
165 : // pickedCompactionCache is used to avoid the work of repeatedly picking a
166 : // compaction that then fails to run immediately because TrySchedule returns
167 : // false.
168 : //
169 : // The high-level approach is to construct a pickedCompaction in
170 : // DB.maybeScheduleCompaction if there isn't one in the cache, and if
171 : // TrySchedule returns false, to remember it. Ignoring flushes, the worst-case
172 : // behavior is that 1 of 2 pickedCompactions gets to run (so half the picking work
173 : // is wasted). This worst-case happens when the system is running at the limit
174 : // of the long-lived work (including compactions) it can support. In this
175 : // setting, each started compaction invalidates the pickedCompaction in the
176 : // cache when it completes, and the reason the cache has a pickedCompaction
177 : // (that got invalidated) is that the CompactionScheduler called
178 : // GetWaitingCompaction and decided not to run the pickedCompaction (some
179 : // other work won). We consider the CPU overhead of this waste acceptable.
180 : //
181 : // For the default case of a ConcurrencyLimitScheduler, which only considers a
182 : // single DB, the aforementioned worst-case is avoided by not constructing a
183 : // new pickedCompaction in DB.maybeScheduleCompaction when
184 : // pickedCompactionCache.isWaiting is already true (which became true once,
185 : // when a backlog developed). Whenever a compaction completes and a new
186 : // compaction can be started, the call to DBForCompaction.GetWaitingCompaction
187 : // constructs a new pickedCompaction and caches it, and then this immediately
188 : // gets to run when DBForCompaction.Schedule is called.
189 : type pickedCompactionCache struct {
190 : // pc != nil => waiting.
191 : //
192 : // It is acceptable for waiting to be true and pc to be nil, when pc is
193 : // invalidated due to starting a compaction, or completing a
194 : // compaction/flush (since it changes the latest version).
195 : waiting bool
196 : pc pickedCompaction
197 : }
198 :
199 : // invalidate the cache because a new Version is installed or a compaction is
200 : // started (since a new in-progress compaction affects future compaction
201 : // picking). The value of waiting is not changed.
202 2 : func (c *pickedCompactionCache) invalidate() {
203 2 : c.pc = nil
204 2 : }
205 :
206 : // isWaiting returns the value of waiting.
207 2 : func (c *pickedCompactionCache) isWaiting() bool {
208 2 : return c.waiting
209 2 : }
210 :
211 : // getForRunning returns the pickedCompaction in the cache, if any, and
212 : // clears it from the cache. It may return nil.
213 2 : func (c *pickedCompactionCache) getForRunning() pickedCompaction {
214 2 : // NB: This does not set c.waiting = false, since there may be more
215 2 : // compactions to run.
216 2 : pc := c.pc
217 2 : c.pc = nil
218 2 : return pc
219 2 : }
220 :
221 : // setNotWaiting sets waiting to false.
222 2 : func (c *pickedCompactionCache) setNotWaiting() {
223 2 : c.waiting = false
224 2 : c.pc = nil
225 2 : }
226 :
227 : // peek returns the pickedCompaction, if any, in the cache.
228 2 : func (c *pickedCompactionCache) peek() pickedCompaction {
229 2 : return c.pc
230 2 : }
231 :
232 : // add adds a pickedCompaction to the cache and sets waiting to true.
233 2 : func (c *pickedCompactionCache) add(pc pickedCompaction) {
234 2 : c.waiting = true
235 2 : c.pc = pc
236 2 : }
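
// Illustrative sketch, not Pebble's actual DB.maybeScheduleCompaction: the
// caching pattern described in the type comment above. The helper name and
// the pick callback are hypothetical, and the isWaiting-based dampening used
// with ConcurrencyLimitScheduler is omitted for brevity. A cached
// pickedCompaction is reused if present; otherwise a new one is picked. If
// TrySchedule declines, the pickedCompaction stays cached so a later
// GetWaitingCompaction call is cheap.
func trySchedulePicked(
	cache *pickedCompactionCache, s CompactionScheduler, pick func() pickedCompaction,
) (pickedCompaction, CompactionGrantHandle) {
	if cache.peek() == nil {
		pc := pick()
		if pc == nil {
			// Nothing to compact.
			cache.setNotWaiting()
			return nil, nil
		}
		cache.add(pc)
	}
	if ok, grant := s.TrySchedule(); ok {
		// Permission granted: take the pickedCompaction out of the cache
		// (waiting stays true since more compactions may follow).
		return cache.getForRunning(), grant
	}
	// Permission denied: the cached pickedCompaction is remembered until it
	// runs or is invalidated.
	return nil, nil
}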
237 :
238 : // ConcurrencyLimitScheduler is the default scheduler used by Pebble. It
239 : // simply uses the concurrency limit retrieved from
240 : // DBForCompaction.GetAllowedWithoutPermission to decide the number of
241 : // compactions to schedule. ConcurrencyLimitScheduler must have its Register
242 : // method called at most once -- i.e., it cannot be reused across DBs.
243 : //
244 : // Since the GetAllowedWithoutPermission value changes over time, the
245 : // scheduler needs to be quite current in its sampling, especially if the
246 : // value is increasing, to prevent lag in scheduling compactions. Calls to
247 : // ConcurrencyLimitScheduler.Done and ConcurrencyLimitScheduler.TrySchedule
248 : // are obvious places this value is sampled. However, since
249 : // ConcurrencyLimitScheduler does not observe flushes (which can increase the
250 : // value), and there can be situations where compactions last 10+ seconds,
251 : // this sampling is not considered sufficient. Note that calls to
252 : // ConcurrencyLimitScheduler.TrySchedule are dampened in
253 : // DB.maybeScheduleCompaction when there is a waiting compaction (to prevent
254 : // wasted computation of pickedCompaction). If DB.maybeScheduleCompaction
255 : // always called ConcurrencyLimitScheduler.TrySchedule we would have no lag as
256 : // DB.maybeScheduleCompaction is called on flush completion. Hence, we resort
257 : // to having a background goroutine in ConcurrencyLimitScheduler sample the value
258 : // every 100ms, plus sample in UpdateGetAllowedWithoutPermission.
259 : type ConcurrencyLimitScheduler struct {
260 : ts schedulerTimeSource
261 : // db is set in Register, but not protected by mu since it is strictly
262 : // set strictly before any calls to the other methods.
263 : db DBForCompaction
264 : mu struct {
265 : sync.Mutex
266 : runningCompactions int
267 : // unregistered transitions once from false => true.
268 : unregistered bool
269 : // isGranting is used to (a) serialize granting from Done and
270 : // periodicGranter, (b) ensure that granting is stopped before returning
271 : // from Unregister.
272 : isGranting bool
273 : isGrantingCond *sync.Cond
274 : lastAllowedWithoutPermission int
275 : }
276 : stopPeriodicGranterCh chan struct{}
277 : pokePeriodicGranterCh chan struct{}
278 : // Only non-nil in some tests.
279 : periodicGranterRanChForTesting chan struct{}
280 : }
281 :
282 : var _ CompactionScheduler = &ConcurrencyLimitScheduler{}
283 :
284 1 : func newConcurrencyLimitScheduler(ts schedulerTimeSource) *ConcurrencyLimitScheduler {
285 1 : s := &ConcurrencyLimitScheduler{
286 1 : ts: ts,
287 1 : stopPeriodicGranterCh: make(chan struct{}),
288 1 : pokePeriodicGranterCh: make(chan struct{}, 1),
289 1 : }
290 1 : s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
291 1 : return s
292 1 : }
293 :
294 2 : func NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() *ConcurrencyLimitScheduler {
295 2 : s := &ConcurrencyLimitScheduler{
296 2 : ts: defaultTimeSource{},
297 2 : }
298 2 : s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
299 2 : return s
300 2 : }
301 :
302 2 : func (s *ConcurrencyLimitScheduler) Register(numGoroutinesPerCompaction int, db DBForCompaction) {
303 2 : s.db = db
304 2 : if s.stopPeriodicGranterCh != nil {
305 1 : go s.periodicGranter()
306 1 : }
307 2 : s.mu.Lock()
308 2 : defer s.mu.Unlock()
309 2 : if s.mu.unregistered {
310 0 : panic("cannot reuse ConcurrencyLimitScheduler")
311 : }
312 : }
313 :
314 2 : func (s *ConcurrencyLimitScheduler) Unregister() {
315 2 : if s.stopPeriodicGranterCh != nil {
316 1 : s.stopPeriodicGranterCh <- struct{}{}
317 1 : }
318 2 : s.mu.Lock()
319 2 : defer s.mu.Unlock()
320 2 : s.mu.unregistered = true
321 2 : // Wait until isGranting becomes false. Since unregistered has been set to
322 2 : // true, once isGranting becomes false, no more granting will happen.
323 2 : for s.mu.isGranting {
324 2 : s.mu.isGrantingCond.Wait()
325 2 : }
326 : }
327 :
328 2 : func (s *ConcurrencyLimitScheduler) TrySchedule() (bool, CompactionGrantHandle) {
329 2 : s.mu.Lock()
330 2 : defer s.mu.Unlock()
331 2 : if s.mu.unregistered {
332 2 : return false, nil
333 2 : }
334 2 : s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
335 2 : if s.mu.lastAllowedWithoutPermission > s.mu.runningCompactions {
336 2 : s.mu.runningCompactions++
337 2 : return true, s
338 2 : }
339 2 : return false, nil
340 : }
341 :
342 2 : func (s *ConcurrencyLimitScheduler) Started() {}
343 2 : func (s *ConcurrencyLimitScheduler) MeasureCPU(CompactionGoroutineKind) {}
344 2 : func (s *ConcurrencyLimitScheduler) CumulativeStats(stats base.CompactionGrantHandleStats) {}
345 :
346 2 : func (s *ConcurrencyLimitScheduler) Done() {
347 2 : s.mu.Lock()
348 2 : s.mu.runningCompactions--
349 2 : s.tryGrantLockedAndUnlock()
350 2 : }
351 :
352 2 : func (s *ConcurrencyLimitScheduler) UpdateGetAllowedWithoutPermission() {
353 2 : s.mu.Lock()
354 2 : allowedWithoutPermission := s.db.GetAllowedWithoutPermission()
355 2 : tryGrant := allowedWithoutPermission > s.mu.lastAllowedWithoutPermission
356 2 : s.mu.lastAllowedWithoutPermission = allowedWithoutPermission
357 2 : s.mu.Unlock()
358 2 : if tryGrant {
359 2 : select {
360 1 : case s.pokePeriodicGranterCh <- struct{}{}:
361 2 : default:
362 : }
363 : }
364 : }
365 :
366 2 : func (s *ConcurrencyLimitScheduler) tryGrantLockedAndUnlock() {
367 2 : defer s.mu.Unlock()
368 2 : if s.mu.unregistered {
369 2 : return
370 2 : }
371 : // Wait for turn to grant.
372 2 : for s.mu.isGranting {
373 2 : s.mu.isGrantingCond.Wait()
374 2 : }
375 : // INVARIANT: !isGranting.
376 2 : if s.mu.unregistered {
377 2 : return
378 2 : }
379 2 : s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
380 2 : toGrant := s.mu.lastAllowedWithoutPermission - s.mu.runningCompactions
381 2 : if toGrant > 0 {
382 2 : s.mu.isGranting = true
383 2 : } else {
384 2 : return
385 2 : }
386 2 : s.mu.Unlock()
387 2 : // We call GetWaitingCompaction iff we can successfully grant, so that there
388 2 : // is no wasted pickedCompaction.
389 2 : //
390 2 : // INVARIANT: loop exits with s.mu unlocked.
391 2 : for toGrant > 0 {
392 2 : waiting, _ := s.db.GetWaitingCompaction()
393 2 : if !waiting {
394 2 : break
395 : }
396 2 : accepted := s.db.Schedule(s)
397 2 : if !accepted {
398 1 : break
399 : }
400 2 : s.mu.Lock()
401 2 : s.mu.runningCompactions++
402 2 : toGrant--
403 2 : s.mu.Unlock()
404 : }
405 : // Will be unlocked by the defer statement.
406 2 : s.mu.Lock()
407 2 : s.mu.isGranting = false
408 2 : s.mu.isGrantingCond.Broadcast()
409 : }
410 :
411 1 : func (s *ConcurrencyLimitScheduler) periodicGranter() {
412 1 : ticker := s.ts.newTicker(100 * time.Millisecond)
413 1 : for {
414 1 : select {
415 1 : case <-ticker.ch():
416 1 : s.mu.Lock()
417 1 : s.tryGrantLockedAndUnlock()
418 1 : case <-s.pokePeriodicGranterCh:
419 1 : s.mu.Lock()
420 1 : s.tryGrantLockedAndUnlock()
421 1 : case <-s.stopPeriodicGranterCh:
422 1 : ticker.stop()
423 1 : return
424 : }
425 1 : if s.periodicGranterRanChForTesting != nil {
426 1 : s.periodicGranterRanChForTesting <- struct{}{}
427 1 : }
428 : }
429 : }
430 :
431 1 : func (s *ConcurrencyLimitScheduler) adjustRunningCompactionsForTesting(delta int) {
432 1 : s.mu.Lock()
433 1 : s.mu.runningCompactions += delta
434 1 : if delta < 0 {
435 1 : s.tryGrantLockedAndUnlock()
436 1 : } else {
437 1 : s.mu.Unlock()
438 1 : }
439 : }
440 :
441 : // schedulerTimeSource is used to abstract time.NewTicker for
442 : // ConcurrencyLimitScheduler.
443 : type schedulerTimeSource interface {
444 : newTicker(duration time.Duration) schedulerTicker
445 : }
446 :
447 : // schedulerTicker is used to abstract time.Ticker for
448 : // ConcurrencyLimitScheduler.
449 : type schedulerTicker interface {
450 : stop()
451 : ch() <-chan time.Time
452 : }
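
// Illustrative sketch, not the helper Pebble's tests actually use: a manual
// schedulerTimeSource that lets a test fire ticks on demand instead of
// waiting for wall-clock time. The names manualTimeSource and manualTicker
// are hypothetical; construct the source with a channel, e.g.
// manualTimeSource{c: make(chan time.Time, 1)}.
type manualTimeSource struct {
	c chan time.Time
}

var _ schedulerTimeSource = manualTimeSource{}

func (m manualTimeSource) newTicker(time.Duration) schedulerTicker {
	return manualTicker{c: m.c}
}

// tick fires a single tick, as if the ticker duration elapsed.
func (m manualTimeSource) tick() { m.c <- time.Now() }

type manualTicker struct {
	c chan time.Time
}

var _ schedulerTicker = manualTicker{}

func (t manualTicker) stop()                {}
func (t manualTicker) ch() <-chan time.Time { return t.c }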
453 :
454 : // defaultTimeSource is a schedulerTimeSource using the time package.
455 : type defaultTimeSource struct{}
456 :
457 : var _ schedulerTimeSource = defaultTimeSource{}
458 :
459 1 : func (defaultTimeSource) newTicker(duration time.Duration) schedulerTicker {
460 1 : return (*defaultTicker)(time.NewTicker(duration))
461 1 : }
462 :
463 : // defaultTicker uses time.Ticker.
464 : type defaultTicker time.Ticker
465 :
466 : var _ schedulerTicker = &defaultTicker{}
467 :
468 1 : func (t *defaultTicker) stop() {
469 1 : (*time.Ticker)(t).Stop()
470 1 : }
471 :
472 1 : func (t *defaultTicker) ch() <-chan time.Time {
473 1 : return (*time.Ticker)(t).C
474 1 : }