LCOV - code coverage report
Current view: top level - pebble/internal/keyspan - defragment.go (source / functions) Hit Total Coverage
Test: 2023-10-18 08:17Z 5807b591 - tests + meta.lcov Lines: 248 267 92.9 %
Date: 2023-10-18 08:18:06 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package keyspan
       6             : 
       7             : import (
       8             :         "bytes"
       9             : 
      10             :         "github.com/cockroachdb/pebble/internal/base"
      11             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      12             :         "github.com/cockroachdb/pebble/internal/invariants"
      13             : )
      14             : 
      15             : // bufferReuseMaxCapacity is the maximum capacity of a DefragmentingIter buffer
      16             : // that DefragmentingIter will reuse. Buffers larger than this will be
      17             : // discarded and reallocated as necessary.
      18             : const bufferReuseMaxCapacity = 10 << 10 // 10 KB
      19             : 
      20             : // keysReuseMaxCapacity is the maximum capacity of a []keyspan.Key buffer that
      21             : // DefragmentingIter will reuse. Buffers larger than this will be discarded and
      22             : // reallocated as necessary.
      23             : const keysReuseMaxCapacity = 100
      24             : 
      25             : // DefragmentMethod configures the defragmentation performed by the
      26             : // DefragmentingIter.
      27             : type DefragmentMethod interface {
      28             :         // ShouldDefragment takes two abutting spans and returns whether the two
      29             :         // spans should be combined into a single, defragmented Span.
      30             :         ShouldDefragment(equal base.Equal, left, right *Span) bool
      31             : }
      32             : 
      33             : // The DefragmentMethodFunc type is an adapter to allow the use of ordinary
      34             : // functions as DefragmentMethods. If f is a function with the appropriate
      35             : // signature, DefragmentMethodFunc(f) is a DefragmentMethod that calls f.
      36             : type DefragmentMethodFunc func(equal base.Equal, left, right *Span) bool
      37             : 
      38             : // ShouldDefragment calls f(equal, left, right).
      39           2 : func (f DefragmentMethodFunc) ShouldDefragment(equal base.Equal, left, right *Span) bool {
      40           2 :         return f(equal, left, right)
      41           2 : }
      42             : 
      43             : // DefragmentInternal configures a DefragmentingIter to defragment spans
      44             : // only if they have identical keys. It requires spans' keys to be sorted in
      45             : // trailer descending order.
      46             : //
      47             : // This defragmenting method is intended for use in compactions that may see
      48             : // internal range keys fragments that may now be joined, because the state that
      49             : // required their fragmentation has been dropped.
      50           2 : var DefragmentInternal DefragmentMethod = DefragmentMethodFunc(func(equal base.Equal, a, b *Span) bool {
      51           2 :         if a.KeysOrder != ByTrailerDesc || b.KeysOrder != ByTrailerDesc {
      52           0 :                 panic("pebble: span keys unexpectedly not in trailer descending order")
      53             :         }
      54           2 :         if len(a.Keys) != len(b.Keys) {
      55           2 :                 return false
      56           2 :         }
      57           2 :         for i := range a.Keys {
      58           2 :                 if a.Keys[i].Trailer != b.Keys[i].Trailer {
      59           2 :                         return false
      60           2 :                 }
      61           2 :                 if !equal(a.Keys[i].Suffix, b.Keys[i].Suffix) {
      62           0 :                         return false
      63           0 :                 }
      64           2 :                 if !bytes.Equal(a.Keys[i].Value, b.Keys[i].Value) {
      65           0 :                         return false
      66           0 :                 }
      67             :         }
      68           2 :         return true
      69             : })
      70             : 
      71             : // DefragmentReducer merges the current and next Key slices, returning a new Key
      72             : // slice.
      73             : //
      74             : // Implementations should modify and return `cur` to save on allocations, or
      75             : // consider allocating a new slice, as the `cur` slice may be retained by the
      76             : // DefragmentingIter and mutated. The `next` slice must not be mutated.
      77             : //
      78             : // The incoming slices are sorted by (SeqNum, Kind) descending. The output slice
      79             : // must also have this sort order.
      80             : type DefragmentReducer func(cur, next []Key) []Key
      81             : 
      82             : // StaticDefragmentReducer is a no-op DefragmentReducer that simply returns the
      83             : // current key slice, effectively retaining the first set of keys encountered
      84             : // for a defragmented span.
      85             : //
      86             : // This reducer can be used, for example, when the set of Keys for each Span
      87             : // being reduced is not expected to change, and therefore the keys from the
      88             : // first span encountered can be used without considering keys in subsequent
      89             : // spans.
      90           2 : var StaticDefragmentReducer DefragmentReducer = func(cur, _ []Key) []Key {
      91           2 :         return cur
      92           2 : }
      93             : 
      94             : // iterPos is an enum indicating the position of the defragmenting iter's
      95             : // wrapped iter. The defragmenting iter must look ahead or behind when
      96             : // defragmenting forward or backwards respectively, and this enum records that
      97             : // current position.
      98             : type iterPos int8
      99             : 
     100             : const (
     101             :         iterPosPrev iterPos = -1
     102             :         iterPosCurr iterPos = 0
     103             :         iterPosNext iterPos = +1
     104             : )
     105             : 
     106             : // DefragmentingIter wraps a key span iterator, defragmenting physical
     107             : // fragmentation during iteration.
     108             : //
     109             : // During flushes and compactions, keys applied over a span may be split at
     110             : // sstable boundaries. This fragmentation can produce internal key bounds that
     111             : // do not match any of the bounds ever supplied to a user operation. This
     112             : // physical fragmentation is necessary to avoid excessively wide sstables.
     113             : //
     114             : // The defragmenting iterator undoes this physical fragmentation, joining spans
     115             : // with abutting bounds and equal state. The defragmenting iterator takes a
     116             : // DefragmentMethod to determine what is "equal state" for a span. The
     117             : // DefragmentMethod is a function type, allowing arbitrary comparisons between
     118             : // Span keys.
     119             : //
     120             : // Seeking (SeekGE, SeekLT) poses an obstacle to defragmentation. A seek may
     121             : // land on a physical fragment in the middle of several fragments that must be
     122             : // defragmented. A seek that lands in a fragment straddling the seek key must
     123             : // first degfragment in the opposite direction of iteration to find the
     124             : // beginning of the defragmented span, and then defragments in the iteration
     125             : // direction, ensuring it's found a whole defragmented span.
     126             : type DefragmentingIter struct {
     127             :         // DefragmentingBuffers holds buffers used for copying iterator state.
     128             :         *DefragmentingBuffers
     129             :         comparer *base.Comparer
     130             :         equal    base.Equal
     131             :         iter     FragmentIterator
     132             :         iterSpan *Span
     133             :         iterPos  iterPos
     134             : 
     135             :         // curr holds the span at the current iterator position.
     136             :         curr Span
     137             : 
     138             :         // method is a comparison function for two spans. method is called when two
     139             :         // spans are abutting to determine whether they may be defragmented.
     140             :         // method does not itself check for adjacency for the two spans.
     141             :         method DefragmentMethod
     142             : 
     143             :         // reduce is the reducer function used to collect Keys across all spans that
     144             :         // constitute a defragmented span.
     145             :         reduce DefragmentReducer
     146             : }
     147             : 
     148             : // DefragmentingBuffers holds buffers used for copying iterator state.
     149             : type DefragmentingBuffers struct {
     150             :         // currBuf is a buffer for use when copying user keys for curr. currBuf is
     151             :         // cleared between positioning methods.
     152             :         currBuf bytealloc.A
     153             :         // keysBuf is a buffer for use when copying Keys for DefragmentingIter.curr.
     154             :         keysBuf []Key
     155             :         // keyBuf is a buffer specifically for the defragmented start key when
     156             :         // defragmenting backwards or the defragmented end key when defragmenting
     157             :         // forwards. These bounds are overwritten repeatedly during defragmentation,
     158             :         // and the defragmentation routines overwrite keyBuf repeatedly to store
     159             :         // these extended bounds.
     160             :         keyBuf []byte
     161             : }
     162             : 
     163             : // PrepareForReuse discards any excessively large buffers.
     164           2 : func (bufs *DefragmentingBuffers) PrepareForReuse() {
     165           2 :         if cap(bufs.currBuf) > bufferReuseMaxCapacity {
     166           2 :                 bufs.currBuf = nil
     167           2 :         }
     168           2 :         if cap(bufs.keyBuf) > bufferReuseMaxCapacity {
     169           0 :                 bufs.keyBuf = nil
     170           0 :         }
     171           2 :         if cap(bufs.keysBuf) > keysReuseMaxCapacity {
     172           0 :                 bufs.keysBuf = nil
     173           0 :         }
     174             : }
     175             : 
     176             : // Assert that *DefragmentingIter implements the FragmentIterator interface.
     177             : var _ FragmentIterator = (*DefragmentingIter)(nil)
     178             : 
     179             : // Init initializes the defragmenting iter using the provided defragment
     180             : // method.
     181             : func (i *DefragmentingIter) Init(
     182             :         comparer *base.Comparer,
     183             :         iter FragmentIterator,
     184             :         equal DefragmentMethod,
     185             :         reducer DefragmentReducer,
     186             :         bufs *DefragmentingBuffers,
     187           2 : ) {
     188           2 :         *i = DefragmentingIter{
     189           2 :                 DefragmentingBuffers: bufs,
     190           2 :                 comparer:             comparer,
     191           2 :                 equal:                comparer.Equal,
     192           2 :                 iter:                 iter,
     193           2 :                 method:               equal,
     194           2 :                 reduce:               reducer,
     195           2 :         }
     196           2 : }
     197             : 
     198             : // Error returns any accumulated error.
     199           2 : func (i *DefragmentingIter) Error() error {
     200           2 :         return i.iter.Error()
     201           2 : }
     202             : 
     203             : // Close closes the underlying iterators.
     204           2 : func (i *DefragmentingIter) Close() error {
     205           2 :         return i.iter.Close()
     206           2 : }
     207             : 
     208             : // SeekGE moves the iterator to the first span covering a key greater than or
     209             : // equal to the given key. This is equivalent to seeking to the first span with
     210             : // an end key greater than the given key.
     211           2 : func (i *DefragmentingIter) SeekGE(key []byte) *Span {
     212           2 :         i.iterSpan = i.iter.SeekGE(key)
     213           2 :         if i.iterSpan == nil {
     214           2 :                 i.iterPos = iterPosCurr
     215           2 :                 return nil
     216           2 :         } else if i.iterSpan.Empty() {
     217           2 :                 i.iterPos = iterPosCurr
     218           2 :                 return i.iterSpan
     219           2 :         }
     220             :         // If the span starts strictly after key, we know there mustn't be an
     221             :         // earlier span that ends at i.iterSpan.Start, otherwise i.iter would've
     222             :         // returned that span instead.
     223           2 :         if i.comparer.Compare(i.iterSpan.Start, key) > 0 {
     224           2 :                 return i.defragmentForward()
     225           2 :         }
     226             : 
     227             :         // The span we landed on has a Start bound ≤ key. There may be additional
     228             :         // fragments before this span. Defragment backward to find the start of the
     229             :         // defragmented span.
     230           2 :         i.defragmentBackward()
     231           2 :         if i.iterPos == iterPosPrev {
     232           2 :                 // Next once back onto the span.
     233           2 :                 i.iterSpan = i.iter.Next()
     234           2 :         }
     235             :         // Defragment the full span from its start.
     236           2 :         return i.defragmentForward()
     237             : }
     238             : 
     239             : // SeekLT moves the iterator to the last span covering a key less than the
     240             : // given key. This is equivalent to seeking to the last span with a start
     241             : // key less than the given key.
     242           2 : func (i *DefragmentingIter) SeekLT(key []byte) *Span {
     243           2 :         i.iterSpan = i.iter.SeekLT(key)
     244           2 :         if i.iterSpan == nil {
     245           2 :                 i.iterPos = iterPosCurr
     246           2 :                 return nil
     247           2 :         } else if i.iterSpan.Empty() {
     248           2 :                 i.iterPos = iterPosCurr
     249           2 :                 return i.iterSpan
     250           2 :         }
     251             :         // If the span ends strictly before key, we know there mustn't be a later
     252             :         // span that starts at i.iterSpan.End, otherwise i.iter would've returned
     253             :         // that span instead.
     254           2 :         if i.comparer.Compare(i.iterSpan.End, key) < 0 {
     255           2 :                 return i.defragmentBackward()
     256           2 :         }
     257             : 
     258             :         // The span we landed on has a End bound ≥ key. There may be additional
     259             :         // fragments after this span. Defragment forward to find the end of the
     260             :         // defragmented span.
     261           2 :         i.defragmentForward()
     262           2 :         if i.iterPos == iterPosNext {
     263           2 :                 // Prev once back onto the span.
     264           2 :                 i.iterSpan = i.iter.Prev()
     265           2 :         }
     266             :         // Defragment the full span from its end.
     267           2 :         return i.defragmentBackward()
     268             : }
     269             : 
     270             : // First seeks the iterator to the first span and returns it.
     271           2 : func (i *DefragmentingIter) First() *Span {
     272           2 :         i.iterSpan = i.iter.First()
     273           2 :         if i.iterSpan == nil {
     274           2 :                 i.iterPos = iterPosCurr
     275           2 :                 return nil
     276           2 :         }
     277           2 :         return i.defragmentForward()
     278             : }
     279             : 
     280             : // Last seeks the iterator to the last span and returns it.
     281           2 : func (i *DefragmentingIter) Last() *Span {
     282           2 :         i.iterSpan = i.iter.Last()
     283           2 :         if i.iterSpan == nil {
     284           1 :                 i.iterPos = iterPosCurr
     285           1 :                 return nil
     286           1 :         }
     287           2 :         return i.defragmentBackward()
     288             : }
     289             : 
     290             : // Next advances to the next span and returns it.
     291           2 : func (i *DefragmentingIter) Next() *Span {
     292           2 :         switch i.iterPos {
     293           2 :         case iterPosPrev:
     294           2 :                 // Switching directions; The iterator is currently positioned over the
     295           2 :                 // last span of the previous set of fragments. In the below diagram,
     296           2 :                 // the iterator is positioned over the last span that contributes to
     297           2 :                 // the defragmented x position. We want to be positioned over the first
     298           2 :                 // span that contributes to the z position.
     299           2 :                 //
     300           2 :                 //   x x x y y y z z z
     301           2 :                 //       ^       ^
     302           2 :                 //      old     new
     303           2 :                 //
     304           2 :                 // Next once to move onto y, defragment forward to land on the first z
     305           2 :                 // position.
     306           2 :                 i.iterSpan = i.iter.Next()
     307           2 :                 if invariants.Enabled && i.iterSpan == nil {
     308           0 :                         panic("pebble: invariant violation: no next span while switching directions")
     309             :                 }
     310             :                 // We're now positioned on the first span that was defragmented into the
     311             :                 // current iterator position. Skip over the rest of the current iterator
     312             :                 // position's constitutent fragments. In the above example, this would
     313             :                 // land on the first 'z'.
     314           2 :                 i.defragmentForward()
     315           2 :                 if i.iterSpan == nil {
     316           2 :                         i.iterPos = iterPosCurr
     317           2 :                         return nil
     318           2 :                 }
     319             : 
     320             :                 // Now that we're positioned over the first of the next set of
     321             :                 // fragments, defragment forward.
     322           2 :                 return i.defragmentForward()
     323           2 :         case iterPosCurr:
     324           2 :                 // iterPosCurr is only used when the iter is exhausted or when the iterator
     325           2 :                 // is at an empty span.
     326           2 :                 if invariants.Enabled && i.iterSpan != nil && !i.iterSpan.Empty() {
     327           0 :                         panic("pebble: invariant violation: iterPosCurr with valid iterSpan")
     328             :                 }
     329             : 
     330           2 :                 i.iterSpan = i.iter.Next()
     331           2 :                 if i.iterSpan == nil {
     332           2 :                         return nil
     333           2 :                 }
     334           2 :                 return i.defragmentForward()
     335           2 :         case iterPosNext:
     336           2 :                 // Already at the next span.
     337           2 :                 if i.iterSpan == nil {
     338           2 :                         i.iterPos = iterPosCurr
     339           2 :                         return nil
     340           2 :                 }
     341           2 :                 return i.defragmentForward()
     342           0 :         default:
     343           0 :                 panic("unreachable")
     344             :         }
     345             : }
     346             : 
     347             : // Prev steps back to the previous span and returns it.
     348           2 : func (i *DefragmentingIter) Prev() *Span {
     349           2 :         switch i.iterPos {
     350           2 :         case iterPosPrev:
     351           2 :                 // Already at the previous span.
     352           2 :                 if i.iterSpan == nil {
     353           2 :                         i.iterPos = iterPosCurr
     354           2 :                         return nil
     355           2 :                 }
     356           2 :                 return i.defragmentBackward()
     357           2 :         case iterPosCurr:
     358           2 :                 // iterPosCurr is only used when the iter is exhausted or when the iterator
     359           2 :                 // is at an empty span.
     360           2 :                 if invariants.Enabled && i.iterSpan != nil && !i.iterSpan.Empty() {
     361           0 :                         panic("pebble: invariant violation: iterPosCurr with valid iterSpan")
     362             :                 }
     363             : 
     364           2 :                 i.iterSpan = i.iter.Prev()
     365           2 :                 if i.iterSpan == nil {
     366           2 :                         return nil
     367           2 :                 }
     368           2 :                 return i.defragmentBackward()
     369           2 :         case iterPosNext:
     370           2 :                 // Switching directions; The iterator is currently positioned over the
     371           2 :                 // first fragment of the next set of fragments. In the below diagram,
     372           2 :                 // the iterator is positioned over the first span that contributes to
     373           2 :                 // the defragmented z position. We want to be positioned over the last
     374           2 :                 // span that contributes to the x position.
     375           2 :                 //
     376           2 :                 //   x x x y y y z z z
     377           2 :                 //       ^       ^
     378           2 :                 //      new     old
     379           2 :                 //
     380           2 :                 // Prev once to move onto y, defragment backward to land on the last x
     381           2 :                 // position.
     382           2 :                 i.iterSpan = i.iter.Prev()
     383           2 :                 if invariants.Enabled && i.iterSpan == nil {
     384           0 :                         panic("pebble: invariant violation: no previous span while switching directions")
     385             :                 }
     386             :                 // We're now positioned on the last span that was defragmented into the
     387             :                 // current iterator position. Skip over the rest of the current iterator
     388             :                 // position's constitutent fragments. In the above example, this would
     389             :                 // land on the last 'x'.
     390           2 :                 i.defragmentBackward()
     391           2 : 
     392           2 :                 // Now that we're positioned over the last of the prev set of
     393           2 :                 // fragments, defragment backward.
     394           2 :                 if i.iterSpan == nil {
     395           2 :                         i.iterPos = iterPosCurr
     396           2 :                         return nil
     397           2 :                 }
     398           2 :                 return i.defragmentBackward()
     399           0 :         default:
     400           0 :                 panic("unreachable")
     401             :         }
     402             : }
     403             : 
     404             : // checkEqual checks the two spans for logical equivalence. It uses the passed-in
     405             : // DefragmentMethod and ensures both spans are NOT empty; not defragmenting empty
     406             : // spans is an optimization that lets us load fewer sstable blocks.
     407           2 : func (i *DefragmentingIter) checkEqual(left, right *Span) bool {
     408           2 :         return (!left.Empty() && !right.Empty()) && i.method.ShouldDefragment(i.equal, i.iterSpan, &i.curr)
     409           2 : }
     410             : 
     411             : // defragmentForward defragments spans in the forward direction, starting from
     412             : // i.iter's current position. The span at the current position must be non-nil,
     413             : // but may be Empty().
     414           2 : func (i *DefragmentingIter) defragmentForward() *Span {
     415           2 :         if i.iterSpan.Empty() {
     416           2 :                 // An empty span will never be equal to another span; see checkEqual for
     417           2 :                 // why. To avoid loading non-empty range keys further ahead by calling Next,
     418           2 :                 // return early.
     419           2 :                 i.iterPos = iterPosCurr
     420           2 :                 return i.iterSpan
     421           2 :         }
     422           2 :         i.saveCurrent()
     423           2 : 
     424           2 :         i.iterPos = iterPosNext
     425           2 :         i.iterSpan = i.iter.Next()
     426           2 :         for i.iterSpan != nil {
     427           2 :                 if !i.equal(i.curr.End, i.iterSpan.Start) {
     428           2 :                         // Not a continuation.
     429           2 :                         break
     430             :                 }
     431           2 :                 if !i.checkEqual(i.iterSpan, &i.curr) {
     432           2 :                         // Not a continuation.
     433           2 :                         break
     434             :                 }
     435           2 :                 i.keyBuf = append(i.keyBuf[:0], i.iterSpan.End...)
     436           2 :                 i.curr.End = i.keyBuf
     437           2 :                 i.keysBuf = i.reduce(i.keysBuf, i.iterSpan.Keys)
     438           2 :                 i.iterSpan = i.iter.Next()
     439             :         }
     440           2 :         i.curr.Keys = i.keysBuf
     441           2 :         return &i.curr
     442             : }
     443             : 
     444             : // defragmentBackward defragments spans in the backward direction, starting from
     445             : // i.iter's current position. The span at the current position must be non-nil,
     446             : // but may be Empty().
     447           2 : func (i *DefragmentingIter) defragmentBackward() *Span {
     448           2 :         if i.iterSpan.Empty() {
     449           2 :                 // An empty span will never be equal to another span; see checkEqual for
     450           2 :                 // why. To avoid loading non-empty range keys further ahead by calling Next,
     451           2 :                 // return early.
     452           2 :                 i.iterPos = iterPosCurr
     453           2 :                 return i.iterSpan
     454           2 :         }
     455           2 :         i.saveCurrent()
     456           2 : 
     457           2 :         i.iterPos = iterPosPrev
     458           2 :         i.iterSpan = i.iter.Prev()
     459           2 :         for i.iterSpan != nil {
     460           2 :                 if !i.equal(i.curr.Start, i.iterSpan.End) {
     461           2 :                         // Not a continuation.
     462           2 :                         break
     463             :                 }
     464           2 :                 if !i.checkEqual(i.iterSpan, &i.curr) {
     465           2 :                         // Not a continuation.
     466           2 :                         break
     467             :                 }
     468           2 :                 i.keyBuf = append(i.keyBuf[:0], i.iterSpan.Start...)
     469           2 :                 i.curr.Start = i.keyBuf
     470           2 :                 i.keysBuf = i.reduce(i.keysBuf, i.iterSpan.Keys)
     471           2 :                 i.iterSpan = i.iter.Prev()
     472             :         }
     473           2 :         i.curr.Keys = i.keysBuf
     474           2 :         return &i.curr
     475             : }
     476             : 
     477           2 : func (i *DefragmentingIter) saveCurrent() {
     478           2 :         i.currBuf.Reset()
     479           2 :         i.keysBuf = i.keysBuf[:0]
     480           2 :         i.keyBuf = i.keyBuf[:0]
     481           2 :         if i.iterSpan == nil {
     482           0 :                 return
     483           0 :         }
     484           2 :         i.curr = Span{
     485           2 :                 Start:     i.saveBytes(i.iterSpan.Start),
     486           2 :                 End:       i.saveBytes(i.iterSpan.End),
     487           2 :                 KeysOrder: i.iterSpan.KeysOrder,
     488           2 :         }
     489           2 :         for j := range i.iterSpan.Keys {
     490           2 :                 i.keysBuf = append(i.keysBuf, Key{
     491           2 :                         Trailer: i.iterSpan.Keys[j].Trailer,
     492           2 :                         Suffix:  i.saveBytes(i.iterSpan.Keys[j].Suffix),
     493           2 :                         Value:   i.saveBytes(i.iterSpan.Keys[j].Value),
     494           2 :                 })
     495           2 :         }
     496           2 :         i.curr.Keys = i.keysBuf
     497             : }
     498             : 
     499           2 : func (i *DefragmentingIter) saveBytes(b []byte) []byte {
     500           2 :         if b == nil {
     501           2 :                 return nil
     502           2 :         }
     503           2 :         i.currBuf, b = i.currBuf.Copy(b)
     504           2 :         return b
     505             : }

Generated by: LCOV version 1.14