// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package compact

import (
	"bytes"
	"fmt"

	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/manifest"
	"github.com/cockroachdb/pebble/sstable"
)

// ShouldSplit indicates whether a compaction should split between output files.
// See the OutputSplitter interface.
type ShouldSplit bool

const (
	// NoSplit may be returned by an OutputSplitter to indicate that it does NOT
	// recommend splitting compaction output sstables between the previous key
	// and the next key.
	NoSplit ShouldSplit = false
	// SplitNow may be returned by an OutputSplitter to indicate that it does
	// recommend splitting compaction output sstables between the previous key
	// and the next key.
	SplitNow ShouldSplit = true
)

// String implements the Stringer interface.
func (s ShouldSplit) String() string {
	if s == NoSplit {
		return "no-split"
	}
	return "split-now"
}

// An OutputSplitter encapsulates logic around switching the output of a
// compaction to a new output file. Additional constraints around switching
// compaction outputs that are specific to that compaction type (e.g. flush
// splits) are implemented as separate OutputSplitters that may be composed.
type OutputSplitter interface {
	// ShouldSplitBefore returns whether we should split outputs before the
	// specified "current key". The return value is SplitNow or NoSplit.
	// SplitNow means a split is advised before the specified key, and NoSplit
	// means no split is advised. If ShouldSplitBefore(a) advises a split, then
	// ShouldSplitBefore(b) should also advise a split given b >= a, until
	// OnNewOutput is called.
	ShouldSplitBefore(key *base.InternalKey, tw *sstable.Writer) ShouldSplit
	// OnNewOutput updates internal splitter state when the compaction switches
	// to a new sstable, and returns the next limit for the new output, which
	// should be used to truncate range tombstones if the compaction iterator
	// runs out of keys. The limit returned MUST be > key according to the
	// compaction's comparator. The key passed into OnNewOutput is the first key
	// in the new output, or nil if this sstable will only contain range
	// tombstones and/or range keys.
	OnNewOutput(key []byte) []byte
}
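
// The following sketch illustrates how a compaction loop might drive an
// OutputSplitter. It is a minimal, illustrative sketch only: iter, tw,
// newOutput, and writeKey are hypothetical stand-ins for the compaction's
// iterator and output plumbing, not part of this package.
//
//	key := iter.First()
//	limit := splitter.OnNewOutput(key.UserKey)
//	for ; key != nil; key = iter.Next() {
//		if splitter.ShouldSplitBefore(key, tw) == SplitNow {
//			tw = newOutput(key) // finish the current sstable; open a new one
//			// limit may be used to truncate range tombstones if the
//			// iterator runs out of keys before the next split point.
//			limit = splitter.OnNewOutput(key.UserKey)
//		}
//		writeKey(tw, key)
//	}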

// CombineSplitters takes a list of OutputSplitters, returning an OutputSplitter
// that requests a split whenever any of its child splitters advises a split.
func CombineSplitters(cmp base.Compare, splitters ...OutputSplitter) OutputSplitter {
	return &splitterGroup{
		cmp:       cmp,
		splitters: splitters,
	}
}
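
// For example, a compaction might combine a file-size splitter with a
// limit-func splitter, splitting whenever either advises it (an illustrative
// sketch; frontiers, targetSize, grandparents, and limitFunc are assumed to
// be in scope):
//
//	splitter := CombineSplitters(cmp,
//		FileSizeSplitter(frontiers, targetSize, grandparents),
//		LimitFuncSplitter(frontiers, limitFunc),
//	)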

// splitterGroup is an OutputSplitter that splits whenever one of its child
// splitters advises a compaction split.
type splitterGroup struct {
	cmp       base.Compare
	splitters []OutputSplitter
}

func (a *splitterGroup) ShouldSplitBefore(
	key *base.InternalKey, tw *sstable.Writer,
) (suggestion ShouldSplit) {
	for _, s := range a.splitters {
		if s.ShouldSplitBefore(key, tw) == SplitNow {
			return SplitNow
		}
	}
	return NoSplit
}

func (a *splitterGroup) OnNewOutput(key []byte) []byte {
	var earliestLimit []byte
	for _, s := range a.splitters {
		limit := s.OnNewOutput(key)
		if limit == nil {
			continue
		}
		if earliestLimit == nil || a.cmp(limit, earliestLimit) < 0 {
			earliestLimit = limit
		}
	}
	return earliestLimit
}

// LimitFuncSplitter returns a new output splitter that splits according to the
// sequence of user keys returned by the provided func. When a new output file
// is begun, the provided func is called with the first key of the new output
// file. If the func returns a non-nil key, a split will be requested before the
// returned key.
func LimitFuncSplitter(f *Frontiers, limitFunc func(userKey []byte) []byte) OutputSplitter {
	lf := &limitFuncSplitter{limitFunc: limitFunc}
	lf.frontier.Init(f, nil, lf.reached)
	return lf
}
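
// A limitFunc typically maps the first key of a new output to the next split
// point at or above it. A minimal sketch, assuming a hypothetical sorted
// slice of split keys splitKeys and a comparator cmp in scope:
//
//	limitFunc := func(userKey []byte) []byte {
//		i := sort.Search(len(splitKeys), func(i int) bool {
//			return cmp(splitKeys[i], userKey) > 0
//		})
//		if i < len(splitKeys) {
//			return splitKeys[i] // request a split before this key
//		}
//		return nil // no further split points
//	}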

type limitFuncSplitter struct {
	frontier  frontier
	limitFunc func(userKey []byte) []byte
	split     ShouldSplit
}

func (lf *limitFuncSplitter) ShouldSplitBefore(
	key *base.InternalKey, tw *sstable.Writer,
) ShouldSplit {
	return lf.split
}

func (lf *limitFuncSplitter) reached(nextKey []byte) []byte {
	lf.split = SplitNow
	return nil
}

func (lf *limitFuncSplitter) OnNewOutput(key []byte) []byte {
	lf.split = NoSplit
	if key != nil {
		// TODO(jackson): For some users, like L0 flush splits, there's no need
		// to binary search over all the flush splits every time. The next split
		// point must be ahead of the previous flush split point.
		limit := lf.limitFunc(key)
		lf.frontier.Update(limit)
		return limit
	}
	lf.frontier.Update(nil)
	return nil
}

// PreventSplitUserKeys returns an output splitter that takes in a child
// splitter and splits when both A) that child splitter has advised a split and
// B) the compaction output is at the boundary between two user keys.
//
// Adjacent sstables within a level are required to not contain the same user
// key, but some splitter implementations do not independently have knowledge of
// when a compaction is between two internal keys with the same user key. This
// splitter may be used to wrap any splitters that don't guarantee user key
// splits. If a wrapped splitter advises a split, it must continue to advise a
// split until a new output is begun.
func PreventSplitUserKeys(
	cmp base.Compare, inner OutputSplitter, unsafePrevUserKey func() []byte,
) OutputSplitter {
	return &preventSplitUserKeysSplitter{
		cmp:               cmp,
		splitter:          inner,
		unsafePrevUserKey: unsafePrevUserKey,
	}
}
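
// For example, because a file-size splitter may advise a split between two
// internal keys that share a user key, a caller might wrap it (illustrative
// sketch; prevUserKey is a hypothetical accessor for the last-written user
// key, not part of this package):
//
//	splitter := PreventSplitUserKeys(cmp,
//		FileSizeSplitter(frontiers, targetSize, grandparents),
//		prevUserKey,
//	)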

type preventSplitUserKeysSplitter struct {
	cmp               base.Compare
	splitter          OutputSplitter
	unsafePrevUserKey func() []byte
}

func (u *preventSplitUserKeysSplitter) ShouldSplitBefore(
	key *base.InternalKey, tw *sstable.Writer,
) ShouldSplit {
	// NB: The preventSplitUserKeysSplitter only needs to suffer a key
	// comparison if the wrapped splitter requests a split.
	//
	// We could implement this splitter using Frontiers: When the inner splitter
	// requests a split before key `k`, we'd update a frontier to be
	// ImmediateSuccessor(k). Then on the next key greater than k, the
	// frontier's `reached` func would be called and we'd return SplitNow.
	// This doesn't really save work since duplicate user keys are rare, and it
	// requires us to materialize the ImmediateSuccessor key. It also prevents
	// us from splitting on the same key that the inner splitter requested a
	// split for; instead we'd need to wait until the next key. The current
	// implementation uses `unsafePrevUserKey` to gain access to the previous
	// key, which allows it to immediately respect the inner splitter when
	// possible.
	if split := u.splitter.ShouldSplitBefore(key, tw); split != SplitNow {
		return split
	}
	if u.cmp(key.UserKey, u.unsafePrevUserKey()) > 0 {
		return SplitNow
	}
	return NoSplit
}

func (u *preventSplitUserKeysSplitter) OnNewOutput(key []byte) []byte {
	return u.splitter.OnNewOutput(key)
}

type fileSizeSplitter struct {
	frontier              frontier
	targetFileSize        uint64
	atGrandparentBoundary bool
	boundariesObserved    uint64
	nextGrandparent       *manifest.FileMetadata
	grandparents          manifest.LevelIterator
}

// FileSizeSplitter constructs an OutputSplitter that enforces target file
// sizes. This splitter splits to a new output file when the estimated file size
// is 0.5x-2x the target file size. If there are overlapping grandparent files,
// this splitter will attempt to split at a grandparent boundary. For example,
// consider the example where a compaction wrote 'd' to the current output file,
// and the next key has a user key 'g':
//
//	                        previous key    next key
//	                              |            |
//	                              |            |
//	              +---------------|----+   +---|---------+
//	grandparents: |     000006    |    |   |   | 000007  |
//	              +---------------|----+   +---|---------+
//	              a    b          d    e   f   g         i
//
// Splitting the output file F before 'g' will ensure that the current output
// file F does not overlap the grandparent file 000007. Aligning sstable
// boundaries like this can significantly reduce write amplification, since a
// subsequent compaction of F into the grandparent level will avoid needlessly
// rewriting any keys within 000007 that do not overlap F's bounds. Consider the
// following compaction:
//
//	                 +----------------------+
//	  input          |                      |
//	  level          +----------------------+
//	                            \/
//	         +---------------+     +---------------+
//	output   |XXXXXXX|       |     |      |XXXXXXXX|
//	 level   +---------------+     +---------------+
//
// The input-level file overlaps two files in the output level, but only
// partially. The beginning of the first output-level file and the end of the
// second output-level file will be rewritten verbatim. This write I/O is
// "wasted" in the sense that no merging is being performed.
//
// To prevent the above waste, this splitter attempts to split output files
// before the start key of grandparent files. It still strives to write output
// files of approximately the target file size, by constraining this splitting
// at grandparent points to apply only if the current output's file size is
// about the right order of magnitude.
//
// Note that, unlike most other splitters, this splitter does not guarantee that
// it will advise splits only at user key change boundaries.
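//
// To make the 0.5x-2x window concrete with illustrative numbers: given a
// 64 MiB target file size, an output below 32 MiB is never split, an output
// at or above 128 MiB is always split, and in between the splitter splits
// only where it aligns with a grandparent boundary (or, once the overlapping
// grandparents are exhausted, as soon as the output reaches the 64 MiB
// target).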
func FileSizeSplitter(
	frontiers *Frontiers, targetFileSize uint64, grandparents manifest.LevelIterator,
) OutputSplitter {
	s := &fileSizeSplitter{targetFileSize: targetFileSize}
	s.nextGrandparent = grandparents.First()
	s.grandparents = grandparents
	if s.nextGrandparent != nil {
		s.frontier.Init(frontiers, s.nextGrandparent.Smallest.UserKey, s.reached)
	}
	return s
}

func (f *fileSizeSplitter) reached(nextKey []byte) []byte {
	f.atGrandparentBoundary = true
	f.boundariesObserved++
	// NB: f.grandparents is a bounded iterator, constrained to the compaction
	// key range.
	f.nextGrandparent = f.grandparents.Next()
	if f.nextGrandparent == nil {
		return nil
	}
	// TODO(jackson): Should we also split before or immediately after
	// grandparents' largest keys? Splitting before the start boundary prevents
	// overlap with the grandparent. Also splitting after the end boundary may
	// increase the probability of move compactions.
	return f.nextGrandparent.Smallest.UserKey
}

func (f *fileSizeSplitter) ShouldSplitBefore(
	key *base.InternalKey, tw *sstable.Writer,
) ShouldSplit {
	atGrandparentBoundary := f.atGrandparentBoundary

	// Clear f.atGrandparentBoundary unconditionally.
	//
	// This is a bit subtle. Even if we do decide to split, it's possible that a
	// higher-level splitter will ignore our request (e.g., because we're
	// between two internal keys with the same user key). In this case, the next
	// call to ShouldSplitBefore will find atGrandparentBoundary=false. This is
	// desirable, because in this case we would've already written the earlier
	// key with the same user key to the output file. The current output file is
	// already doomed to overlap the grandparent whose bound triggered
	// atGrandparentBoundary=true. We should continue on, waiting for the next
	// grandparent boundary.
	f.atGrandparentBoundary = false

	if tw == nil {
		return NoSplit
	}

	estSize := tw.EstimatedSize()
	switch {
	case estSize < f.targetFileSize/2:
		// The estimated file size is less than half the target file size. Don't
		// split it, even if currently aligned with a grandparent file, because
		// it's too small.
		return NoSplit
	case estSize >= 2*f.targetFileSize:
		// The estimated file size is at least double the target file size.
		// Split it even if we were not aligned with a grandparent file boundary
		// to avoid excessively exceeding the target file size.
		return SplitNow
	case !atGrandparentBoundary:
		// Don't split if we're not at a grandparent, except if we've exhausted
		// all the grandparents overlapping this compaction's key range. Then we
		// may want to split purely based on file size.
		if f.nextGrandparent == nil {
			// There are no more grandparents. Optimize for the target file size
			// and split as soon as we hit the target file size.
			if estSize >= f.targetFileSize {
				return SplitNow
			}
		}
		return NoSplit
	default:
		// INVARIANT: atGrandparentBoundary
		// INVARIANT: targetSize/2 ≤ estSize < 2*targetSize
		//
		// The estimated file size is close enough to the target file size that
		// we should consider splitting.
		//
		// Determine whether to split now based on how many grandparent
		// boundaries we have already observed while building this output file.
		// The intuition here is that if the grandparent level is dense in this
		// part of the keyspace, we're likely to continue to have more
		// opportunities to split this file aligned with a grandparent. If this
		// is the first grandparent boundary observed, we split immediately
		// (we're already at ≥50% the target file size). Otherwise, each
		// overlapping grandparent we've observed increases the minimum file
		// size by 5% of the target file size, up to at most 90% of the target
		// file size.
		//
		// TODO(jackson): The particular thresholds are somewhat unprincipled.
		// This is the same heuristic that RocksDB implements. Is there a more
		// principled formulation that can further reduce write amplification,
		// produce files closer to the target file size, or be more
		// understandable?

		// NB: Subtract 1 from `boundariesObserved` to account for the current
		// boundary we're considering splitting at. `reached` will have
		// incremented it at the same time it set `atGrandparentBoundary`.
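		//
		// For example (illustrative numbers): if this is the third grandparent
		// boundary observed while building this output, boundariesObserved-1
		// is 2, so minimumPctOfTargetSize = 50 + 5*2 = 60, and we split only
		// once the estimated size reaches 60% of the target file size. The
		// min(..., 8) cap bounds the threshold at 50 + 5*8 = 90% of the
		// target.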
		minimumPctOfTargetSize := 50 + 5*min(f.boundariesObserved-1, 8)
		if estSize < (minimumPctOfTargetSize*f.targetFileSize)/100 {
			return NoSplit
		}
		return SplitNow
	}
}

func (f *fileSizeSplitter) OnNewOutput(key []byte) []byte {
	f.boundariesObserved = 0
	return nil
}

// A frontier is used to monitor a compaction's progression across the user
// keyspace.
//
// A frontier holds a user key boundary that it's concerned with in its `key`
// field. If/when the compaction iterator returns an InternalKey with a user key
// _k_ such that k ≥ frontier.key, the compaction iterator invokes the
// frontier's `reached` function, passing _k_ as its argument.
//
// The `reached` function returns a new value to use as the key. If `reached`
// returns nil, the frontier is forgotten and its `reached` method will not be
// invoked again, unless the user calls [Update] to set a new key.
//
// A frontier's key may be updated outside the context of a `reached`
// invocation at any time, through its Update method.
type frontier struct {
	// container points to the containing *Frontiers that was passed to Init
	// when the frontier was initialized.
	container *Frontiers

	// key holds the frontier's current key. If nil, this frontier is inactive
	// and its reached func will not be invoked. The value of this key may only
	// be updated by the `Frontiers` type, or the Update method.
	key []byte

	// reached is invoked to inform a frontier that its key has been reached.
	// It's invoked with the user key that reached the limit. The `key` argument
	// is guaranteed to be ≥ the frontier's key.
	//
	// After reached is invoked, the frontier's key is updated to the return
	// value of `reached`. Nota bene, the frontier is permitted to update its
	// key to a user key ≤ the argument `key`.
	//
	// If a frontier is set to key k1, and reached(k2) is invoked (k2 ≥ k1), the
	// frontier will receive reached(k2) calls until it returns nil or a key
	// `k3` such that k2 < k3. This property is useful for Frontiers that use
	// `reached` invocations to drive iteration through collections of keys that
	// may contain multiple keys that are both < k2 and ≥ k1.
	reached func(key []byte) (next []byte)
}

// Init initializes the frontier with the provided key and reached callback.
// The frontier is attached to the provided *Frontiers and the provided reached
// func will be invoked when the *Frontiers is advanced to a key ≥ this
// frontier's key.
func (f *frontier) Init(
	frontiers *Frontiers, initialKey []byte, reached func(key []byte) (next []byte),
) {
	*f = frontier{
		container: frontiers,
		key:       initialKey,
		reached:   reached,
	}
	if initialKey != nil {
		f.container.push(f)
	}
}
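
// A minimal sketch of the re-invocation property described above: a frontier
// whose reached func walks a sorted slice of user keys. The slice keys, the
// index i, and the comparator cmp are hypothetical captures for illustration,
// not part of this package.
//
//	var fr frontier
//	i := 0
//	fr.Init(frontiers, keys[0], func(reachedKey []byte) []byte {
//		for i < len(keys) && cmp(keys[i], reachedKey) <= 0 {
//			i++ // consume every key ≤ the key that was reached
//		}
//		if i < len(keys) {
//			return keys[i] // next frontier key
//		}
//		return nil // exhausted; deactivate this frontier
//	})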

// String implements fmt.Stringer.
func (f *frontier) String() string {
	return string(f.key)
}

// Update replaces the existing frontier's key with the provided key. The
// frontier's reached func will be invoked when the new key is reached.
func (f *frontier) Update(key []byte) {
	c := f.container
	prevKeyIsNil := f.key == nil
	f.key = key
	if prevKeyIsNil {
		if key != nil {
			c.push(f)
		}
		return
	}

	// Find the frontier within the heap (it must exist within the heap because
	// f.key was != nil). If the frontier key is now nil, remove it from the
	// heap. Otherwise, fix up its position.
	for i := 0; i < len(c.items); i++ {
		if c.items[i] == f {
			if key != nil {
				c.fix(i)
			} else {
				n := c.len() - 1
				c.swap(i, n)
				c.down(i, n)
				c.items = c.items[:n]
			}
			return
		}
	}
	panic("unreachable")
}

// Frontiers is used to track progression of a task (e.g., compaction) across
// the keyspace. Clients that want to be informed when the task advances to a
// key ≥ some frontier may register a frontier, providing a callback. The task
// calls `Advance(k)` with each user key encountered, which invokes the
// `reached` func on all tracked frontiers with `key`s ≤ k.
//
// Internally, Frontiers is implemented as a simple heap.
type Frontiers struct {
	cmp   base.Compare
	items []*frontier
}

// Init initializes a Frontiers for use.
func (f *Frontiers) Init(cmp base.Compare) {
	f.cmp = cmp
}
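
// A minimal usage sketch (illustrative; assumes a base.Compare cmp in scope
// and that Advance is called with nondecreasing user keys):
//
//	var fs Frontiers
//	fs.Init(cmp)
//	var fr frontier
//	fr.Init(&fs, []byte("m"), func(k []byte) []byte {
//		// Invoked the first time the task advances to a key ≥ "m".
//		return nil // one-shot: deactivate after firing
//	})
//	fs.Advance([]byte("c")) // below "m": no callback
//	fs.Advance([]byte("p")) // ≥ "m": fires the callback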

// String implements fmt.Stringer.
func (f *Frontiers) String() string {
	var buf bytes.Buffer
	for i := 0; i < len(f.items); i++ {
		if i > 0 {
			fmt.Fprint(&buf, ", ")
		}
		fmt.Fprintf(&buf, "%s: %q", f.items[i], f.items[i].key)
	}
	return buf.String()
}

// Advance notifies all member frontiers with keys ≤ k.
func (f *Frontiers) Advance(k []byte) {
	for len(f.items) > 0 && f.cmp(k, f.items[0].key) >= 0 {
		// This frontier has been reached. Invoke the closure and update with
		// the next frontier.
		f.items[0].key = f.items[0].reached(k)
		if f.items[0].key == nil {
			// This was the final frontier that this user was concerned with.
			// Remove it from the heap.
			f.pop()
		} else {
			// Fix up the heap root.
			f.fix(0)
		}
	}
}

func (f *Frontiers) len() int {
	return len(f.items)
}

func (f *Frontiers) less(i, j int) bool {
	return f.cmp(f.items[i].key, f.items[j].key) < 0
}

func (f *Frontiers) swap(i, j int) {
	f.items[i], f.items[j] = f.items[j], f.items[i]
}

// fix, up, and down are copied from the Go stdlib.

func (f *Frontiers) fix(i int) {
	if !f.down(i, f.len()) {
		f.up(i)
	}
}

func (f *Frontiers) push(ff *frontier) {
	n := len(f.items)
	f.items = append(f.items, ff)
	f.up(n)
}

func (f *Frontiers) pop() *frontier {
	n := f.len() - 1
	f.swap(0, n)
	f.down(0, n)
	item := f.items[n]
	f.items = f.items[:n]
	return item
}

func (f *Frontiers) up(j int) {
	for {
		i := (j - 1) / 2 // parent
		if i == j || !f.less(j, i) {
			break
		}
		f.swap(i, j)
		j = i
	}
}

func (f *Frontiers) down(i0, n int) bool {
	i := i0
	for {
		j1 := 2*i + 1
		if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
			break
		}
		j := j1 // left child
		if j2 := j1 + 1; j2 < n && f.less(j2, j1) {
			j = j2 // = 2*i + 2 // right child
		}
		if !f.less(j, i) {
			break
		}
		f.swap(i, j)
		i = j
	}
	return i > i0
}