Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package compact
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "slices"
11 :
12 : "github.com/cockroachdb/pebble/internal/base"
13 : "github.com/cockroachdb/pebble/internal/invariants"
14 : "github.com/cockroachdb/pebble/internal/manifest"
15 : )
16 :
// ShouldSplit indicates whether a compaction should split between output files.
// It is the result type of OutputSplitter.ShouldSplitBefore.
type ShouldSplit bool

const (
	// NoSplit means the OutputSplitter does NOT recommend splitting the
	// compaction output sstable between the previous key and the next key.
	NoSplit ShouldSplit = false
	// SplitNow means the OutputSplitter does recommend splitting the
	// compaction output sstable between the previous key and the next key.
	SplitNow ShouldSplit = true
)

// String implements the fmt.Stringer interface, returning "split-now" for
// SplitNow and "no-split" for NoSplit.
func (s ShouldSplit) String() string {
	if s {
		return "split-now"
	}
	return "no-split"
}
39 :
// OutputSplitter is used to determine where to split output tables in a
// compaction.
//
// An OutputSplitter is initialized when we start an output file:
//
//	s := NewOutputSplitter(...)
//	for nextKey != nil && !s.ShouldSplitBefore(nextKey, ...) {
//	  ...
//	}
//	splitKey := s.SplitKey()
//
// OutputSplitter enforces a target file size. It never splits for size
// reasons while the estimated file size is below 0.5x the target, always
// splits once it reaches 2x the target, and in between prefers to split at
// grandparent boundaries. It also always splits when the limit passed to
// NewOutputSplitter (if any) is reached. For example, consider a compaction
// that has written 'd' to the current output file and whose next key has the
// user key 'g':
//
//	                    previous key   next key
//	                             |        |
//	                             |        |
//	               +-------------|----+ +-|-----------+
//	grandparents:  |  000006     |    | | |  000007   |
//	               +-------------|----+ +-|-----------+
//	               a    b        d    e f g           i
//
// Splitting the output file F before 'g' will ensure that the current output
// file F does not overlap the grandparent file 000007. Aligning sstable
// boundaries like this can significantly reduce write amplification, since a
// subsequent compaction of F into the grandparent level will avoid needlessly
// rewriting any keys within 000007 that do not overlap F's bounds. Consider the
// following compaction:
//
//	                +----------------------+
//	input           |                      |
//	level           +----------------------+
//	                          \/
//	         +---------------+       +---------------+
//	output   |XXXXXX|        |       |      |XXXXXXXXX|
//	level    +---------------+       +---------------+
//
// The input-level file overlaps two files in the output level, but only
// partially. The beginning of the first output-level file and the end of the
// second output-level file will be rewritten verbatim. This write I/O is
// "wasted" in the sense that no merging is being performed.
//
// To prevent the above waste, this splitter attempts to split output files
// before the start key of grandparent files. It still strives to write output
// files of approximately the target file size, by constraining this splitting
// at grandparent points to apply only if the current output's file size is
// about the right order of magnitude.
//
// OutputSplitter guarantees that we never split user keys between files.
//
// The dominant cost of OutputSplitter is one key comparison per
// ShouldSplitBefore call.
type OutputSplitter struct {
	// cmp orders user keys. startKey and limit are private copies of the keys
	// passed to NewOutputSplitter; a nil limit means there is no limit.
	// targetFileSize is the desired output file size, and frontier is this
	// splitter's registration with the Frontiers passed to NewOutputSplitter.
	cmp            base.Compare
	startKey       []byte
	limit          []byte
	targetFileSize uint64
	frontier       frontier

	// shouldSplitCalled records whether ShouldSplitBefore has been called; the
	// first call manually catches the boundaries up to the first key.
	shouldSplitCalled bool

	// nextBoundary is the boundary the frontier is currently waiting for: the
	// start of the next grandparent file if it is before the limit, otherwise
	// the limit itself (which may be nil).
	nextBoundary splitterBoundary
	// reachedBoundary is set when the frontier reaches a boundary and is cleared
	// in the first ShouldSplitBefore call after that.
	reachedBoundary splitterBoundary

	// grandparentBoundariesObserved counts grandparent boundaries reached while
	// building this output file; it feeds the size heuristic in
	// shouldSplitBasedOnSize. grandparentLevel iterates the grandparent level's
	// files and is advanced each time a grandparent boundary is reached.
	grandparentBoundariesObserved uint64
	grandparentLevel              manifest.LevelIterator

	// splitKey is set when ShouldSplitBefore returns SplitNow and is the key
	// returned by SplitKey.
	splitKey []byte
}
115 :
// splitterBoundary is a user key at which an OutputSplitter may split its
// output (a grandparent boundary) or must split it (the limit). A nil key
// means there is no boundary.
type splitterBoundary struct {
	key []byte
	// isGrandparent is true if this boundary corresponds to a grandparent boundary.
	// It is false when the boundary is unset or it is a limit boundary.
	isGrandparent bool
}
122 :
123 : // NewOutputSplitter creates a new OutputSplitter. See OutputSplitter for more
124 : // information.
125 : //
126 : // The limitKey must be either nil (no limit) or a key greater than startKey.
127 : //
128 : // NewOutputSplitter registers the splitter with the provided Frontiers.
129 : //
130 : // Note: it is allowed for the startKey to be behind the current frontier, as
131 : // long as the key in the first ShouldSplitBefore call is at the frontier.
132 : func NewOutputSplitter(
133 : cmp base.Compare,
134 : startKey []byte,
135 : limit []byte,
136 : targetFileSize uint64,
137 : grandparentLevel manifest.LevelIterator,
138 : frontiers *Frontiers,
139 2 : ) *OutputSplitter {
140 2 : s := &OutputSplitter{
141 2 : cmp: cmp,
142 2 : startKey: slices.Clone(startKey),
143 2 : targetFileSize: targetFileSize,
144 2 : grandparentLevel: grandparentLevel,
145 2 : }
146 2 : if len(limit) > 0 {
147 2 : if invariants.Enabled && cmp(startKey, limit) >= 0 {
148 0 : panic("limit <= startKey")
149 : }
150 2 : s.limit = slices.Clone(limit)
151 : }
152 : // Find the first grandparent that starts at or after startKey.
153 2 : grandparent := s.grandparentLevel.SeekGE(cmp, startKey)
154 2 : if grandparent != nil && cmp(grandparent.Smallest.UserKey, startKey) <= 0 {
155 2 : grandparent = s.grandparentLevel.Next()
156 2 : }
157 2 : s.setNextBoundary(grandparent)
158 2 : if invariants.Enabled && s.nextBoundary.key != nil && s.cmp(s.nextBoundary.key, startKey) <= 0 {
159 0 : panic("first boundary is not after startKey")
160 : }
161 : // We start using the frontier after the first ShouldSplitBefore call.
162 2 : s.frontier.Init(frontiers, nil, s.boundaryReached)
163 2 : return s
164 : }
165 :
166 : // boundaryReached is the callback registered with Frontiers; it runs whenever
167 : // the frontier advances past the current boundary.
168 2 : func (s *OutputSplitter) boundaryReached(key []byte) (nextBoundary []byte) {
169 2 : // The passed key can be past the next boundary.
170 2 : s.reachedBoundary = s.nextBoundary
171 2 : if !s.nextBoundary.isGrandparent {
172 2 : s.nextBoundary = splitterBoundary{}
173 2 : return nil
174 2 : }
175 2 : s.grandparentBoundariesObserved++
176 2 : s.setNextBoundary(s.grandparentLevel.Next())
177 2 : // It is possible that the next boundary is already reached; in that case
178 2 : // boundaryReached will just fire again immediately.
179 2 : return s.nextBoundary.key
180 : }
181 :
182 2 : func (s *OutputSplitter) setNextBoundary(nextGrandparent *manifest.FileMetadata) {
183 2 : if nextGrandparent != nil && (s.limit == nil || s.cmp(nextGrandparent.Smallest.UserKey, s.limit) < 0) {
184 2 : s.nextBoundary = splitterBoundary{
185 2 : key: nextGrandparent.Smallest.UserKey,
186 2 : isGrandparent: true,
187 2 : }
188 2 : } else {
189 2 : s.nextBoundary = splitterBoundary{
190 2 : key: s.limit,
191 2 : isGrandparent: false,
192 2 : }
193 2 : }
194 : }
195 :
196 : // ShouldSplitBefore returns whether we should split the output before the next
197 : // key. It is passed the current estimated file size and a function that can be
198 : // used to retrieve the previous user key.
199 : //
200 : // The equalPrevFn function is used to guarantee no split user keys, without
201 : // OutputSplitter copying each key internally. It is not performance sensitive,
202 : // as it is only called once we decide to split.
203 : //
204 : // Once ShouldSplitBefore returns SplitNow, it must not be called again.
205 : // SplitKey() can be used to retrieve the recommended split key.
206 : //
207 : // INVARIANT: nextUserKey must match the current frontier.
208 : func (s *OutputSplitter) ShouldSplitBefore(
209 : nextUserKey []byte, estimatedFileSize uint64, equalPrevFn func([]byte) bool,
210 2 : ) ShouldSplit {
211 2 : if invariants.Enabled && s.splitKey != nil {
212 0 : panic("ShouldSplitBefore called after it returned SplitNow")
213 : }
214 2 : if !s.shouldSplitCalled {
215 2 : // The boundary could have been advanced to nextUserKey before the splitter
216 2 : // was created. So one single time, we advance the boundary manually.
217 2 : s.shouldSplitCalled = true
218 2 : for s.nextBoundary.key != nil && s.cmp(s.nextBoundary.key, nextUserKey) <= 0 {
219 2 : s.boundaryReached(nextUserKey)
220 2 : }
221 2 : s.frontier.Update(s.nextBoundary.key)
222 : }
223 :
224 2 : if invariants.Enabled && s.nextBoundary.key != nil && s.cmp(s.nextBoundary.key, nextUserKey) <= 0 {
225 0 : panic("boundary is behind the next key (or startKey was before the boundary)")
226 : }
227 : // Note: s.reachedBoundary can be empty.
228 2 : reachedBoundary := s.reachedBoundary
229 2 : s.reachedBoundary = splitterBoundary{}
230 2 : if invariants.Enabled && reachedBoundary.key != nil && s.cmp(reachedBoundary.key, nextUserKey) > 0 {
231 0 : panic("reached boundary ahead of the next user key")
232 : }
233 2 : if reachedBoundary.key != nil && !reachedBoundary.isGrandparent {
234 2 : // Limit was reached.
235 2 : s.splitKey = s.limit
236 2 : return SplitNow
237 2 : }
238 :
239 2 : if s.shouldSplitBasedOnSize(estimatedFileSize, reachedBoundary.isGrandparent) == SplitNow {
240 2 : // We want to split here based on size, but we cannot split between two keys
241 2 : // with the same UserKey.
242 2 : //
243 2 : // If we are at a grandparent boundary, we know that this key cannot have the
244 2 : // same UserKey as the previous key (otherwise, that key would have been the
245 2 : // one hitting this boundary).
246 2 : if reachedBoundary.isGrandparent {
247 2 : s.splitKey = reachedBoundary.key
248 2 : return SplitNow
249 2 : }
250 :
251 : // When the target file size limit is very small (in tests), we could end up
252 : // splitting at the first key, which is not allowed.
253 2 : if s.cmp(nextUserKey, s.startKey) <= 0 {
254 2 : return NoSplit
255 2 : }
256 :
257 : // TODO(radu): it would make for a cleaner interface if we didn't rely on a
258 : // equalPrevFn. We could make a copy of the key here and split at the next
259 : // user key that is different; the main difficulty is that various tests
260 : // expect 1 key per output table if the target file size is very small.
261 2 : if !equalPrevFn(nextUserKey) {
262 2 : s.splitKey = slices.Clone(nextUserKey)
263 2 : return SplitNow
264 2 : }
265 : }
266 :
267 2 : return NoSplit
268 : }
269 :
270 : // SplitKey returns the suggested split key - the first key at which the next
271 : // output file should start.
272 : //
273 : // If ShouldSplitBefore never returned SplitNow, then SplitKey returns the limit
274 : // passed to NewOutputSplitter (which can be nil).
275 : //
276 : // Otherwise, it returns a key <= the key passed to the last ShouldSplitBefore
277 : // call and > the key passed to the previous call to ShouldSplitBefore (and >
278 : // than the start key). This key is guaranteed to be larger than the start key.
279 2 : func (s *OutputSplitter) SplitKey() []byte {
280 2 : s.frontier.Update(nil)
281 2 : if s.splitKey != nil {
282 2 : if invariants.Enabled && s.cmp(s.splitKey, s.startKey) <= 0 {
283 0 : panic(fmt.Sprintf("splitKey %q <= startKey %q", s.splitKey, s.startKey))
284 : }
285 2 : return s.splitKey
286 : }
287 2 : return s.limit
288 : }
289 :
290 : // shouldSplitBasedOnSize returns whether we should split based on the file size
291 : // and whether we are at a grandparent boundary.
292 : func (s *OutputSplitter) shouldSplitBasedOnSize(
293 : estSize uint64, atGrandparentBoundary bool,
294 2 : ) ShouldSplit {
295 2 : switch {
296 2 : case estSize < s.targetFileSize/2:
297 2 : // The estimated file size is less than half the target file size. Don't
298 2 : // split it, even if currently aligned with a grandparent file because
299 2 : // it's too small.
300 2 : return NoSplit
301 2 : case estSize >= 2*s.targetFileSize:
302 2 : // The estimated file size is double the target file size. Split it even
303 2 : // if we were not aligned with a grandparent file boundary to avoid
304 2 : // excessively exceeding the target file size.
305 2 : return SplitNow
306 2 : case !atGrandparentBoundary:
307 2 : // Don't split if we're not at a grandparent, except if we've exhausted all
308 2 : // the grandparents up to the limit. Then we may want to split purely based
309 2 : // on file size.
310 2 : if !s.nextBoundary.isGrandparent {
311 2 : // There are no more grandparents. Optimize for the target file size
312 2 : // and split as soon as we hit the target file size.
313 2 : if estSize >= s.targetFileSize {
314 2 : return SplitNow
315 2 : }
316 : }
317 2 : return NoSplit
318 2 : default:
319 2 : // INVARIANT: atGrandparentBoundary
320 2 : // INVARIANT: targetSize/2 < estSize < 2*targetSize
321 2 : //
322 2 : // The estimated file size is close enough to the target file size that
323 2 : // we should consider splitting.
324 2 : //
325 2 : // Determine whether to split now based on how many grandparent
326 2 : // boundaries we have already observed while building this output file.
327 2 : // The intuition here is that if the grandparent level is dense in this
328 2 : // part of the keyspace, we're likely to continue to have more
329 2 : // opportunities to split this file aligned with a grandparent. If this
330 2 : // is the first grandparent boundary observed, we split immediately
331 2 : // (we're already at ≥50% the target file size). Otherwise, each
332 2 : // overlapping grandparent we've observed increases the minimum file
333 2 : // size by 5% of the target file size, up to at most 90% of the target
334 2 : // file size.
335 2 : //
336 2 : // TODO(jackson): The particular thresholds are somewhat unprincipled.
337 2 : // This is the same heuristic as RocksDB implements. Is there are more
338 2 : // principled formulation that can, further reduce w-amp, produce files
339 2 : // closer to the target file size, or is more understandable?
340 2 :
341 2 : // NB: Subtract 1 from `boundariesObserved` to account for the current
342 2 : // boundary we're considering splitting at.
343 2 : minimumPctOfTargetSize := 50 + 5*min(s.grandparentBoundariesObserved-1, 8)
344 2 : if estSize < (minimumPctOfTargetSize*s.targetFileSize)/100 {
345 2 : return NoSplit
346 2 : }
347 2 : return SplitNow
348 : }
349 : }
350 :
// A frontier is used to monitor a compaction's progression across the user
// keyspace.
//
// A frontier holds a user key boundary that it's concerned with in its `key`
// field. If/when the Frontiers it belongs to is advanced (see
// Frontiers.Advance) to a user key _k_ such that k ≥ frontier.key, the
// frontier's `reached` function is invoked with _k_ as its argument.
//
// The `reached` function returns a new value to use as the key. If `reached`
// returns nil, the frontier is forgotten and its `reached` method will not be
// invoked again, unless the user calls [frontier.Update] to set a new key.
//
// A frontier's key may be updated outside the context of a `reached`
// invocation at any time, through its Update method.
type frontier struct {
	// container points to the containing *Frontiers that was passed to Init
	// when the frontier was initialized.
	container *Frontiers

	// key holds the frontier's current key. If nil, this frontier is inactive
	// (not in the container's heap) and its reached func will not be invoked.
	// The value of this key may only be updated by the `Frontiers` type, or
	// the Update method.
	key []byte

	// reached is invoked when the container is advanced to a key ≥ key; its
	// return value becomes the new key.
	reached frontierReachedFn
}
377 :
// frontierReachedFn is invoked to inform a frontier that its key has been
// reached. Its argument (named currentFrontier) is the user key that the
// Frontiers was advanced to, and it is guaranteed to be ≥ the frontier's key.
//
// After frontierReachedFn is invoked, the frontier's key is updated to the
// return value of frontierReachedFn. The frontier is permitted to update its
// key to a user key ≤ the argument key.
//
// If a frontier is set to key k1, and reached(k2) is invoked (k2 ≥ k1), the
// frontier will receive reached(k2) calls until it returns nil or a key k3 such
// that k2 < k3. This property is useful for frontiers that use
// frontierReachedFn invocations to drive iteration through collections of keys
// that may contain multiple keys that are both < k2 and ≥ k1.
type frontierReachedFn func(currentFrontier []byte) (next []byte)
392 :
393 : // Init initializes the frontier with the provided key and reached callback.
394 : // The frontier is attached to the provided *Frontiers and the provided reached
395 : // func will be invoked when the *Frontiers is advanced to a key ≥ this
396 : // frontier's key.
397 2 : func (f *frontier) Init(frontiers *Frontiers, initialKey []byte, reached frontierReachedFn) {
398 2 : *f = frontier{
399 2 : container: frontiers,
400 2 : key: initialKey,
401 2 : reached: reached,
402 2 : }
403 2 : if initialKey != nil {
404 1 : f.container.push(f)
405 1 : }
406 : }
407 :
408 : // String implements fmt.Stringer.
409 1 : func (f *frontier) String() string {
410 1 : return string(f.key)
411 1 : }
412 :
413 : // Update replaces the existing frontier's key with the provided key. The
414 : // frontier's reached func will be invoked when the new key is reached.
415 2 : func (f *frontier) Update(key []byte) {
416 2 : c := f.container
417 2 : prevKeyIsNil := f.key == nil
418 2 : f.key = key
419 2 : if prevKeyIsNil {
420 2 : if key != nil {
421 2 : c.push(f)
422 2 : }
423 2 : return
424 : }
425 :
426 : // Find the frontier within the heap (it must exist within the heap because
427 : // f.key was != nil). If the frontier key is now nil, remove it from the
428 : // heap. Otherwise, fix up its position.
429 2 : for i := 0; i < len(c.items); i++ {
430 2 : if c.items[i] == f {
431 2 : if key != nil {
432 0 : c.fix(i)
433 2 : } else {
434 2 : n := c.len() - 1
435 2 : c.swap(i, n)
436 2 : c.down(i, n)
437 2 : c.items = c.items[:n]
438 2 : }
439 2 : return
440 : }
441 : }
442 0 : panic("unreachable")
443 : }
444 :
// Frontiers is used to track progression of a task (eg, compaction) across the
// keyspace. Clients that want to be informed when the task advances to a key ≥
// some frontier may register a frontier, providing a callback. The task calls
// `Advance(k)` with each user key encountered, which invokes the `reached` func
// on all tracked frontiers with `key`s ≤ k.
//
// Internally, Frontiers is implemented as a binary min-heap of frontiers
// ordered by key.
type Frontiers struct {
	// cmp orders frontier keys. items holds the active (non-nil key) frontiers
	// in heap order, so items[0] has the smallest key.
	cmp   base.Compare
	items []*frontier
}
456 :
// Init initializes a Frontiers for use by setting the comparator that orders
// frontier keys. Any already-registered frontiers are left untouched.
func (f *Frontiers) Init(cmp base.Compare) {
	f.cmp = cmp
}
461 :
462 : // String implements fmt.Stringer.
463 1 : func (f *Frontiers) String() string {
464 1 : var buf bytes.Buffer
465 1 : for i := 0; i < len(f.items); i++ {
466 1 : if i > 0 {
467 1 : fmt.Fprint(&buf, ", ")
468 1 : }
469 1 : fmt.Fprintf(&buf, "%s: %q", f.items[i], f.items[i].key)
470 : }
471 1 : return buf.String()
472 : }
473 :
474 : // Advance notifies all member Frontiers with keys ≤ k.
475 2 : func (f *Frontiers) Advance(k []byte) {
476 2 : for len(f.items) > 0 && f.cmp(k, f.items[0].key) >= 0 {
477 2 : // This frontier has been reached. Invoke the closure and update with
478 2 : // the next frontier.
479 2 : f.items[0].key = f.items[0].reached(k)
480 2 : if f.items[0].key == nil {
481 2 : // This was the final frontier that this user was concerned with.
482 2 : // Remove it from the heap.
483 2 : f.pop()
484 2 : } else {
485 2 : // Fix up the heap root. Note that if the key is still smaller than k, the
486 2 : // callback will be invoked again in the same loop.
487 2 : f.fix(0)
488 2 : }
489 : }
490 : }
491 :
492 2 : func (f *Frontiers) len() int {
493 2 : return len(f.items)
494 2 : }
495 :
496 2 : func (f *Frontiers) less(i, j int) bool {
497 2 : return f.cmp(f.items[i].key, f.items[j].key) < 0
498 2 : }
499 :
500 2 : func (f *Frontiers) swap(i, j int) {
501 2 : f.items[i], f.items[j] = f.items[j], f.items[i]
502 2 : }
503 :
504 : // fix, up and down are copied from the go stdlib.
505 :
506 2 : func (f *Frontiers) fix(i int) {
507 2 : if !f.down(i, f.len()) {
508 2 : f.up(i)
509 2 : }
510 : }
511 :
512 2 : func (f *Frontiers) push(ff *frontier) {
513 2 : n := len(f.items)
514 2 : f.items = append(f.items, ff)
515 2 : f.up(n)
516 2 : }
517 :
518 2 : func (f *Frontiers) pop() *frontier {
519 2 : n := f.len() - 1
520 2 : f.swap(0, n)
521 2 : f.down(0, n)
522 2 : item := f.items[n]
523 2 : f.items = f.items[:n]
524 2 : return item
525 2 : }
526 :
// up sifts the entry at index j toward the root while its key is smaller than
// its parent's. This restores heap order after an append or a key decrease.
// It mirrors container/heap's unexported up.
func (f *Frontiers) up(j int) {
	for {
		i := (j - 1) / 2 // parent
		// At the root, (0-1)/2 truncates to 0, so i == j ends the loop.
		if i == j || !f.less(j, i) {
			break
		}
		f.swap(i, j)
		j = i
	}
}
537 :
// down sifts the entry at index i0 toward the leaves, considering only the
// first n entries, until neither child has a smaller key. It reports whether
// the entry moved. It mirrors container/heap's unexported down.
func (f *Frontiers) down(i0, n int) bool {
	i := i0
	for {
		j1 := 2*i + 1
		if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
			break
		}
		j := j1 // left child
		if j2 := j1 + 1; j2 < n && f.less(j2, j1) {
			j = j2 // = 2*i + 2  // right child
		}
		// Stop once the smaller child is not smaller than the current entry.
		if !f.less(j, i) {
			break
		}
		f.swap(i, j)
		i = j
	}
	return i > i0
}
|