Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package compact
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "slices"
11 :
12 : "github.com/cockroachdb/pebble/internal/base"
13 : "github.com/cockroachdb/pebble/internal/invariants"
14 : "github.com/cockroachdb/pebble/internal/manifest"
15 : )
16 :
// ShouldSplit indicates whether a compaction should split between output files.
// It is a boolean; the named constants below exist for readability at call
// sites. See the OutputSplitter interface.
type ShouldSplit bool

const (
	// NoSplit may be returned by an OutputSplitter to indicate that it does NOT
	// recommend splitting compaction output sstables between the previous key
	// and the next key.
	NoSplit ShouldSplit = false
	// SplitNow may be returned by an OutputSplitter to indicate that it does
	// recommend splitting compaction output sstables between the previous key
	// and the next key.
	SplitNow ShouldSplit = true
)
31 :
32 : // String implements the Stringer interface.
33 0 : func (s ShouldSplit) String() string {
34 0 : if s == NoSplit {
35 0 : return "no-split"
36 0 : }
37 0 : return "split-now"
38 : }
39 :
40 : // OutputSplitter is used to determine where to split output tables in a
41 : // compaction.
42 : //
43 : // An OutputSplitter is initialized when we start an output file:
44 : //
45 : // s := NewOutputSplitter(...)
46 : // for nextKey != nil && !s.ShouldSplitBefore(nextKey, ...) {
47 : // ...
48 : // }
49 : // splitKey := s.SplitKey()
50 : //
51 : // OutputSplitter enforces a target file size. This splitter splits to a new
52 : // output file when the estimated file size is 0.5x-2x the target file size. If
53 : // there are overlapping grandparent files, this splitter will attempt to split
54 : // at a grandparent boundary. For example, consider the example where a
55 : // compaction wrote 'd' to the current output file, and the next key has a user
56 : // key 'g':
57 : //
58 : // previous key next key
59 : // | |
60 : // | |
61 : // +---------------|----+ +--|----------+
62 : // grandparents: | 000006 | | | | 000007 |
63 : // +---------------|----+ +--|----------+
64 : // a b d e f g i
65 : //
66 : // Splitting the output file F before 'g' will ensure that the current output
67 : // file F does not overlap the grandparent file 000007. Aligning sstable
68 : // boundaries like this can significantly reduce write amplification, since a
69 : // subsequent compaction of F into the grandparent level will avoid needlessly
70 : // rewriting any keys within 000007 that do not overlap F's bounds. Consider the
71 : // following compaction:
72 : //
73 : // +----------------------+
74 : // input | |
75 : // level +----------------------+
76 : // \/
77 : // +---------------+ +---------------+
78 : // output |XXXXXXX| | | |XXXXXXXX|
79 : // level +---------------+ +---------------+
80 : //
81 : // The input-level file overlaps two files in the output level, but only
82 : // partially. The beginning of the first output-level file and the end of the
83 : // second output-level file will be rewritten verbatim. This write I/O is
84 : // "wasted" in the sense that no merging is being performed.
85 : //
86 : // To prevent the above waste, this splitter attempts to split output files
87 : // before the start key of grandparent files. It still strives to write output
88 : // files of approximately the target file size, by constraining this splitting
89 : // at grandparent points to apply only if the current output's file size is
90 : // about the right order of magnitude.
91 : //
92 : // OutputSplitter guarantees that we never split user keys between files.
93 : //
94 : // The dominant cost of OutputSplitter is one key comparison per
95 : // ShouldSplitBefore call.
type OutputSplitter struct {
	cmp base.Compare
	// startKey is the first key of the current output file (cloned in
	// NewOutputSplitter).
	startKey []byte
	// limit, when non-nil, is a hard upper bound for the output file; the
	// splitter always recommends splitting once the frontier reaches it.
	limit []byte
	// targetFileSize is the desired output file size; splits happen when the
	// estimated size is roughly 0.5x-2x this value.
	targetFileSize uint64
	// frontier is registered with the containing Frontiers and tracks
	// nextBoundary.key.
	frontier frontier

	// shouldSplitCalled is set on the first ShouldSplitBefore call; used to
	// perform one-time catch-up of the boundary (see ShouldSplitBefore).
	shouldSplitCalled bool

	// nextBoundary is the next key of interest: either a grandparent file
	// start key or the limit.
	nextBoundary splitterBoundary
	// reachedBoundary is set when the frontier reaches a boundary and is cleared
	// in the first ShouldSplitBefore call after that.
	reachedBoundary splitterBoundary

	// grandparentBoundariesObserved counts grandparent boundaries crossed
	// while building this output; feeds the size heuristic in
	// shouldSplitBasedOnSize.
	grandparentBoundariesObserved uint64
	grandparentLevel              manifest.LevelIterator

	// splitKey, once non-nil, is the recommended split key; set when
	// ShouldSplitBefore returns SplitNow.
	splitKey []byte
}
115 :
// splitterBoundary is a potential split point tracked by OutputSplitter:
// either the start key of a grandparent file or the limit key.
type splitterBoundary struct {
	// key is the boundary user key; nil when no boundary is set.
	key []byte
	// isGrandparent is true if this boundary corresponds to a grandparent boundary.
	// It is false when the boundary is unset or it is a limit boundary.
	isGrandparent bool
}
122 :
// NewOutputSplitter creates a new OutputSplitter. See OutputSplitter for more
// information.
//
// The limitKey must be either nil (no limit) or a key greater than startKey.
//
// NewOutputSplitter registers the splitter with the provided Frontiers.
//
// Note: it is allowed for the startKey to be behind the current frontier, as
// long as the key in the first ShouldSplitBefore call is at the frontier.
func NewOutputSplitter(
	cmp base.Compare,
	startKey []byte,
	limit []byte,
	targetFileSize uint64,
	grandparentLevel manifest.LevelIterator,
	frontiers *Frontiers,
) *OutputSplitter {
	s := &OutputSplitter{
		cmp:              cmp,
		startKey:         slices.Clone(startKey),
		targetFileSize:   targetFileSize,
		grandparentLevel: grandparentLevel,
	}
	if len(limit) > 0 {
		if invariants.Enabled && cmp(startKey, limit) >= 0 {
			panic("limit <= startKey")
		}
		s.limit = slices.Clone(limit)
	}
	// Find the first grandparent whose start key is strictly after startKey; a
	// grandparent starting at or before startKey cannot serve as a split point
	// for this output.
	grandparent := s.grandparentLevel.SeekGE(cmp, startKey)
	if grandparent != nil && cmp(grandparent.Smallest.UserKey, startKey) <= 0 {
		grandparent = s.grandparentLevel.Next()
	}
	s.setNextBoundary(grandparent)
	if invariants.Enabled && s.nextBoundary.key != nil && s.cmp(s.nextBoundary.key, startKey) <= 0 {
		panic("first boundary is not after startKey")
	}
	// We start using the frontier after the first ShouldSplitBefore call; the
	// frontier is registered with a nil key (inactive) until then.
	s.frontier.Init(frontiers, nil, s.boundaryReached)
	return s
}
165 :
// boundaryReached is the callback registered with Frontiers; it runs whenever
// the frontier advances past the current boundary. It records the crossed
// boundary in s.reachedBoundary (consumed by ShouldSplitBefore) and returns
// the next key the frontier should watch, or nil to deactivate it.
func (s *OutputSplitter) boundaryReached(key []byte) (nextBoundary []byte) {
	// The passed key can be past the next boundary.
	s.reachedBoundary = s.nextBoundary
	if !s.nextBoundary.isGrandparent {
		// The limit (or unset boundary) was reached; nothing lies beyond it,
		// so deactivate the frontier.
		s.nextBoundary = splitterBoundary{}
		return nil
	}
	// A grandparent boundary was crossed: count it for the size heuristic and
	// advance to the next grandparent file's start key (or the limit).
	s.grandparentBoundariesObserved++
	s.setNextBoundary(s.grandparentLevel.Next())
	// It is possible that the next boundary is already reached; in that case
	// boundaryReached will just fire again immediately.
	return s.nextBoundary.key
}
181 :
182 1 : func (s *OutputSplitter) setNextBoundary(nextGrandparent *manifest.FileMetadata) {
183 1 : if nextGrandparent != nil && (s.limit == nil || s.cmp(nextGrandparent.Smallest.UserKey, s.limit) < 0) {
184 1 : s.nextBoundary = splitterBoundary{
185 1 : key: nextGrandparent.Smallest.UserKey,
186 1 : isGrandparent: true,
187 1 : }
188 1 : } else {
189 1 : s.nextBoundary = splitterBoundary{
190 1 : key: s.limit,
191 1 : isGrandparent: false,
192 1 : }
193 1 : }
194 : }
195 :
// ShouldSplitBefore returns whether we should split the output before the next
// key. It is passed the current estimated file size and a function that can be
// used to retrieve the previous user key.
//
// The equalPrevFn function is used to guarantee no split user keys, without
// OutputSplitter copying each key internally. It is not performance sensitive,
// as it is only called once we decide to split.
//
// Once ShouldSplitBefore returns SplitNow, it must not be called again.
// SplitKey() can be used to retrieve the recommended split key.
//
// INVARIANT: nextUserKey must match the current frontier.
func (s *OutputSplitter) ShouldSplitBefore(
	nextUserKey []byte, estimatedFileSize uint64, equalPrevFn func([]byte) bool,
) ShouldSplit {
	if invariants.Enabled && s.splitKey != nil {
		panic("ShouldSplitBefore called after it returned SplitNow")
	}
	if !s.shouldSplitCalled {
		// The boundary could have been advanced to nextUserKey before the splitter
		// was created. So one single time, we advance the boundary manually.
		s.shouldSplitCalled = true
		for s.nextBoundary.key != nil && s.cmp(s.nextBoundary.key, nextUserKey) <= 0 {
			s.boundaryReached(nextUserKey)
		}
		// From here on, boundary advancement is driven by the Frontiers heap.
		s.frontier.Update(s.nextBoundary.key)
	}

	if invariants.Enabled && s.nextBoundary.key != nil && s.cmp(s.nextBoundary.key, nextUserKey) <= 0 {
		panic("boundary is behind the next key (or startKey was before the boundary)")
	}
	// Consume any boundary reached since the last call.
	// Note: s.reachedBoundary can be empty.
	reachedBoundary := s.reachedBoundary
	s.reachedBoundary = splitterBoundary{}
	if invariants.Enabled && reachedBoundary.key != nil && s.cmp(reachedBoundary.key, nextUserKey) > 0 {
		panic("reached boundary ahead of the next user key")
	}
	if reachedBoundary.key != nil && !reachedBoundary.isGrandparent {
		// Limit was reached; splitting here is mandatory.
		s.splitKey = s.limit
		return SplitNow
	}

	if s.shouldSplitBasedOnSize(estimatedFileSize, reachedBoundary.isGrandparent) == SplitNow {
		// We want to split here based on size, but we cannot split between two keys
		// with the same UserKey.
		//
		// If we are at a grandparent boundary, we know that this key cannot have the
		// same UserKey as the previous key (otherwise, that key would have been the
		// one hitting this boundary).
		if reachedBoundary.isGrandparent {
			s.splitKey = reachedBoundary.key
			return SplitNow
		}

		// When the target file size limit is very small (in tests), we could end up
		// splitting at the first key, which is not allowed.
		if s.cmp(nextUserKey, s.startKey) <= 0 {
			return NoSplit
		}

		// TODO(radu): it would make for a cleaner interface if we didn't rely on a
		// equalPrevFn. We could make a copy of the key here and split at the next
		// user key that is different; the main difficulty is that various tests
		// expect 1 key per output table if the target file size is very small.
		if !equalPrevFn(nextUserKey) {
			s.splitKey = slices.Clone(nextUserKey)
			return SplitNow
		}
	}

	return NoSplit
}
269 :
270 : // SplitKey returns the suggested split key - the first key at which the next
271 : // output file should start.
272 : //
273 : // If ShouldSplitBefore never returned SplitNow, then SplitKey returns the limit
274 : // passed to NewOutputSplitter (which can be nil).
275 : //
276 : // Otherwise, it returns a key <= the key passed to the last ShouldSplitBefore
277 : // call and > the key passed to the previous call to ShouldSplitBefore (and >
278 : // than the start key). This key is guaranteed to be larger than the start key.
279 1 : func (s *OutputSplitter) SplitKey() []byte {
280 1 : s.frontier.Update(nil)
281 1 : if s.splitKey != nil {
282 1 : if invariants.Enabled && s.cmp(s.splitKey, s.startKey) <= 0 {
283 0 : panic(fmt.Sprintf("splitKey %q <= startKey %q", s.splitKey, s.startKey))
284 : }
285 1 : return s.splitKey
286 : }
287 1 : return s.limit
288 : }
289 :
290 : // shouldSplitBasedOnSize returns whether we should split based on the file size
291 : // and whether we are at a grandparent boundary.
292 : func (s *OutputSplitter) shouldSplitBasedOnSize(
293 : estSize uint64, atGrandparentBoundary bool,
294 1 : ) ShouldSplit {
295 1 : switch {
296 1 : case estSize < s.targetFileSize/2:
297 1 : // The estimated file size is less than half the target file size. Don't
298 1 : // split it, even if currently aligned with a grandparent file because
299 1 : // it's too small.
300 1 : return NoSplit
301 1 : case estSize >= 2*s.targetFileSize:
302 1 : // The estimated file size is double the target file size. Split it even
303 1 : // if we were not aligned with a grandparent file boundary to avoid
304 1 : // excessively exceeding the target file size.
305 1 : return SplitNow
306 1 : case !atGrandparentBoundary:
307 1 : // Don't split if we're not at a grandparent, except if we've exhausted all
308 1 : // the grandparents up to the limit. Then we may want to split purely based
309 1 : // on file size.
310 1 : if !s.nextBoundary.isGrandparent {
311 1 : // There are no more grandparents. Optimize for the target file size
312 1 : // and split as soon as we hit the target file size.
313 1 : if estSize >= s.targetFileSize {
314 1 : return SplitNow
315 1 : }
316 : }
317 1 : return NoSplit
318 1 : default:
319 1 : // INVARIANT: atGrandparentBoundary
320 1 : // INVARIANT: targetSize/2 < estSize < 2*targetSize
321 1 : //
322 1 : // The estimated file size is close enough to the target file size that
323 1 : // we should consider splitting.
324 1 : //
325 1 : // Determine whether to split now based on how many grandparent
326 1 : // boundaries we have already observed while building this output file.
327 1 : // The intuition here is that if the grandparent level is dense in this
328 1 : // part of the keyspace, we're likely to continue to have more
329 1 : // opportunities to split this file aligned with a grandparent. If this
330 1 : // is the first grandparent boundary observed, we split immediately
331 1 : // (we're already at ≥50% the target file size). Otherwise, each
332 1 : // overlapping grandparent we've observed increases the minimum file
333 1 : // size by 5% of the target file size, up to at most 90% of the target
334 1 : // file size.
335 1 : //
336 1 : // TODO(jackson): The particular thresholds are somewhat unprincipled.
337 1 : // This is the same heuristic as RocksDB implements. Is there are more
338 1 : // principled formulation that can, further reduce w-amp, produce files
339 1 : // closer to the target file size, or is more understandable?
340 1 :
341 1 : // NB: Subtract 1 from `boundariesObserved` to account for the current
342 1 : // boundary we're considering splitting at.
343 1 : minimumPctOfTargetSize := 50 + 5*min(s.grandparentBoundariesObserved-1, 8)
344 1 : if estSize < (minimumPctOfTargetSize*s.targetFileSize)/100 {
345 1 : return NoSplit
346 1 : }
347 1 : return SplitNow
348 : }
349 : }
350 :
// A frontier is used to monitor a compaction's progression across the user
// keyspace.
//
// A frontier hold a user key boundary that it's concerned with in its `key`
// field. If/when the compaction iterator returns an InternalKey with a user key
// _k_ such that k ≥ frontier.key, the compaction iterator invokes the
// frontier's `reached` function, passing _k_ as its argument.
//
// The `reached` function returns a new value to use as the key. If `reached`
// returns nil, the frontier is forgotten and its `reached` method will not be
// invoked again, unless the user calls [Update] to set a new key.
//
// A frontier's key may be updated outside the context of a `reached`
// invocation at any time, through its Update method.
type frontier struct {
	// container points to the containing *Frontiers that was passed to Init
	// when the frontier was initialized.
	container *Frontiers

	// key holds the frontier's current key. If nil, this frontier is inactive
	// and its reached func will not be invoked. The value of this key may only
	// be updated by the `Frontiers` type, or the Update method.
	key []byte

	// reached is invoked when the container's Advance passes key; see
	// frontierReachedFn for the contract.
	reached frontierReachedFn
}
377 :
// frontierReachedFn is invoked to inform a frontier that its key has been
// reached. It's invoked with the user key that reached the limit. The `key`
// argument is guaranteed to be ≥ the frontier's key.
//
// After frontierReachedFn is invoked, the frontier's key is updated to the
// return value of frontierReachedFn. The frontier is permitted to update its
// key to a user key ≤ the argument `key`. Returning nil deactivates the
// frontier.
//
// If a frontier is set to key k1, and reached(k2) is invoked (k2 ≥ k1), the
// frontier will receive reached(k2) calls until it returns nil or a key k3 such
// that k2 < k3. This property is useful for Frontiers that use
// frontierReachedFn invocations to drive iteration through collections of keys
// that may contain multiple keys that are both < k2 and ≥ k1.
type frontierReachedFn func(currentFrontier []byte) (next []byte)
392 :
393 : // Init initializes the frontier with the provided key and reached callback.
394 : // The frontier is attached to the provided *Frontiers and the provided reached
395 : // func will be invoked when the *Frontiers is advanced to a key ≥ this
396 : // frontier's key.
397 1 : func (f *frontier) Init(frontiers *Frontiers, initialKey []byte, reached frontierReachedFn) {
398 1 : *f = frontier{
399 1 : container: frontiers,
400 1 : key: initialKey,
401 1 : reached: reached,
402 1 : }
403 1 : if initialKey != nil {
404 1 : f.container.push(f)
405 1 : }
406 : }
407 :
408 : // String implements fmt.Stringer.
409 1 : func (f *frontier) String() string {
410 1 : return string(f.key)
411 1 : }
412 :
413 : // Update replaces the existing frontier's key with the provided key. The
414 : // frontier's reached func will be invoked when the new key is reached.
415 1 : func (f *frontier) Update(key []byte) {
416 1 : c := f.container
417 1 : prevKeyIsNil := f.key == nil
418 1 : f.key = key
419 1 : if prevKeyIsNil {
420 1 : if key != nil {
421 1 : c.push(f)
422 1 : }
423 1 : return
424 : }
425 :
426 : // Find the frontier within the heap (it must exist within the heap because
427 : // f.key was != nil). If the frontier key is now nil, remove it from the
428 : // heap. Otherwise, fix up its position.
429 1 : for i := 0; i < len(c.items); i++ {
430 1 : if c.items[i] == f {
431 1 : if key != nil {
432 0 : c.fix(i)
433 1 : } else {
434 1 : n := c.len() - 1
435 1 : c.swap(i, n)
436 1 : c.down(i, n)
437 1 : c.items = c.items[:n]
438 1 : }
439 1 : return
440 : }
441 : }
442 0 : panic("unreachable")
443 : }
444 :
// Frontiers is used to track progression of a task (eg, compaction) across the
// keyspace. Clients that want to be informed when the task advances to a key ≥
// some frontier may register a frontier, providing a callback. The task calls
// `Advance(k)` with each user key encountered, which invokes the `reached` func
// on all tracked Frontiers with `key`s ≤ k.
//
// Internally, Frontiers is implemented as a simple heap.
type Frontiers struct {
	// cmp orders user keys; it defines the min-heap ordering over items.
	cmp base.Compare
	// items is a min-heap of active frontiers, keyed by frontier.key.
	items []*frontier
}
456 :
// Init initializes a Frontiers for use.
func (f *Frontiers) Init(cmp base.Compare) {
	// Only the comparator needs to be set; the items heap starts empty.
	f.cmp = cmp
}
461 :
462 : // String implements fmt.Stringer.
463 1 : func (f *Frontiers) String() string {
464 1 : var buf bytes.Buffer
465 1 : for i := 0; i < len(f.items); i++ {
466 1 : if i > 0 {
467 1 : fmt.Fprint(&buf, ", ")
468 1 : }
469 1 : fmt.Fprintf(&buf, "%s: %q", f.items[i], f.items[i].key)
470 : }
471 1 : return buf.String()
472 : }
473 :
// Advance notifies all member Frontiers with keys ≤ k. Callbacks run from the
// heap root outward; a callback that returns a key still ≤ k is invoked again
// within the same call.
func (f *Frontiers) Advance(k []byte) {
	for len(f.items) > 0 && f.cmp(k, f.items[0].key) >= 0 {
		// This frontier has been reached. Invoke the closure and update with
		// the next frontier.
		f.items[0].key = f.items[0].reached(k)
		if f.items[0].key == nil {
			// This was the final frontier that this user was concerned with.
			// Remove it from the heap.
			f.pop()
		} else {
			// Fix up the heap root. Note that if the key is still smaller than k, the
			// callback will be invoked again in the same loop.
			f.fix(0)
		}
	}
}
491 :
// len returns the number of frontiers currently in the heap.
func (f *Frontiers) len() int {
	return len(f.items)
}
495 :
// less reports whether frontier i orders before frontier j; it makes items a
// min-heap on the frontier keys.
func (f *Frontiers) less(i, j int) bool {
	return f.cmp(f.items[i].key, f.items[j].key) < 0
}
499 :
// swap exchanges the frontiers at heap indices i and j.
func (f *Frontiers) swap(i, j int) {
	f.items[i], f.items[j] = f.items[j], f.items[i]
}
503 :
// fix, up and down are copied from the go stdlib.

