Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package keyspan
6 :
7 : import (
8 : "bytes"
9 :
10 : "github.com/cockroachdb/pebble/internal/base"
11 : "github.com/cockroachdb/pebble/internal/bytealloc"
12 : "github.com/cockroachdb/pebble/internal/invariants"
13 : )
14 :
15 : // bufferReuseMaxCapacity is the maximum capacity of a DefragmentingIter buffer
16 : // that DefragmentingIter will reuse. Buffers larger than this will be
17 : // discarded and reallocated as necessary.
18 : const bufferReuseMaxCapacity = 10 << 10 // 10 KB
19 :
20 : // keysReuseMaxCapacity is the maximum capacity of a []keyspan.Key buffer that
21 : // DefragmentingIter will reuse. Buffers larger than this will be discarded and
22 : // reallocated as necessary.
23 : const keysReuseMaxCapacity = 100
24 :
25 : // DefragmentMethod configures the defragmentation performed by the
26 : // DefragmentingIter.
27 : type DefragmentMethod interface {
28 : // ShouldDefragment takes two abutting spans and returns whether the two
29 : // spans should be combined into a single, defragmented Span.
30 : ShouldDefragment(equal base.Equal, left, right *Span) bool
31 : }
32 :
33 : // The DefragmentMethodFunc type is an adapter to allow the use of ordinary
34 : // functions as DefragmentMethods. If f is a function with the appropriate
35 : // signature, DefragmentMethodFunc(f) is a DefragmentMethod that calls f.
36 : type DefragmentMethodFunc func(equal base.Equal, left, right *Span) bool
37 :
38 : // ShouldDefragment calls f(equal, left, right).
39 1 : func (f DefragmentMethodFunc) ShouldDefragment(equal base.Equal, left, right *Span) bool {
40 1 : return f(equal, left, right)
41 1 : }
42 :
43 : // DefragmentInternal configures a DefragmentingIter to defragment spans
44 : // only if they have identical keys. It requires spans' keys to be sorted in
45 : // trailer descending order.
46 : //
47 : // This defragmenting method is intended for use in compactions that may see
48 : // internal range keys fragments that may now be joined, because the state that
49 : // required their fragmentation has been dropped.
50 1 : var DefragmentInternal DefragmentMethod = DefragmentMethodFunc(func(equal base.Equal, a, b *Span) bool {
51 1 : if a.KeysOrder != ByTrailerDesc || b.KeysOrder != ByTrailerDesc {
52 0 : panic("pebble: span keys unexpectedly not in trailer descending order")
53 : }
54 1 : if len(a.Keys) != len(b.Keys) {
55 1 : return false
56 1 : }
57 1 : for i := range a.Keys {
58 1 : if a.Keys[i].Trailer != b.Keys[i].Trailer {
59 1 : return false
60 1 : }
61 1 : if !equal(a.Keys[i].Suffix, b.Keys[i].Suffix) {
62 0 : return false
63 0 : }
64 1 : if !bytes.Equal(a.Keys[i].Value, b.Keys[i].Value) {
65 0 : return false
66 0 : }
67 : }
68 1 : return true
69 : })
70 :
71 : // DefragmentReducer merges the current and next Key slices, returning a new Key
72 : // slice.
73 : //
74 : // Implementations should modify and return `cur` to save on allocations, or
75 : // consider allocating a new slice, as the `cur` slice may be retained by the
76 : // DefragmentingIter and mutated. The `next` slice must not be mutated.
77 : //
78 : // The incoming slices are sorted by (SeqNum, Kind) descending. The output slice
79 : // must also have this sort order.
80 : type DefragmentReducer func(cur, next []Key) []Key
81 :
82 : // StaticDefragmentReducer is a no-op DefragmentReducer that simply returns the
83 : // current key slice, effectively retaining the first set of keys encountered
84 : // for a defragmented span.
85 : //
86 : // This reducer can be used, for example, when the set of Keys for each Span
87 : // being reduced is not expected to change, and therefore the keys from the
88 : // first span encountered can be used without considering keys in subsequent
89 : // spans.
90 1 : var StaticDefragmentReducer DefragmentReducer = func(cur, _ []Key) []Key {
91 1 : return cur
92 1 : }
93 :
94 : // iterPos is an enum indicating the position of the defragmenting iter's
95 : // wrapped iter. The defragmenting iter must look ahead or behind when
96 : // defragmenting forward or backwards respectively, and this enum records that
97 : // current position.
98 : type iterPos int8
99 :
100 : const (
101 : iterPosPrev iterPos = -1
102 : iterPosCurr iterPos = 0
103 : iterPosNext iterPos = +1
104 : )
105 :
106 : // DefragmentingIter wraps a key span iterator, defragmenting physical
107 : // fragmentation during iteration.
108 : //
109 : // During flushes and compactions, keys applied over a span may be split at
110 : // sstable boundaries. This fragmentation can produce internal key bounds that
111 : // do not match any of the bounds ever supplied to a user operation. This
112 : // physical fragmentation is necessary to avoid excessively wide sstables.
113 : //
114 : // The defragmenting iterator undoes this physical fragmentation, joining spans
115 : // with abutting bounds and equal state. The defragmenting iterator takes a
116 : // DefragmentMethod to determine what is "equal state" for a span. The
117 : // DefragmentMethod is a function type, allowing arbitrary comparisons between
118 : // Span keys.
119 : //
120 : // Seeking (SeekGE, SeekLT) poses an obstacle to defragmentation. A seek may
121 : // land on a physical fragment in the middle of several fragments that must be
122 : // defragmented. A seek that lands in a fragment straddling the seek key must
123 : // first degfragment in the opposite direction of iteration to find the
124 : // beginning of the defragmented span, and then defragments in the iteration
125 : // direction, ensuring it's found a whole defragmented span.
126 : type DefragmentingIter struct {
127 : // DefragmentingBuffers holds buffers used for copying iterator state.
128 : *DefragmentingBuffers
129 : comparer *base.Comparer
130 : equal base.Equal
131 : iter FragmentIterator
132 : iterSpan *Span
133 : iterPos iterPos
134 :
135 : // curr holds the span at the current iterator position.
136 : curr Span
137 :
138 : // method is a comparison function for two spans. method is called when two
139 : // spans are abutting to determine whether they may be defragmented.
140 : // method does not itself check for adjacency for the two spans.
141 : method DefragmentMethod
142 :
143 : // reduce is the reducer function used to collect Keys across all spans that
144 : // constitute a defragmented span.
145 : reduce DefragmentReducer
146 : }
147 :
148 : // DefragmentingBuffers holds buffers used for copying iterator state.
149 : type DefragmentingBuffers struct {
150 : // currBuf is a buffer for use when copying user keys for curr. currBuf is
151 : // cleared between positioning methods.
152 : currBuf bytealloc.A
153 : // keysBuf is a buffer for use when copying Keys for DefragmentingIter.curr.
154 : keysBuf []Key
155 : // keyBuf is a buffer specifically for the defragmented start key when
156 : // defragmenting backwards or the defragmented end key when defragmenting
157 : // forwards. These bounds are overwritten repeatedly during defragmentation,
158 : // and the defragmentation routines overwrite keyBuf repeatedly to store
159 : // these extended bounds.
160 : keyBuf []byte
161 : }
162 :
163 : // PrepareForReuse discards any excessively large buffers.
164 1 : func (bufs *DefragmentingBuffers) PrepareForReuse() {
165 1 : if cap(bufs.currBuf) > bufferReuseMaxCapacity {
166 1 : bufs.currBuf = nil
167 1 : }
168 1 : if cap(bufs.keyBuf) > bufferReuseMaxCapacity {
169 0 : bufs.keyBuf = nil
170 0 : }
171 1 : if cap(bufs.keysBuf) > keysReuseMaxCapacity {
172 0 : bufs.keysBuf = nil
173 0 : }
174 : }
175 :
176 : // Assert that *DefragmentingIter implements the FragmentIterator interface.
177 : var _ FragmentIterator = (*DefragmentingIter)(nil)
178 :
179 : // Init initializes the defragmenting iter using the provided defragment
180 : // method.
181 : func (i *DefragmentingIter) Init(
182 : comparer *base.Comparer,
183 : iter FragmentIterator,
184 : equal DefragmentMethod,
185 : reducer DefragmentReducer,
186 : bufs *DefragmentingBuffers,
187 1 : ) {
188 1 : *i = DefragmentingIter{
189 1 : DefragmentingBuffers: bufs,
190 1 : comparer: comparer,
191 1 : equal: comparer.Equal,
192 1 : iter: iter,
193 1 : method: equal,
194 1 : reduce: reducer,
195 1 : }
196 1 : }
197 :
198 : // Close closes the underlying iterators.
199 1 : func (i *DefragmentingIter) Close() error {
200 1 : return i.iter.Close()
201 1 : }
202 :
203 : // SeekGE moves the iterator to the first span covering a key greater than or
204 : // equal to the given key. This is equivalent to seeking to the first span with
205 : // an end key greater than the given key.
206 1 : func (i *DefragmentingIter) SeekGE(key []byte) (*Span, error) {
207 1 : var err error
208 1 : i.iterSpan, err = i.iter.SeekGE(key)
209 1 : switch {
210 1 : case err != nil:
211 1 : return nil, err
212 1 : case i.iterSpan == nil:
213 1 : i.iterPos = iterPosCurr
214 1 : return nil, nil
215 1 : case i.iterSpan.Empty():
216 1 : i.iterPos = iterPosCurr
217 1 : return i.iterSpan, nil
218 : }
219 : // If the span starts strictly after key, we know there mustn't be an
220 : // earlier span that ends at i.iterSpan.Start, otherwise i.iter would've
221 : // returned that span instead.
222 1 : if i.comparer.Compare(i.iterSpan.Start, key) > 0 {
223 1 : return i.defragmentForward()
224 1 : }
225 :
226 : // The span we landed on has a Start bound ≤ key. There may be additional
227 : // fragments before this span. Defragment backward to find the start of the
228 : // defragmented span.
229 1 : if _, err := i.defragmentBackward(); err != nil {
230 1 : return nil, err
231 1 : }
232 1 : if i.iterPos == iterPosPrev {
233 1 : // Next once back onto the span.
234 1 : var err error
235 1 : i.iterSpan, err = i.iter.Next()
236 1 : if err != nil {
237 0 : return nil, err
238 0 : }
239 : }
240 : // Defragment the full span from its start.
241 1 : return i.defragmentForward()
242 : }
243 :
244 : // SeekLT moves the iterator to the last span covering a key less than the
245 : // given key. This is equivalent to seeking to the last span with a start
246 : // key less than the given key.
247 1 : func (i *DefragmentingIter) SeekLT(key []byte) (*Span, error) {
248 1 : var err error
249 1 : i.iterSpan, err = i.iter.SeekLT(key)
250 1 : switch {
251 1 : case err != nil:
252 1 : return nil, err
253 1 : case i.iterSpan == nil:
254 1 : i.iterPos = iterPosCurr
255 1 : return nil, nil
256 1 : case i.iterSpan.Empty():
257 1 : i.iterPos = iterPosCurr
258 1 : return i.iterSpan, nil
259 : }
260 : // If the span ends strictly before key, we know there mustn't be a later
261 : // span that starts at i.iterSpan.End, otherwise i.iter would've returned
262 : // that span instead.
263 1 : if i.comparer.Compare(i.iterSpan.End, key) < 0 {
264 1 : return i.defragmentBackward()
265 1 : }
266 :
267 : // The span we landed on has a End bound ≥ key. There may be additional
268 : // fragments after this span. Defragment forward to find the end of the
269 : // defragmented span.
270 1 : if _, err := i.defragmentForward(); err != nil {
271 1 : return nil, err
272 1 : }
273 :
274 1 : if i.iterPos == iterPosNext {
275 1 : // Prev once back onto the span.
276 1 : var err error
277 1 : i.iterSpan, err = i.iter.Prev()
278 1 : if err != nil {
279 0 : return nil, err
280 0 : }
281 : }
282 : // Defragment the full span from its end.
283 1 : return i.defragmentBackward()
284 : }
285 :
286 : // First seeks the iterator to the first span and returns it.
287 1 : func (i *DefragmentingIter) First() (*Span, error) {
288 1 : var err error
289 1 : i.iterSpan, err = i.iter.First()
290 1 : switch {
291 1 : case err != nil:
292 1 : return nil, err
293 1 : case i.iterSpan == nil:
294 1 : i.iterPos = iterPosCurr
295 1 : return nil, nil
296 1 : default:
297 1 : return i.defragmentForward()
298 : }
299 : }
300 :
301 : // Last seeks the iterator to the last span and returns it.
302 1 : func (i *DefragmentingIter) Last() (*Span, error) {
303 1 : var err error
304 1 : i.iterSpan, err = i.iter.Last()
305 1 : switch {
306 1 : case err != nil:
307 1 : return nil, err
308 1 : case i.iterSpan == nil:
309 1 : i.iterPos = iterPosCurr
310 1 : return nil, nil
311 1 : default:
312 1 : return i.defragmentBackward()
313 : }
314 : }
315 :
316 : // Next advances to the next span and returns it.
317 1 : func (i *DefragmentingIter) Next() (*Span, error) {
318 1 : switch i.iterPos {
319 1 : case iterPosPrev:
320 1 : // Switching directions; The iterator is currently positioned over the
321 1 : // last span of the previous set of fragments. In the below diagram,
322 1 : // the iterator is positioned over the last span that contributes to
323 1 : // the defragmented x position. We want to be positioned over the first
324 1 : // span that contributes to the z position.
325 1 : //
326 1 : // x x x y y y z z z
327 1 : // ^ ^
328 1 : // old new
329 1 : //
330 1 : // Next once to move onto y, defragment forward to land on the first z
331 1 : // position.
332 1 : var err error
333 1 : i.iterSpan, err = i.iter.Next()
334 1 : if err != nil {
335 1 : return nil, err
336 1 : } else if i.iterSpan == nil {
337 0 : panic("pebble: invariant violation: no next span while switching directions")
338 : }
339 : // We're now positioned on the first span that was defragmented into the
340 : // current iterator position. Skip over the rest of the current iterator
341 : // position's constitutent fragments. In the above example, this would
342 : // land on the first 'z'.
343 1 : if _, err = i.defragmentForward(); err != nil {
344 0 : return nil, err
345 0 : }
346 1 : if i.iterSpan == nil {
347 1 : i.iterPos = iterPosCurr
348 1 : return nil, nil
349 1 : }
350 :
351 : // Now that we're positioned over the first of the next set of
352 : // fragments, defragment forward.
353 1 : return i.defragmentForward()
354 1 : case iterPosCurr:
355 1 : // iterPosCurr is only used when the iter is exhausted or when the iterator
356 1 : // is at an empty span.
357 1 : if invariants.Enabled && i.iterSpan != nil && !i.iterSpan.Empty() {
358 0 : panic("pebble: invariant violation: iterPosCurr with valid iterSpan")
359 : }
360 :
361 1 : var err error
362 1 : i.iterSpan, err = i.iter.Next()
363 1 : if i.iterSpan == nil {
364 1 : // NB: err may be nil or non-nil.
365 1 : return nil, err
366 1 : }
367 1 : return i.defragmentForward()
368 1 : case iterPosNext:
369 1 : // Already at the next span.
370 1 : if i.iterSpan == nil {
371 1 : i.iterPos = iterPosCurr
372 1 : return nil, nil
373 1 : }
374 1 : return i.defragmentForward()
375 0 : default:
376 0 : panic("unreachable")
377 : }
378 : }
379 :
380 : // Prev steps back to the previous span and returns it.
381 1 : func (i *DefragmentingIter) Prev() (*Span, error) {
382 1 : switch i.iterPos {
383 1 : case iterPosPrev:
384 1 : // Already at the previous span.
385 1 : if i.iterSpan == nil {
386 1 : i.iterPos = iterPosCurr
387 1 : return nil, nil
388 1 : }
389 1 : return i.defragmentBackward()
390 1 : case iterPosCurr:
391 1 : // iterPosCurr is only used when the iter is exhausted or when the iterator
392 1 : // is at an empty span.
393 1 : if invariants.Enabled && i.iterSpan != nil && !i.iterSpan.Empty() {
394 0 : panic("pebble: invariant violation: iterPosCurr with valid iterSpan")
395 : }
396 :
397 1 : var err error
398 1 : i.iterSpan, err = i.iter.Prev()
399 1 : if i.iterSpan == nil {
400 1 : // NB: err may be nil or non-nil.
401 1 : return nil, err
402 1 : }
403 1 : return i.defragmentBackward()
404 1 : case iterPosNext:
405 1 : // Switching directions; The iterator is currently positioned over the
406 1 : // first fragment of the next set of fragments. In the below diagram,
407 1 : // the iterator is positioned over the first span that contributes to
408 1 : // the defragmented z position. We want to be positioned over the last
409 1 : // span that contributes to the x position.
410 1 : //
411 1 : // x x x y y y z z z
412 1 : // ^ ^
413 1 : // new old
414 1 : //
415 1 : // Prev once to move onto y, defragment backward to land on the last x
416 1 : // position.
417 1 : var err error
418 1 : i.iterSpan, err = i.iter.Prev()
419 1 : if err != nil {
420 1 : return nil, err
421 1 : } else if i.iterSpan == nil {
422 0 : panic("pebble: invariant violation: no previous span while switching directions")
423 : }
424 : // We're now positioned on the last span that was defragmented into the
425 : // current iterator position. Skip over the rest of the current iterator
426 : // position's constitutent fragments. In the above example, this would
427 : // land on the last 'x'.
428 1 : if _, err = i.defragmentBackward(); err != nil {
429 0 : return nil, err
430 0 : }
431 :
432 : // Now that we're positioned over the last of the prev set of
433 : // fragments, defragment backward.
434 1 : if i.iterSpan == nil {
435 1 : i.iterPos = iterPosCurr
436 1 : return nil, nil
437 1 : }
438 1 : return i.defragmentBackward()
439 0 : default:
440 0 : panic("unreachable")
441 : }
442 : }
443 :
444 : // checkEqual checks the two spans for logical equivalence. It uses the passed-in
445 : // DefragmentMethod and ensures both spans are NOT empty; not defragmenting empty
446 : // spans is an optimization that lets us load fewer sstable blocks.
447 1 : func (i *DefragmentingIter) checkEqual(left, right *Span) bool {
448 1 : return (!left.Empty() && !right.Empty()) && i.method.ShouldDefragment(i.equal, i.iterSpan, &i.curr)
449 1 : }
450 :
451 : // defragmentForward defragments spans in the forward direction, starting from
452 : // i.iter's current position. The span at the current position must be non-nil,
453 : // but may be Empty().
454 1 : func (i *DefragmentingIter) defragmentForward() (*Span, error) {
455 1 : if i.iterSpan.Empty() {
456 1 : // An empty span will never be equal to another span; see checkEqual for
457 1 : // why. To avoid loading non-empty range keys further ahead by calling Next,
458 1 : // return early.
459 1 : i.iterPos = iterPosCurr
460 1 : return i.iterSpan, nil
461 1 : }
462 1 : i.saveCurrent()
463 1 :
464 1 : var err error
465 1 : i.iterPos = iterPosNext
466 1 : i.iterSpan, err = i.iter.Next()
467 1 : for i.iterSpan != nil {
468 1 : if !i.equal(i.curr.End, i.iterSpan.Start) {
469 1 : // Not a continuation.
470 1 : break
471 : }
472 1 : if !i.checkEqual(i.iterSpan, &i.curr) {
473 1 : // Not a continuation.
474 1 : break
475 : }
476 1 : i.keyBuf = append(i.keyBuf[:0], i.iterSpan.End...)
477 1 : i.curr.End = i.keyBuf
478 1 : i.keysBuf = i.reduce(i.keysBuf, i.iterSpan.Keys)
479 1 : i.iterSpan, err = i.iter.Next()
480 : }
481 : // i.iterSpan == nil
482 1 : if err != nil {
483 1 : return nil, err
484 1 : }
485 1 : i.curr.Keys = i.keysBuf
486 1 : return &i.curr, nil
487 : }
488 :
489 : // defragmentBackward defragments spans in the backward direction, starting from
490 : // i.iter's current position. The span at the current position must be non-nil,
491 : // but may be Empty().
492 1 : func (i *DefragmentingIter) defragmentBackward() (*Span, error) {
493 1 : if i.iterSpan.Empty() {
494 1 : // An empty span will never be equal to another span; see checkEqual for
495 1 : // why. To avoid loading non-empty range keys further ahead by calling Next,
496 1 : // return early.
497 1 : i.iterPos = iterPosCurr
498 1 : return i.iterSpan, nil
499 1 : }
500 1 : i.saveCurrent()
501 1 :
502 1 : var err error
503 1 : i.iterPos = iterPosPrev
504 1 : i.iterSpan, err = i.iter.Prev()
505 1 : for i.iterSpan != nil {
506 1 : if !i.equal(i.curr.Start, i.iterSpan.End) {
507 1 : // Not a continuation.
508 1 : break
509 : }
510 1 : if !i.checkEqual(i.iterSpan, &i.curr) {
511 1 : // Not a continuation.
512 1 : break
513 : }
514 1 : i.keyBuf = append(i.keyBuf[:0], i.iterSpan.Start...)
515 1 : i.curr.Start = i.keyBuf
516 1 : i.keysBuf = i.reduce(i.keysBuf, i.iterSpan.Keys)
517 1 : i.iterSpan, err = i.iter.Prev()
518 : }
519 : // i.iterSpan == nil
520 1 : if err != nil {
521 1 : return nil, err
522 1 : }
523 1 : i.curr.Keys = i.keysBuf
524 1 : return &i.curr, nil
525 : }
526 :
527 1 : func (i *DefragmentingIter) saveCurrent() {
528 1 : i.currBuf.Reset()
529 1 : i.keysBuf = i.keysBuf[:0]
530 1 : i.keyBuf = i.keyBuf[:0]
531 1 : if i.iterSpan == nil {
532 0 : return
533 0 : }
534 1 : i.curr = Span{
535 1 : Start: i.saveBytes(i.iterSpan.Start),
536 1 : End: i.saveBytes(i.iterSpan.End),
537 1 : KeysOrder: i.iterSpan.KeysOrder,
538 1 : }
539 1 : for j := range i.iterSpan.Keys {
540 1 : i.keysBuf = append(i.keysBuf, Key{
541 1 : Trailer: i.iterSpan.Keys[j].Trailer,
542 1 : Suffix: i.saveBytes(i.iterSpan.Keys[j].Suffix),
543 1 : Value: i.saveBytes(i.iterSpan.Keys[j].Value),
544 1 : })
545 1 : }
546 1 : i.curr.Keys = i.keysBuf
547 : }
548 :
549 1 : func (i *DefragmentingIter) saveBytes(b []byte) []byte {
550 1 : if b == nil {
551 1 : return nil
552 1 : }
553 1 : i.currBuf, b = i.currBuf.Copy(b)
554 1 : return b
555 : }
556 :
557 : // WrapChildren implements FragmentIterator.
558 0 : func (i *DefragmentingIter) WrapChildren(wrap WrapFn) {
559 0 : i.iter = wrap(i.iter)
560 0 : }
|