// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"sync"
	"time"

	"github.com/cockroachdb/pebble/internal/base"
)

type CompactionGrantHandle = base.CompactionGrantHandle
type CompactionGrantHandleStats = base.CompactionGrantHandleStats
type CompactionGoroutineKind = base.CompactionGoroutineKind

const (
	CompactionGoroutinePrimary           = base.CompactionGoroutinePrimary
	CompactionGoroutineSSTableSecondary  = base.CompactionGoroutineSSTableSecondary
	CompactionGoroutineBlobFileSecondary = base.CompactionGoroutineBlobFileSecondary
)
23 :
24 : // NB: This interface is experimental and subject to change.
25 : //
26 : // For instance, we may incorporate more information in TrySchedule and in the
27 : // return value of Schedule to tell CompactionScheduler of the sub-category of
28 : // compaction so that the scheduler can have more granular estimates. For
29 : // example, the input or output level could affect the write bandwidth if the
30 : // inputs are better cached (say at higher levels).
31 :
32 : // CompactionScheduler is responsible for scheduling both automatic and manual
33 : // compactions. In the case of multiple DB instances on a node (i.e. a
34 : // multi-store configuration), implementations of CompactionScheduler may
35 : // enforce a global maximum compaction concurrency. Additionally,
36 : // implementations of CompactionScheduler may be resource aware and permit
37 : // more than the compactions that are "allowed without permission" if
38 : // resources are available.
39 : //
40 : // Locking: CompactionScheduler's mutexes are ordered after DBForCompaction
41 : // mutexes. We need to specify some lock ordering since CompactionScheduler
42 : // and DBForCompaction call into each other. This ordering choice is made to
43 : // simplify the implementation of DBForCompaction. There are three exceptions
44 : // to this DBForCompaction.GetAllowedWithoutPermission,
45 : // CompactionScheduler.Unregister, CompactionGrantHandle.Done -- see those
46 : // declarations for details.
type CompactionScheduler interface {
	// Register is called to register this DB and to specify the number of
	// goroutines that consume CPU in each compaction (see the CPU reporting
	// interface, CompactionGrantHandle.MeasureCPU). Must be called exactly once
	// by this DB if it successfully opens.
	Register(numGoroutinesPerCompaction int, db DBForCompaction)
	// Unregister is used to unregister the DB. Must be called once when the DB
	// is being closed. Unregister waits until all ongoing calls to
	// DBForCompaction are finished, so Unregister must not be called while
	// holding locks that DBForCompaction acquires in those calls.
	Unregister()
	// TrySchedule is called by the DB when it wants to run a compaction. The
	// bool is true iff permission is granted, and in that case the
	// CompactionGrantHandle needs to be exercised by the DB.
	TrySchedule() (bool, CompactionGrantHandle)
	// UpdateGetAllowedWithoutPermission informs the scheduler that some
	// external behavior may have caused this value to change. It exists because
	// flushes are not otherwise visible to the CompactionScheduler, and can
	// cause the value to increase. CompactionScheduler implementations should
	// do periodic sampling (e.g. as done by
	// ConcurrencyLimitScheduler.periodicGranter), but this provides an
	// instantaneous opportunity to act.
	UpdateGetAllowedWithoutPermission()
}
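
// The following is an illustrative sketch (an assumption for exposition, not
// code found elsewhere in Pebble) of the DB-side handshake with a
// CompactionScheduler: the DB asks for permission, and if it is granted, it
// must exercise the returned CompactionGrantHandle.
//
//	if ok, grant := scheduler.TrySchedule(); ok {
//		grant.Started()
//		// ... run the compaction, calling grant.MeasureCPU from its
//		// goroutines and grant.CumulativeStats periodically ...
//		grant.Done()
//	}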

// DBForCompaction is the interface implemented by the DB to interact with the
// CompactionScheduler.
type DBForCompaction interface {
	// GetAllowedWithoutPermission returns what is permitted at the DB level
	// (there may be further restrictions at the node level, when there are
	// multiple DBs on a node, which is not captured by this number). This can
	// vary based on compaction backlog or other factors. This method must not
	// acquire any mutex in DBForCompaction that is covered by the general mutex
	// ordering rule stated earlier.
	GetAllowedWithoutPermission() int
	// GetWaitingCompaction returns true iff the DB can run a compaction. The
	// true return is accompanied by a populated WaitingCompaction, which the
	// scheduler can use to pick across DBs or other work in the system. This
	// method should typically be efficient, in that the DB should try to cache
	// some state if its previous call to TrySchedule resulted in a failure to
	// get permission. It is ok if it is sometimes slow, since all work scheduled
	// by the CompactionScheduler is long-lived (often executing for multiple
	// seconds).
	GetWaitingCompaction() (bool, WaitingCompaction)
	// Schedule grants the DB permission to run a compaction. The DB returns
	// true iff it accepts the grant, in which case it must exercise the
	// CompactionGrantHandle.
	Schedule(CompactionGrantHandle) bool
}
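
// An illustrative sketch (again an assumption for exposition, not actual
// Pebble code) of the scheduler-side flow when the scheduler has spare
// capacity and considers this DB; grantHandle stands in for whatever
// CompactionGrantHandle the scheduler hands out for this grant.
//
//	if waiting, wc := db.GetWaitingCompaction(); waiting {
//		// wc can be compared against WaitingCompaction values from other
//		// DBs or other long-lived work, using the ordering described on the
//		// WaitingCompaction fields below.
//		_ = wc
//		if db.Schedule(grantHandle) {
//			// The DB accepted the grant and will eventually call
//			// grantHandle.Done.
//		}
//	}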

// WaitingCompaction captures state for a compaction that can be used to
// prioritize it wrt compactions in other DBs or other long-lived work in the
// system.
type WaitingCompaction struct {
	// Optional is true for a compaction that isn't necessary for maintaining an
	// overall healthy LSM. This value can be compared across compactions and
	// other long-lived work.
	Optional bool
	// Priority is the priority of a compaction. It is only compared across
	// compactions, and only when the Optional value is the same.
	Priority int
	// Score is only compared across compactions, and only when Optional and
	// Priority are the same.
	Score float64
}
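
// A minimal sketch (hypothetical helper, not part of the original source) of
// an ordering over WaitingCompaction that is consistent with the field
// comments above: non-optional work is more important than optional work,
// then higher Priority wins, then higher Score.
//
//	func moreImportant(a, b WaitingCompaction) bool {
//		if a.Optional != b.Optional {
//			return !a.Optional
//		}
//		if a.Priority != b.Priority {
//			return a.Priority > b.Priority
//		}
//		return a.Score > b.Score
//	}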

// Ordering is by (optional, priority): if the optional values differ, false is
// more important than true; when they are equal, the higher priority is more
// important.
//
// The ordering here must be consistent with the order in which compactions
// are picked in compactionPickerByScore.pickAuto.
type compactionOptionalAndPriority struct {
	optional bool
	priority int
}

var scheduledCompactionMap map[compactionKind]compactionOptionalAndPriority
var manualCompactionPriority int

func init() {
	// Manual compactions have priority just below the score-based
	// compactions, since DB.pickAnyCompaction first picks score-based
	// compactions, and then manual compactions.
	manualCompactionPriority = 70
	scheduledCompactionMap = map[compactionKind]compactionOptionalAndPriority{}
	// Score-based compactions have priorities {100, 90, 80}.
	//
	// We don't actually know if it is a compactionKindMove or
	// compactionKindCopy until a compactionKindDefault is turned from a
	// pickedCompaction into a compaction struct. So we will never see those
	// values here, but for completeness we include them.
	scheduledCompactionMap[compactionKindMove] = compactionOptionalAndPriority{priority: 100}
	scheduledCompactionMap[compactionKindCopy] = compactionOptionalAndPriority{priority: 90}
	scheduledCompactionMap[compactionKindDefault] = compactionOptionalAndPriority{priority: 80}
	scheduledCompactionMap[compactionKindTombstoneDensity] =
		compactionOptionalAndPriority{optional: true, priority: 60}
	scheduledCompactionMap[compactionKindElisionOnly] =
		compactionOptionalAndPriority{optional: true, priority: 50}
	scheduledCompactionMap[compactionKindBlobFileRewrite] =
		compactionOptionalAndPriority{optional: true, priority: 40}
	scheduledCompactionMap[compactionKindRead] =
		compactionOptionalAndPriority{optional: true, priority: 30}
	scheduledCompactionMap[compactionKindRewrite] =
		compactionOptionalAndPriority{optional: true, priority: 20}
}
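
// An illustrative sketch (hypothetical, for exposition only) of how these
// values could be combined into a WaitingCompaction for a picked compaction
// of a given kind; a manual compaction would instead use
// manualCompactionPriority with Optional left false.
//
//	func waitingForKind(kind compactionKind, score float64) WaitingCompaction {
//		entry := scheduledCompactionMap[kind]
//		return WaitingCompaction{Optional: entry.optional, Priority: entry.priority, Score: score}
//	}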

// noopGrantHandle is used in cases that don't interact with a CompactionScheduler.
type noopGrantHandle struct{}

var _ CompactionGrantHandle = noopGrantHandle{}

func (h noopGrantHandle) Started()                                              {}
func (h noopGrantHandle) MeasureCPU(CompactionGoroutineKind)                    {}
func (h noopGrantHandle) CumulativeStats(stats base.CompactionGrantHandleStats) {}
func (h noopGrantHandle) Done()                                                 {}

// pickedCompactionCache is used to avoid the work of repeatedly picking a
// compaction that then fails to run immediately because TrySchedule returns
// false.
//
// The high-level approach is to construct a pickedCompaction in
// DB.maybeScheduleCompaction if there isn't one in the cache, and if
// TrySchedule returns false, to remember it. Ignoring flushes, the worst-case
// behavior is that 1 of every 2 pickedCompactions gets to run (so half the
// picking work is wasted). This worst case happens when the system is running
// at the limit of the long-lived work (including compactions) it can support.
// In this setting, each started compaction invalidates the pickedCompaction
// in the cache when it completes, and the reason the cache has a
// pickedCompaction (that got invalidated) is that the CompactionScheduler
// called GetWaitingCompaction and decided not to run the pickedCompaction
// (some other work won). We consider the CPU overhead of this waste
// acceptable.
//
// For the default case of a ConcurrencyLimitScheduler, which only considers a
// single DB, the aforementioned worst case is avoided by not constructing a
// new pickedCompaction in DB.maybeScheduleCompaction when
// pickedCompactionCache.isWaiting is already true (which became true once,
// when a backlog developed). Whenever a compaction completes and a new
// compaction can be started, the call to DBForCompaction.GetWaitingCompaction
// constructs a new pickedCompaction and caches it, and then this immediately
// gets to run when DBForCompaction.Schedule is called.
type pickedCompactionCache struct {
	// pc != nil => waiting.
	//
	// It is acceptable for waiting to be true and pc to be nil, when pc is
	// invalidated due to starting a compaction, or completing a
	// compaction/flush (since it changes the latest version).
	waiting bool
	pc      pickedCompaction
}

// invalidate clears the cache because a new Version is installed or a
// compaction is started (since a new in-progress compaction affects future
// compaction picking). The value of waiting is not changed.
func (c *pickedCompactionCache) invalidate() {
	c.pc = nil
}

// isWaiting returns the value of waiting.
func (c *pickedCompactionCache) isWaiting() bool {
	return c.waiting
}

// getForRunning returns the pickedCompaction in the cache, if any, and clears
// the cache. It may return nil.
func (c *pickedCompactionCache) getForRunning() pickedCompaction {
	// NB: This does not set c.waiting = false, since there may be more
	// compactions to run.
	pc := c.pc
	c.pc = nil
	return pc
}

// setNotWaiting sets waiting to false.
func (c *pickedCompactionCache) setNotWaiting() {
	c.waiting = false
	c.pc = nil
}

// peek returns the pickedCompaction, if any, in the cache.
func (c *pickedCompactionCache) peek() pickedCompaction {
	return c.pc
}

// add adds a pickedCompaction to the cache and sets waiting to true.
func (c *pickedCompactionCache) add(pc pickedCompaction) {
	c.waiting = true
	c.pc = pc
}
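
// A rough sketch (assumptions for exposition; the real flow lives in
// DB.maybeScheduleCompaction and related code, and pickCompaction and
// runCompaction below are hypothetical stand-ins) of how the cache is
// intended to be used around TrySchedule:
//
//	pc := cache.peek()
//	if pc == nil {
//		pc = pickCompaction()
//	}
//	if pc == nil {
//		cache.setNotWaiting()
//	} else if ok, grant := scheduler.TrySchedule(); ok {
//		cache.invalidate() // a compaction is starting
//		runCompaction(pc, grant)
//	} else {
//		cache.add(pc) // remember the pick for later; sets waiting=true
//	}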

// ConcurrencyLimitScheduler is the default scheduler used by Pebble. It
// simply uses the concurrency limit retrieved from
// DBForCompaction.GetAllowedWithoutPermission to decide the number of
// compactions to schedule. ConcurrencyLimitScheduler must have its Register
// method called at most once -- i.e., it cannot be reused across DBs.
//
// Since the GetAllowedWithoutPermission value changes over time, the
// scheduler needs to be quite current in its sampling, especially if the
// value is increasing, to prevent lag in scheduling compactions. Calls to
// ConcurrencyLimitScheduler.Done and ConcurrencyLimitScheduler.TrySchedule
// are obvious places where this value is sampled. However, since
// ConcurrencyLimitScheduler does not observe flushes (which can increase the
// value), and there can be situations where compactions last 10+ seconds,
// this sampling is not considered sufficient. Note that calls to
// ConcurrencyLimitScheduler.TrySchedule are dampened in
// DB.maybeScheduleCompaction when there is a waiting compaction (to prevent
// wasted computation of pickedCompactions). If DB.maybeScheduleCompaction
// always called ConcurrencyLimitScheduler.TrySchedule, we would have no lag,
// since DB.maybeScheduleCompaction is called on flush completion. Hence, we
// resort to having a background goroutine in ConcurrencyLimitScheduler sample
// the value every 100ms, plus sample it in UpdateGetAllowedWithoutPermission.
type ConcurrencyLimitScheduler struct {
	ts schedulerTimeSource
	// db is set in Register, but not protected by mu since it is set strictly
	// before any calls to the other methods.
	db DBForCompaction
	mu struct {
		sync.Mutex
		runningCompactions int
		// unregistered transitions once from false => true.
		unregistered bool
		// isGranting is used to (a) serialize granting from Done and
		// periodicGranter, (b) ensure that granting is stopped before returning
		// from Unregister.
		isGranting                   bool
		isGrantingCond               *sync.Cond
		lastAllowedWithoutPermission int
	}
	stopPeriodicGranterCh chan struct{}
	pokePeriodicGranterCh chan struct{}
	// Only non-nil in some tests.
	periodicGranterRanChForTesting chan struct{}
}

var _ CompactionScheduler = &ConcurrencyLimitScheduler{}

func newConcurrencyLimitScheduler(ts schedulerTimeSource) *ConcurrencyLimitScheduler {
	s := &ConcurrencyLimitScheduler{
		ts:                    ts,
		stopPeriodicGranterCh: make(chan struct{}),
		pokePeriodicGranterCh: make(chan struct{}, 1),
	}
	s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
	return s
}

// NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest returns a
// ConcurrencyLimitScheduler that does not run the periodic granter goroutine.
// It is only meant for tests.
func NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest() *ConcurrencyLimitScheduler {
	s := &ConcurrencyLimitScheduler{
		ts: defaultTimeSource{},
	}
	s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
	return s
}

func (s *ConcurrencyLimitScheduler) Register(numGoroutinesPerCompaction int, db DBForCompaction) {
	s.db = db
	if s.stopPeriodicGranterCh != nil {
		go s.periodicGranter()
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.mu.unregistered {
		panic("cannot reuse ConcurrencyLimitScheduler")
	}
}

func (s *ConcurrencyLimitScheduler) Unregister() {
	if s.stopPeriodicGranterCh != nil {
		s.stopPeriodicGranterCh <- struct{}{}
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.mu.unregistered = true
	// Wait until isGranting becomes false. Since unregistered has been set to
	// true, once isGranting becomes false, no more granting will happen.
	for s.mu.isGranting {
		s.mu.isGrantingCond.Wait()
	}
}

func (s *ConcurrencyLimitScheduler) TrySchedule() (bool, CompactionGrantHandle) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.mu.unregistered {
		return false, nil
	}
	s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
	if s.mu.lastAllowedWithoutPermission > s.mu.runningCompactions {
		s.mu.runningCompactions++
		return true, s
	}
	return false, nil
}

func (s *ConcurrencyLimitScheduler) Started()                                               {}
func (s *ConcurrencyLimitScheduler) MeasureCPU(CompactionGoroutineKind)                     {}
func (s *ConcurrencyLimitScheduler) CumulativeStats(stats base.CompactionGrantHandleStats) {}

func (s *ConcurrencyLimitScheduler) Done() {
	s.mu.Lock()
	s.mu.runningCompactions--
	s.tryGrantLockedAndUnlock()
}

func (s *ConcurrencyLimitScheduler) UpdateGetAllowedWithoutPermission() {
	s.mu.Lock()
	allowedWithoutPermission := s.db.GetAllowedWithoutPermission()
	tryGrant := allowedWithoutPermission > s.mu.lastAllowedWithoutPermission
	s.mu.lastAllowedWithoutPermission = allowedWithoutPermission
	s.mu.Unlock()
	if tryGrant {
		select {
		case s.pokePeriodicGranterCh <- struct{}{}:
		default:
		}
	}
}

func (s *ConcurrencyLimitScheduler) tryGrantLockedAndUnlock() {
	defer s.mu.Unlock()
	if s.mu.unregistered {
		return
	}
	// Wait for turn to grant.
	for s.mu.isGranting {
		s.mu.isGrantingCond.Wait()
	}
	// INVARIANT: !isGranting.
	if s.mu.unregistered {
		return
	}
	s.mu.lastAllowedWithoutPermission = s.db.GetAllowedWithoutPermission()
	toGrant := s.mu.lastAllowedWithoutPermission - s.mu.runningCompactions
	if toGrant > 0 {
		s.mu.isGranting = true
	} else {
		return
	}
	s.mu.Unlock()
	// We call GetWaitingCompaction iff we can successfully grant, so that there
	// is no wasted pickedCompaction.
	//
	// INVARIANT: loop exits with s.mu unlocked.
	for toGrant > 0 {
		waiting, _ := s.db.GetWaitingCompaction()
		if !waiting {
			break
		}
		accepted := s.db.Schedule(s)
		if !accepted {
			break
		}
		s.mu.Lock()
		s.mu.runningCompactions++
		toGrant--
		s.mu.Unlock()
	}
	// Will be unlocked by the defer statement.
	s.mu.Lock()
	s.mu.isGranting = false
	s.mu.isGrantingCond.Broadcast()
}

func (s *ConcurrencyLimitScheduler) periodicGranter() {
	ticker := s.ts.newTicker(100 * time.Millisecond)
	for {
		select {
		case <-ticker.ch():
			s.mu.Lock()
			s.tryGrantLockedAndUnlock()
		case <-s.pokePeriodicGranterCh:
			s.mu.Lock()
			s.tryGrantLockedAndUnlock()
		case <-s.stopPeriodicGranterCh:
			ticker.stop()
			return
		}
		if s.periodicGranterRanChForTesting != nil {
			s.periodicGranterRanChForTesting <- struct{}{}
		}
	}
}

func (s *ConcurrencyLimitScheduler) adjustRunningCompactionsForTesting(delta int) {
	s.mu.Lock()
	s.mu.runningCompactions += delta
	if delta < 0 {
		s.tryGrantLockedAndUnlock()
	} else {
		s.mu.Unlock()
	}
}

// schedulerTimeSource is used to abstract time.NewTicker for
// ConcurrencyLimitScheduler.
type schedulerTimeSource interface {
	newTicker(duration time.Duration) schedulerTicker
}

// schedulerTicker is used to abstract time.Ticker for
// ConcurrencyLimitScheduler.
type schedulerTicker interface {
	stop()
	ch() <-chan time.Time
}
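
// A minimal sketch (hypothetical, for illustration only; tests may do this
// differently) of a manual time source that could drive periodicGranter
// deterministically by sending on a channel instead of relying on wall-clock
// ticks:
//
//	type manualTimeSource struct{ c chan time.Time }
//
//	func (m manualTimeSource) newTicker(time.Duration) schedulerTicker {
//		return manualTicker{c: m.c}
//	}
//
//	type manualTicker struct{ c chan time.Time }
//
//	func (t manualTicker) stop()                {}
//	func (t manualTicker) ch() <-chan time.Time { return t.c }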

// defaultTimeSource is a schedulerTimeSource using the time package.
type defaultTimeSource struct{}

var _ schedulerTimeSource = defaultTimeSource{}

func (defaultTimeSource) newTicker(duration time.Duration) schedulerTicker {
	return (*defaultTicker)(time.NewTicker(duration))
}

// defaultTicker uses time.Ticker.
type defaultTicker time.Ticker

var _ schedulerTicker = &defaultTicker{}

func (t *defaultTicker) stop() {
	(*time.Ticker)(t).Stop()
}

func (t *defaultTicker) ch() <-chan time.Time {
	return (*time.Ticker)(t).C
}