// fix re-establishes the heap ordering after the element at index i has had
// its key changed (equivalent to container/heap.Fix).
func (f *Frontiers) fix(i int) {
	if !f.down(i, f.len()) {
		f.up(i)
	}
}
511 :
512 1 : func (f *Frontiers) push(ff *frontier) {
513 1 : n := len(f.items)
514 1 : f.items = append(f.items, ff)
515 1 : f.up(n)
516 1 : }
517 :
// pop removes and returns the frontier with the smallest key (the heap root).
// It must only be called when the heap is non-empty.
func (f *Frontiers) pop() *frontier {
	n := f.len() - 1
	f.swap(0, n)
	f.down(0, n)
	item := f.items[n]
	f.items = f.items[:n]
	return item
}
526 :
// up sifts the element at index j toward the root until the heap invariant
// holds (copied from container/heap).
func (f *Frontiers) up(j int) {
	for {
		i := (j - 1) / 2 // parent
		if i == j || !f.less(j, i) {
			break
		}
		f.swap(i, j)
		j = i
	}
}
537 :
// down sifts the element at index i0 toward the leaves, within the heap prefix
// of length n, and reports whether the element moved (copied from
// container/heap).
func (f *Frontiers) down(i0, n int) bool {
	i := i0
	for {
		j1 := 2*i + 1
		if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
			break
		}
		j := j1 // left child
		if j2 := j1 + 1; j2 < n && f.less(j2, j1) {
			j = j2 // = 2*i + 2 // right child
		}
		if !f.less(j, i) {
			break
		}
		f.swap(i, j)
		i = j
	}
	return i > i0
}
|