Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package keyspan
6 :
7 : import (
8 : "bytes"
9 : "context"
10 :
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/bytealloc"
13 : "github.com/cockroachdb/pebble/internal/invariants"
14 : "github.com/cockroachdb/pebble/internal/treeprinter"
15 : )
16 :
// Reuse limits for the scratch buffers held by a DefragmentingIter. A buffer
// whose capacity has grown past its limit is discarded instead of being
// retained, and is reallocated as necessary.
const (
	// BufferReuseMaxCapacity is the maximum capacity of a DefragmentingIter
	// byte buffer that DefragmentingIter will reuse.
	BufferReuseMaxCapacity = 10 << 10 // 10 KB

	// keysReuseMaxCapacity is the maximum capacity of a []keyspan.Key buffer
	// that DefragmentingIter will reuse.
	keysReuseMaxCapacity = 100
)
26 :
27 : // DefragmentMethod configures the defragmentation performed by the
28 : // DefragmentingIter.
29 : type DefragmentMethod interface {
30 : // ShouldDefragment takes two abutting spans and returns whether the two
31 : // spans should be combined into a single, defragmented Span.
32 : ShouldDefragment(suffixCmp base.CompareSuffixes, left, right *Span) bool
33 : }
34 :
35 : // The DefragmentMethodFunc type is an adapter to allow the use of ordinary
36 : // functions as DefragmentMethods. If f is a function with the appropriate
37 : // signature, DefragmentMethodFunc(f) is a DefragmentMethod that calls f.
38 : type DefragmentMethodFunc func(suffixCmp base.CompareSuffixes, left, right *Span) bool
39 :
40 : // ShouldDefragment calls f(equal, left, right).
41 : func (f DefragmentMethodFunc) ShouldDefragment(
42 : suffixCmp base.CompareSuffixes, left, right *Span,
43 2 : ) bool {
44 2 : return f(suffixCmp, left, right)
45 2 : }
46 :
47 : // DefragmentInternal configures a DefragmentingIter to defragment spans only if
48 : // they have identical keys. It requires spans' keys to be sorted in trailer
49 : // descending order.
50 : //
51 : // This defragmenting method is intended for use in compactions that may see
52 : // internal range keys fragments that may now be joined, because the state that
53 : // required their fragmentation has been dropped.
54 2 : var DefragmentInternal DefragmentMethod = DefragmentMethodFunc(func(suffixCmp base.CompareSuffixes, a, b *Span) bool {
55 2 : if a.KeysOrder != ByTrailerDesc || b.KeysOrder != ByTrailerDesc {
56 0 : panic("pebble: span keys unexpectedly not in trailer descending order")
57 : }
58 2 : if len(a.Keys) != len(b.Keys) {
59 2 : return false
60 2 : }
61 2 : for i := range a.Keys {
62 2 : if a.Keys[i].Trailer != b.Keys[i].Trailer {
63 2 : return false
64 2 : }
65 2 : if suffixCmp(a.Keys[i].Suffix, b.Keys[i].Suffix) != 0 {
66 1 : return false
67 1 : }
68 2 : if !bytes.Equal(a.Keys[i].Value, b.Keys[i].Value) {
69 1 : return false
70 1 : }
71 : }
72 2 : return true
73 : })
74 :
75 : // DefragmentReducer merges the current and next Key slices, returning a new Key
76 : // slice.
77 : //
78 : // Implementations should modify and return `cur` to save on allocations, or
79 : // consider allocating a new slice, as the `cur` slice may be retained by the
80 : // DefragmentingIter and mutated. The `next` slice must not be mutated.
81 : //
82 : // The incoming slices are sorted by (SeqNum, Kind) descending. The output slice
83 : // must also have this sort order.
84 : type DefragmentReducer func(cur, next []Key) []Key
85 :
86 : // StaticDefragmentReducer is a no-op DefragmentReducer that simply returns the
87 : // current key slice, effectively retaining the first set of keys encountered
88 : // for a defragmented span.
89 : //
90 : // This reducer can be used, for example, when the set of Keys for each Span
91 : // being reduced is not expected to change, and therefore the keys from the
92 : // first span encountered can be used without considering keys in subsequent
93 : // spans.
94 2 : var StaticDefragmentReducer DefragmentReducer = func(cur, _ []Key) []Key {
95 2 : return cur
96 2 : }
97 :
// iterPos records where the defragmenting iter's wrapped iterator sits
// relative to the current position. Defragmenting forward requires looking one
// span ahead and defragmenting backward requires looking one span behind, so
// the wrapped iterator is not always positioned at the current span.
type iterPos int8

const (
	// iterPosPrev (-1): the wrapped iterator is behind the current position.
	iterPosPrev iterPos = iota - 1
	// iterPosCurr (0): the wrapped iterator is at the current position.
	iterPosCurr
	// iterPosNext (+1): the wrapped iterator is ahead of the current position.
	iterPosNext
)
109 :
110 : // DefragmentingIter wraps a key span iterator, defragmenting physical
111 : // fragmentation during iteration.
112 : //
113 : // During flushes and compactions, keys applied over a span may be split at
114 : // sstable boundaries. This fragmentation can produce internal key bounds that
115 : // do not match any of the bounds ever supplied to a user operation. This
116 : // physical fragmentation is necessary to avoid excessively wide sstables.
117 : //
118 : // The defragmenting iterator undoes this physical fragmentation, joining spans
119 : // with abutting bounds and equal state. The defragmenting iterator takes a
120 : // DefragmentMethod to determine what is "equal state" for a span. The
121 : // DefragmentMethod is a function type, allowing arbitrary comparisons between
122 : // Span keys.
123 : //
124 : // Seeking (SeekGE, SeekLT) poses an obstacle to defragmentation. A seek may
125 : // land on a physical fragment in the middle of several fragments that must be
126 : // defragmented. A seek that lands in a fragment straddling the seek key must
127 : // first degfragment in the opposite direction of iteration to find the
128 : // beginning of the defragmented span, and then defragments in the iteration
129 : // direction, ensuring it's found a whole defragmented span.
130 : type DefragmentingIter struct {
131 : // DefragmentingBuffers holds buffers used for copying iterator state.
132 : *DefragmentingBuffers
133 : comparer *base.Comparer
134 : equal base.Equal
135 : iter FragmentIterator
136 : iterSpan *Span
137 : iterPos iterPos
138 :
139 : // curr holds the span at the current iterator position.
140 : curr Span
141 :
142 : // method is a comparison function for two spans. method is called when two
143 : // spans are abutting to determine whether they may be defragmented.
144 : // method does not itself check for adjacency for the two spans.
145 : method DefragmentMethod
146 :
147 : // reduce is the reducer function used to collect Keys across all spans that
148 : // constitute a defragmented span.
149 : reduce DefragmentReducer
150 : }
151 :
152 : // DefragmentingBuffers holds buffers used for copying iterator state.
153 : type DefragmentingBuffers struct {
154 : // currBuf is a buffer for use when copying user keys for curr. currBuf is
155 : // cleared between positioning methods.
156 : currBuf bytealloc.A
157 : // keysBuf is a buffer for use when copying Keys for DefragmentingIter.curr.
158 : keysBuf []Key
159 : // keyBuf is a buffer specifically for the defragmented start key when
160 : // defragmenting backwards or the defragmented end key when defragmenting
161 : // forwards. These bounds are overwritten repeatedly during defragmentation,
162 : // and the defragmentation routines overwrite keyBuf repeatedly to store
163 : // these extended bounds.
164 : keyBuf []byte
165 : }
166 :
167 : // PrepareForReuse discards any excessively large buffers.
168 2 : func (bufs *DefragmentingBuffers) PrepareForReuse() {
169 2 : if cap(bufs.currBuf) > BufferReuseMaxCapacity {
170 1 : bufs.currBuf = nil
171 1 : }
172 2 : if cap(bufs.keyBuf) > BufferReuseMaxCapacity {
173 0 : bufs.keyBuf = nil
174 0 : }
175 2 : if cap(bufs.keysBuf) > keysReuseMaxCapacity {
176 0 : bufs.keysBuf = nil
177 0 : }
178 : }
179 :
180 : // Assert that *DefragmentingIter implements the FragmentIterator interface.
181 : var _ FragmentIterator = (*DefragmentingIter)(nil)
182 :
183 : // Init initializes the defragmenting iter using the provided defragment
184 : // method.
185 : func (i *DefragmentingIter) Init(
186 : comparer *base.Comparer,
187 : iter FragmentIterator,
188 : equal DefragmentMethod,
189 : reducer DefragmentReducer,
190 : bufs *DefragmentingBuffers,
191 2 : ) {
192 2 : *i = DefragmentingIter{
193 2 : DefragmentingBuffers: bufs,
194 2 : comparer: comparer,
195 2 : equal: comparer.Equal,
196 2 : iter: iter,
197 2 : method: equal,
198 2 : reduce: reducer,
199 2 : }
200 2 : }
201 :
202 : // SetContext is part of the FragmentIterator interface.
203 0 : func (i *DefragmentingIter) SetContext(ctx context.Context) {
204 0 : i.iter.SetContext(ctx)
205 0 : }
206 :
207 : // Close closes the underlying iterators.
208 2 : func (i *DefragmentingIter) Close() {
209 2 : i.iter.Close()
210 2 : }
211 :
212 : // SeekGE moves the iterator to the first span covering a key greater than or
213 : // equal to the given key. This is equivalent to seeking to the first span with
214 : // an end key greater than the given key.
215 2 : func (i *DefragmentingIter) SeekGE(key []byte) (*Span, error) {
216 2 : var err error
217 2 : i.iterSpan, err = i.iter.SeekGE(key)
218 2 : switch {
219 1 : case err != nil:
220 1 : return nil, err
221 2 : case i.iterSpan == nil:
222 2 : i.iterPos = iterPosCurr
223 2 : return nil, nil
224 2 : case i.iterSpan.Empty():
225 2 : i.iterPos = iterPosCurr
226 2 : return i.iterSpan, nil
227 : }
228 : // If the span starts strictly after key, we know there mustn't be an
229 : // earlier span that ends at i.iterSpan.Start, otherwise i.iter would've
230 : // returned that span instead.
231 2 : if i.comparer.Compare(i.iterSpan.Start, key) > 0 {
232 2 : return i.defragmentForward()
233 2 : }
234 :
235 : // The span we landed on has a Start bound ≤ key. There may be additional
236 : // fragments before this span. Defragment backward to find the start of the
237 : // defragmented span.
238 2 : if _, err := i.defragmentBackward(); err != nil {
239 1 : return nil, err
240 1 : }
241 2 : if i.iterPos == iterPosPrev {
242 2 : // Next once back onto the span.
243 2 : var err error
244 2 : i.iterSpan, err = i.iter.Next()
245 2 : if err != nil {
246 0 : return nil, err
247 0 : }
248 : }
249 : // Defragment the full span from its start.
250 2 : return i.defragmentForward()
251 : }
252 :
253 : // SeekLT moves the iterator to the last span covering a key less than the
254 : // given key. This is equivalent to seeking to the last span with a start
255 : // key less than the given key.
256 2 : func (i *DefragmentingIter) SeekLT(key []byte) (*Span, error) {
257 2 : var err error
258 2 : i.iterSpan, err = i.iter.SeekLT(key)
259 2 : switch {
260 1 : case err != nil:
261 1 : return nil, err
262 2 : case i.iterSpan == nil:
263 2 : i.iterPos = iterPosCurr
264 2 : return nil, nil
265 2 : case i.iterSpan.Empty():
266 2 : i.iterPos = iterPosCurr
267 2 : return i.iterSpan, nil
268 : }
269 : // If the span ends strictly before key, we know there mustn't be a later
270 : // span that starts at i.iterSpan.End, otherwise i.iter would've returned
271 : // that span instead.
272 2 : if i.comparer.Compare(i.iterSpan.End, key) < 0 {
273 2 : return i.defragmentBackward()
274 2 : }
275 :
276 : // The span we landed on has a End bound ≥ key. There may be additional
277 : // fragments after this span. Defragment forward to find the end of the
278 : // defragmented span.
279 2 : if _, err := i.defragmentForward(); err != nil {
280 1 : return nil, err
281 1 : }
282 :
283 2 : if i.iterPos == iterPosNext {
284 2 : // Prev once back onto the span.
285 2 : var err error
286 2 : i.iterSpan, err = i.iter.Prev()
287 2 : if err != nil {
288 0 : return nil, err
289 0 : }
290 : }
291 : // Defragment the full span from its end.
292 2 : return i.defragmentBackward()
293 : }
294 :
295 : // First seeks the iterator to the first span and returns it.
296 2 : func (i *DefragmentingIter) First() (*Span, error) {
297 2 : var err error
298 2 : i.iterSpan, err = i.iter.First()
299 2 : switch {
300 1 : case err != nil:
301 1 : return nil, err
302 2 : case i.iterSpan == nil:
303 2 : i.iterPos = iterPosCurr
304 2 : return nil, nil
305 2 : default:
306 2 : return i.defragmentForward()
307 : }
308 : }
309 :
310 : // Last seeks the iterator to the last span and returns it.
311 2 : func (i *DefragmentingIter) Last() (*Span, error) {
312 2 : var err error
313 2 : i.iterSpan, err = i.iter.Last()
314 2 : switch {
315 1 : case err != nil:
316 1 : return nil, err
317 1 : case i.iterSpan == nil:
318 1 : i.iterPos = iterPosCurr
319 1 : return nil, nil
320 2 : default:
321 2 : return i.defragmentBackward()
322 : }
323 : }
324 :
325 : // Next advances to the next span and returns it.
326 2 : func (i *DefragmentingIter) Next() (*Span, error) {
327 2 : switch i.iterPos {
328 2 : case iterPosPrev:
329 2 : // Switching directions; The iterator is currently positioned over the
330 2 : // last span of the previous set of fragments. In the below diagram,
331 2 : // the iterator is positioned over the last span that contributes to
332 2 : // the defragmented x position. We want to be positioned over the first
333 2 : // span that contributes to the z position.
334 2 : //
335 2 : // x x x y y y z z z
336 2 : // ^ ^
337 2 : // old new
338 2 : //
339 2 : // Next once to move onto y, defragment forward to land on the first z
340 2 : // position.
341 2 : var err error
342 2 : i.iterSpan, err = i.iter.Next()
343 2 : if err != nil {
344 1 : return nil, err
345 2 : } else if i.iterSpan == nil {
346 0 : panic("pebble: invariant violation: no next span while switching directions")
347 : }
348 : // We're now positioned on the first span that was defragmented into the
349 : // current iterator position. Skip over the rest of the current iterator
350 : // position's constitutent fragments. In the above example, this would
351 : // land on the first 'z'.
352 2 : if _, err = i.defragmentForward(); err != nil {
353 0 : return nil, err
354 0 : }
355 2 : if i.iterSpan == nil {
356 2 : i.iterPos = iterPosCurr
357 2 : return nil, nil
358 2 : }
359 :
360 : // Now that we're positioned over the first of the next set of
361 : // fragments, defragment forward.
362 2 : return i.defragmentForward()
363 2 : case iterPosCurr:
364 2 : // iterPosCurr is only used when the iter is exhausted or when the iterator
365 2 : // is at an empty span.
366 2 : if invariants.Enabled && i.iterSpan != nil && !i.iterSpan.Empty() {
367 0 : panic("pebble: invariant violation: iterPosCurr with valid iterSpan")
368 : }
369 :
370 2 : var err error
371 2 : i.iterSpan, err = i.iter.Next()
372 2 : if i.iterSpan == nil {
373 2 : // NB: err may be nil or non-nil.
374 2 : return nil, err
375 2 : }
376 2 : return i.defragmentForward()
377 2 : case iterPosNext:
378 2 : // Already at the next span.
379 2 : if i.iterSpan == nil {
380 2 : i.iterPos = iterPosCurr
381 2 : return nil, nil
382 2 : }
383 2 : return i.defragmentForward()
384 0 : default:
385 0 : panic("unreachable")
386 : }
387 : }
388 :
389 : // Prev steps back to the previous span and returns it.
390 2 : func (i *DefragmentingIter) Prev() (*Span, error) {
391 2 : switch i.iterPos {
392 2 : case iterPosPrev:
393 2 : // Already at the previous span.
394 2 : if i.iterSpan == nil {
395 2 : i.iterPos = iterPosCurr
396 2 : return nil, nil
397 2 : }
398 2 : return i.defragmentBackward()
399 2 : case iterPosCurr:
400 2 : // iterPosCurr is only used when the iter is exhausted or when the iterator
401 2 : // is at an empty span.
402 2 : if invariants.Enabled && i.iterSpan != nil && !i.iterSpan.Empty() {
403 0 : panic("pebble: invariant violation: iterPosCurr with valid iterSpan")
404 : }
405 :
406 2 : var err error
407 2 : i.iterSpan, err = i.iter.Prev()
408 2 : if i.iterSpan == nil {
409 2 : // NB: err may be nil or non-nil.
410 2 : return nil, err
411 2 : }
412 2 : return i.defragmentBackward()
413 2 : case iterPosNext:
414 2 : // Switching directions; The iterator is currently positioned over the
415 2 : // first fragment of the next set of fragments. In the below diagram,
416 2 : // the iterator is positioned over the first span that contributes to
417 2 : // the defragmented z position. We want to be positioned over the last
418 2 : // span that contributes to the x position.
419 2 : //
420 2 : // x x x y y y z z z
421 2 : // ^ ^
422 2 : // new old
423 2 : //
424 2 : // Prev once to move onto y, defragment backward to land on the last x
425 2 : // position.
426 2 : var err error
427 2 : i.iterSpan, err = i.iter.Prev()
428 2 : if err != nil {
429 1 : return nil, err
430 2 : } else if i.iterSpan == nil {
431 0 : panic("pebble: invariant violation: no previous span while switching directions")
432 : }
433 : // We're now positioned on the last span that was defragmented into the
434 : // current iterator position. Skip over the rest of the current iterator
435 : // position's constitutent fragments. In the above example, this would
436 : // land on the last 'x'.
437 2 : if _, err = i.defragmentBackward(); err != nil {
438 0 : return nil, err
439 0 : }
440 :
441 : // Now that we're positioned over the last of the prev set of
442 : // fragments, defragment backward.
443 2 : if i.iterSpan == nil {
444 2 : i.iterPos = iterPosCurr
445 2 : return nil, nil
446 2 : }
447 2 : return i.defragmentBackward()
448 0 : default:
449 0 : panic("unreachable")
450 : }
451 : }
452 :
453 : // checkEqual checks the two spans for logical equivalence. It uses the passed-in
454 : // DefragmentMethod and ensures both spans are NOT empty; not defragmenting empty
455 : // spans is an optimization that lets us load fewer sstable blocks.
456 2 : func (i *DefragmentingIter) checkEqual(left, right *Span) bool {
457 2 : return (!left.Empty() && !right.Empty()) && i.method.ShouldDefragment(i.comparer.CompareSuffixes, i.iterSpan, &i.curr)
458 2 : }
459 :
460 : // defragmentForward defragments spans in the forward direction, starting from
461 : // i.iter's current position. The span at the current position must be non-nil,
462 : // but may be Empty().
463 2 : func (i *DefragmentingIter) defragmentForward() (*Span, error) {
464 2 : if i.iterSpan.Empty() {
465 2 : // An empty span will never be equal to another span; see checkEqual for
466 2 : // why. To avoid loading non-empty range keys further ahead by calling Next,
467 2 : // return early.
468 2 : i.iterPos = iterPosCurr
469 2 : return i.iterSpan, nil
470 2 : }
471 2 : i.saveCurrent()
472 2 :
473 2 : var err error
474 2 : i.iterPos = iterPosNext
475 2 : i.iterSpan, err = i.iter.Next()
476 2 : for i.iterSpan != nil {
477 2 : if !i.equal(i.curr.End, i.iterSpan.Start) {
478 2 : // Not a continuation.
479 2 : break
480 : }
481 2 : if !i.checkEqual(i.iterSpan, &i.curr) {
482 2 : // Not a continuation.
483 2 : break
484 : }
485 2 : i.keyBuf = append(i.keyBuf[:0], i.iterSpan.End...)
486 2 : i.curr.End = i.keyBuf
487 2 : i.keysBuf = i.reduce(i.keysBuf, i.iterSpan.Keys)
488 2 : i.iterSpan, err = i.iter.Next()
489 : }
490 : // i.iterSpan == nil
491 2 : if err != nil {
492 1 : return nil, err
493 1 : }
494 2 : i.curr.Keys = i.keysBuf
495 2 : return &i.curr, nil
496 : }
497 :
498 : // defragmentBackward defragments spans in the backward direction, starting from
499 : // i.iter's current position. The span at the current position must be non-nil,
500 : // but may be Empty().
501 2 : func (i *DefragmentingIter) defragmentBackward() (*Span, error) {
502 2 : if i.iterSpan.Empty() {
503 2 : // An empty span will never be equal to another span; see checkEqual for
504 2 : // why. To avoid loading non-empty range keys further ahead by calling Next,
505 2 : // return early.
506 2 : i.iterPos = iterPosCurr
507 2 : return i.iterSpan, nil
508 2 : }
509 2 : i.saveCurrent()
510 2 :
511 2 : var err error
512 2 : i.iterPos = iterPosPrev
513 2 : i.iterSpan, err = i.iter.Prev()
514 2 : for i.iterSpan != nil {
515 2 : if !i.equal(i.curr.Start, i.iterSpan.End) {
516 2 : // Not a continuation.
517 2 : break
518 : }
519 2 : if !i.checkEqual(i.iterSpan, &i.curr) {
520 2 : // Not a continuation.
521 2 : break
522 : }
523 2 : i.keyBuf = append(i.keyBuf[:0], i.iterSpan.Start...)
524 2 : i.curr.Start = i.keyBuf
525 2 : i.keysBuf = i.reduce(i.keysBuf, i.iterSpan.Keys)
526 2 : i.iterSpan, err = i.iter.Prev()
527 : }
528 : // i.iterSpan == nil
529 2 : if err != nil {
530 1 : return nil, err
531 1 : }
532 2 : i.curr.Keys = i.keysBuf
533 2 : return &i.curr, nil
534 : }
535 :
536 2 : func (i *DefragmentingIter) saveCurrent() {
537 2 : i.currBuf.Reset()
538 2 : i.keysBuf = i.keysBuf[:0]
539 2 : i.keyBuf = i.keyBuf[:0]
540 2 : if i.iterSpan == nil {
541 0 : return
542 0 : }
543 2 : i.curr = Span{
544 2 : Start: i.saveBytes(i.iterSpan.Start),
545 2 : End: i.saveBytes(i.iterSpan.End),
546 2 : KeysOrder: i.iterSpan.KeysOrder,
547 2 : }
548 2 : for j := range i.iterSpan.Keys {
549 2 : i.keysBuf = append(i.keysBuf, Key{
550 2 : Trailer: i.iterSpan.Keys[j].Trailer,
551 2 : Suffix: i.saveBytes(i.iterSpan.Keys[j].Suffix),
552 2 : Value: i.saveBytes(i.iterSpan.Keys[j].Value),
553 2 : })
554 2 : }
555 2 : i.curr.Keys = i.keysBuf
556 : }
557 :
558 2 : func (i *DefragmentingIter) saveBytes(b []byte) []byte {
559 2 : if b == nil {
560 2 : return nil
561 2 : }
562 2 : i.currBuf, b = i.currBuf.Copy(b)
563 2 : return b
564 : }
565 :
566 : // WrapChildren implements FragmentIterator.
567 0 : func (i *DefragmentingIter) WrapChildren(wrap WrapFn) {
568 0 : i.iter = wrap(i.iter)
569 0 : }
570 :
571 : // DebugTree is part of the FragmentIterator interface.
572 0 : func (i *DefragmentingIter) DebugTree(tp treeprinter.Node) {
573 0 : n := tp.Childf("%T(%p)", i, i)
574 0 : if i.iter != nil {
575 0 : i.iter.DebugTree(n)
576 0 : }
577 : }
|