LCOV - code coverage report
Current view: top level - pebble/internal/keyspan - defragment.go (source / functions) Hit Total Coverage
Test: 2023-10-16 08:17Z bbbf3df1 - meta test only.lcov Lines: 248 267 92.9 %
Date: 2023-10-16 08:18:51 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package keyspan
       6             : 
       7             : import (
       8             :         "bytes"
       9             : 
      10             :         "github.com/cockroachdb/pebble/internal/base"
      11             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      12             :         "github.com/cockroachdb/pebble/internal/invariants"
      13             : )
      14             : 
      15             : // bufferReuseMaxCapacity is the maximum capacity of a DefragmentingIter buffer
      16             : // that DefragmentingIter will reuse. Buffers larger than this will be
      17             : // discarded and reallocated as necessary.
      18             : const bufferReuseMaxCapacity = 10 << 10 // 10 KB
      19             : 
      20             : // keysReuseMaxCapacity is the maximum capacity of a []keyspan.Key buffer that
      21             : // DefragmentingIter will reuse. Buffers larger than this will be discarded and
      22             : // reallocated as necessary.
      23             : const keysReuseMaxCapacity = 100
      24             : 
      25             : // DefragmentMethod configures the defragmentation performed by the
      26             : // DefragmentingIter.
      27             : type DefragmentMethod interface {
      28             :         // ShouldDefragment takes two abutting spans and returns whether the two
      29             :         // spans should be combined into a single, defragmented Span.
      30             :         ShouldDefragment(equal base.Equal, left, right *Span) bool
      31             : }
      32             : 
      33             : // The DefragmentMethodFunc type is an adapter to allow the use of ordinary
      34             : // functions as DefragmentMethods. If f is a function with the appropriate
      35             : // signature, DefragmentMethodFunc(f) is a DefragmentMethod that calls f.
      36             : type DefragmentMethodFunc func(equal base.Equal, left, right *Span) bool
      37             : 
      38             : // ShouldDefragment calls f(equal, left, right).
      39           1 : func (f DefragmentMethodFunc) ShouldDefragment(equal base.Equal, left, right *Span) bool {
      40           1 :         return f(equal, left, right)
      41           1 : }
      42             : 
      43             : // DefragmentInternal configures a DefragmentingIter to defragment spans
      44             : // only if they have identical keys. It requires spans' keys to be sorted in
      45             : // trailer descending order.
      46             : //
      47             : // This defragmenting method is intended for use in compactions that may see
      48             : // internal range keys fragments that may now be joined, because the state that
      49             : // required their fragmentation has been dropped.
      50           1 : var DefragmentInternal DefragmentMethod = DefragmentMethodFunc(func(equal base.Equal, a, b *Span) bool {
      51           1 :         if a.KeysOrder != ByTrailerDesc || b.KeysOrder != ByTrailerDesc {
      52           0 :                 panic("pebble: span keys unexpectedly not in trailer descending order")
      53             :         }
      54           1 :         if len(a.Keys) != len(b.Keys) {
      55           1 :                 return false
      56           1 :         }
      57           1 :         for i := range a.Keys {
      58           1 :                 if a.Keys[i].Trailer != b.Keys[i].Trailer {
      59           1 :                         return false
      60           1 :                 }
      61           1 :                 if !equal(a.Keys[i].Suffix, b.Keys[i].Suffix) {
      62           0 :                         return false
      63           0 :                 }
      64           1 :                 if !bytes.Equal(a.Keys[i].Value, b.Keys[i].Value) {
      65           0 :                         return false
      66           0 :                 }
      67             :         }
      68           1 :         return true
      69             : })
      70             : 
      71             : // DefragmentReducer merges the current and next Key slices, returning a new Key
      72             : // slice.
      73             : //
      74             : // Implementations should modify and return `cur` to save on allocations, or
      75             : // consider allocating a new slice, as the `cur` slice may be retained by the
      76             : // DefragmentingIter and mutated. The `next` slice must not be mutated.
      77             : //
      78             : // The incoming slices are sorted by (SeqNum, Kind) descending. The output slice
      79             : // must also have this sort order.
      80             : type DefragmentReducer func(cur, next []Key) []Key
      81             : 
      82             : // StaticDefragmentReducer is a no-op DefragmentReducer that simply returns the
      83             : // current key slice, effectively retaining the first set of keys encountered
      84             : // for a defragmented span.
      85             : //
      86             : // This reducer can be used, for example, when the set of Keys for each Span
      87             : // being reduced is not expected to change, and therefore the keys from the
      88             : // first span encountered can be used without considering keys in subsequent
      89             : // spans.
      90           1 : var StaticDefragmentReducer DefragmentReducer = func(cur, _ []Key) []Key {
      91           1 :         return cur
      92           1 : }
      93             : 
      94             : // iterPos is an enum indicating the position of the defragmenting iter's
      95             : // wrapped iter. The defragmenting iter must look ahead or behind when
      96             : // defragmenting forward or backwards respectively, and this enum records that
      97             : // current position.
      98             : type iterPos int8
      99             : 
     100             : const (
     101             :         iterPosPrev iterPos = -1
     102             :         iterPosCurr iterPos = 0
     103             :         iterPosNext iterPos = +1
     104             : )
     105             : 
     106             : // DefragmentingIter wraps a key span iterator, defragmenting physical
     107             : // fragmentation during iteration.
     108             : //
     109             : // During flushes and compactions, keys applied over a span may be split at
     110             : // sstable boundaries. This fragmentation can produce internal key bounds that
     111             : // do not match any of the bounds ever supplied to a user operation. This
     112             : // physical fragmentation is necessary to avoid excessively wide sstables.
     113             : //
     114             : // The defragmenting iterator undoes this physical fragmentation, joining spans
     115             : // with abutting bounds and equal state. The defragmenting iterator takes a
     116             : // DefragmentMethod to determine what is "equal state" for a span. The
     117             : // DefragmentMethod is a function type, allowing arbitrary comparisons between
     118             : // Span keys.
     119             : //
     120             : // Seeking (SeekGE, SeekLT) poses an obstacle to defragmentation. A seek may
     121             : // land on a physical fragment in the middle of several fragments that must be
     122             : // defragmented. A seek that lands in a fragment straddling the seek key must
     123             : // first degfragment in the opposite direction of iteration to find the
     124             : // beginning of the defragmented span, and then defragments in the iteration
     125             : // direction, ensuring it's found a whole defragmented span.
     126             : type DefragmentingIter struct {
     127             :         // DefragmentingBuffers holds buffers used for copying iterator state.
     128             :         *DefragmentingBuffers
     129             :         comparer *base.Comparer
     130             :         equal    base.Equal
     131             :         iter     FragmentIterator
     132             :         iterSpan *Span
     133             :         iterPos  iterPos
     134             : 
     135             :         // curr holds the span at the current iterator position.
     136             :         curr Span
     137             : 
     138             :         // method is a comparison function for two spans. method is called when two
     139             :         // spans are abutting to determine whether they may be defragmented.
     140             :         // method does not itself check for adjacency for the two spans.
     141             :         method DefragmentMethod
     142             : 
     143             :         // reduce is the reducer function used to collect Keys across all spans that
     144             :         // constitute a defragmented span.
     145             :         reduce DefragmentReducer
     146             : }
     147             : 
     148             : // DefragmentingBuffers holds buffers used for copying iterator state.
     149             : type DefragmentingBuffers struct {
     150             :         // currBuf is a buffer for use when copying user keys for curr. currBuf is
     151             :         // cleared between positioning methods.
     152             :         currBuf bytealloc.A
     153             :         // keysBuf is a buffer for use when copying Keys for DefragmentingIter.curr.
     154             :         keysBuf []Key
     155             :         // keyBuf is a buffer specifically for the defragmented start key when
     156             :         // defragmenting backwards or the defragmented end key when defragmenting
     157             :         // forwards. These bounds are overwritten repeatedly during defragmentation,
     158             :         // and the defragmentation routines overwrite keyBuf repeatedly to store
     159             :         // these extended bounds.
     160             :         keyBuf []byte
     161             : }
     162             : 
     163             : // PrepareForReuse discards any excessively large buffers.
     164           1 : func (bufs *DefragmentingBuffers) PrepareForReuse() {
     165           1 :         if cap(bufs.currBuf) > bufferReuseMaxCapacity {
     166           1 :                 bufs.currBuf = nil
     167           1 :         }
     168           1 :         if cap(bufs.keyBuf) > bufferReuseMaxCapacity {
     169           0 :                 bufs.keyBuf = nil
     170           0 :         }
     171           1 :         if cap(bufs.keysBuf) > keysReuseMaxCapacity {
     172           0 :                 bufs.keysBuf = nil
     173           0 :         }
     174             : }
     175             : 
     176             : // Assert that *DefragmentingIter implements the FragmentIterator interface.
     177             : var _ FragmentIterator = (*DefragmentingIter)(nil)
     178             : 
     179             : // Init initializes the defragmenting iter using the provided defragment
     180             : // method.
     181             : func (i *DefragmentingIter) Init(
     182             :         comparer *base.Comparer,
     183             :         iter FragmentIterator,
     184             :         equal DefragmentMethod,
     185             :         reducer DefragmentReducer,
     186             :         bufs *DefragmentingBuffers,
     187           1 : ) {
     188           1 :         *i = DefragmentingIter{
     189           1 :                 DefragmentingBuffers: bufs,
     190           1 :                 comparer:             comparer,
     191           1 :                 equal:                comparer.Equal,
     192           1 :                 iter:                 iter,
     193           1 :                 method:               equal,
     194           1 :                 reduce:               reducer,
     195           1 :         }
     196           1 : }
     197             : 
     198             : // Error returns any accumulated error.
     199           1 : func (i *DefragmentingIter) Error() error {
     200           1 :         return i.iter.Error()
     201           1 : }
     202             : 
     203             : // Close closes the underlying iterators.
     204           1 : func (i *DefragmentingIter) Close() error {
     205           1 :         return i.iter.Close()
     206           1 : }
     207             : 
     208             : // SeekGE moves the iterator to the first span covering a key greater than or
     209             : // equal to the given key. This is equivalent to seeking to the first span with
     210             : // an end key greater than the given key.
     211           1 : func (i *DefragmentingIter) SeekGE(key []byte) *Span {
     212           1 :         i.iterSpan = i.iter.SeekGE(key)
     213           1 :         if i.iterSpan == nil {
     214           1 :                 i.iterPos = iterPosCurr
     215           1 :                 return nil
     216           1 :         } else if i.iterSpan.Empty() {
     217           1 :                 i.iterPos = iterPosCurr
     218           1 :                 return i.iterSpan
     219           1 :         }
     220             :         // If the span starts strictly after key, we know there mustn't be an
     221             :         // earlier span that ends at i.iterSpan.Start, otherwise i.iter would've
     222             :         // returned that span instead.
     223           1 :         if i.comparer.Compare(i.iterSpan.Start, key) > 0 {
     224           1 :                 return i.defragmentForward()
     225           1 :         }
     226             : 
     227             :         // The span we landed on has a Start bound ≤ key. There may be additional
     228             :         // fragments before this span. Defragment backward to find the start of the
     229             :         // defragmented span.
     230           1 :         i.defragmentBackward()
     231           1 :         if i.iterPos == iterPosPrev {
     232           1 :                 // Next once back onto the span.
     233           1 :                 i.iterSpan = i.iter.Next()
     234           1 :         }
     235             :         // Defragment the full span from its start.
     236           1 :         return i.defragmentForward()
     237             : }
     238             : 
     239             : // SeekLT moves the iterator to the last span covering a key less than the
     240             : // given key. This is equivalent to seeking to the last span with a start
     241             : // key less than the given key.
     242           1 : func (i *DefragmentingIter) SeekLT(key []byte) *Span {
     243           1 :         i.iterSpan = i.iter.SeekLT(key)
     244           1 :         if i.iterSpan == nil {
     245           1 :                 i.iterPos = iterPosCurr
     246           1 :                 return nil
     247           1 :         } else if i.iterSpan.Empty() {
     248           1 :                 i.iterPos = iterPosCurr
     249           1 :                 return i.iterSpan
     250           1 :         }
     251             :         // If the span ends strictly before key, we know there mustn't be a later
     252             :         // span that starts at i.iterSpan.End, otherwise i.iter would've returned
     253             :         // that span instead.
     254           1 :         if i.comparer.Compare(i.iterSpan.End, key) < 0 {
     255           1 :                 return i.defragmentBackward()
     256           1 :         }
     257             : 
     258             :         // The span we landed on has a End bound ≥ key. There may be additional
     259             :         // fragments after this span. Defragment forward to find the end of the
     260             :         // defragmented span.
     261           1 :         i.defragmentForward()
     262           1 :         if i.iterPos == iterPosNext {
     263           1 :                 // Prev once back onto the span.
     264           1 :                 i.iterSpan = i.iter.Prev()
     265           1 :         }
     266             :         // Defragment the full span from its end.
     267           1 :         return i.defragmentBackward()
     268             : }
     269             : 
     270             : // First seeks the iterator to the first span and returns it.
     271           1 : func (i *DefragmentingIter) First() *Span {
     272           1 :         i.iterSpan = i.iter.First()
     273           1 :         if i.iterSpan == nil {
     274           1 :                 i.iterPos = iterPosCurr
     275           1 :                 return nil
     276           1 :         }
     277           1 :         return i.defragmentForward()
     278             : }
     279             : 
     280             : // Last seeks the iterator to the last span and returns it.
     281           1 : func (i *DefragmentingIter) Last() *Span {
     282           1 :         i.iterSpan = i.iter.Last()
     283           1 :         if i.iterSpan == nil {
     284           1 :                 i.iterPos = iterPosCurr
     285           1 :                 return nil
     286           1 :         }
     287           1 :         return i.defragmentBackward()
     288             : }
     289             : 
     290             : // Next advances to the next span and returns it.
     291           1 : func (i *DefragmentingIter) Next() *Span {
     292           1 :         switch i.iterPos {
     293           1 :         case iterPosPrev:
     294           1 :                 // Switching directions; The iterator is currently positioned over the
     295           1 :                 // last span of the previous set of fragments. In the below diagram,
     296           1 :                 // the iterator is positioned over the last span that contributes to
     297           1 :                 // the defragmented x position. We want to be positioned over the first
     298           1 :                 // span that contributes to the z position.
     299           1 :                 //
     300           1 :                 //   x x x y y y z z z
     301           1 :                 //       ^       ^
     302           1 :                 //      old     new
     303           1 :                 //
     304           1 :                 // Next once to move onto y, defragment forward to land on the first z
     305           1 :                 // position.
     306           1 :                 i.iterSpan = i.iter.Next()
     307           1 :                 if invariants.Enabled && i.iterSpan == nil {
     308           0 :                         panic("pebble: invariant violation: no next span while switching directions")
     309             :                 }
     310             :                 // We're now positioned on the first span that was defragmented into the
     311             :                 // current iterator position. Skip over the rest of the current iterator
     312             :                 // position's constitutent fragments. In the above example, this would
     313             :                 // land on the first 'z'.
     314           1 :                 i.defragmentForward()
     315           1 :                 if i.iterSpan == nil {
     316           1 :                         i.iterPos = iterPosCurr
     317           1 :                         return nil
     318           1 :                 }
     319             : 
     320             :                 // Now that we're positioned over the first of the next set of
     321             :                 // fragments, defragment forward.
     322           1 :                 return i.defragmentForward()
     323           1 :         case iterPosCurr:
     324           1 :                 // iterPosCurr is only used when the iter is exhausted or when the iterator
     325           1 :                 // is at an empty span.
     326           1 :                 if invariants.Enabled && i.iterSpan != nil && !i.iterSpan.Empty() {
     327           0 :                         panic("pebble: invariant violation: iterPosCurr with valid iterSpan")
     328             :                 }
     329             : 
     330           1 :                 i.iterSpan = i.iter.Next()
     331           1 :                 if i.iterSpan == nil {
     332           1 :                         return nil
     333           1 :                 }
     334           1 :                 return i.defragmentForward()
     335           1 :         case iterPosNext:
     336           1 :                 // Already at the next span.
     337           1 :                 if i.iterSpan == nil {
     338           1 :                         i.iterPos = iterPosCurr
     339           1 :                         return nil
     340           1 :                 }
     341           1 :                 return i.defragmentForward()
     342           0 :         default:
     343           0 :                 panic("unreachable")
     344             :         }
     345             : }
     346             : 
     347             : // Prev steps back to the previous span and returns it.
     348           1 : func (i *DefragmentingIter) Prev() *Span {
     349           1 :         switch i.iterPos {
     350           1 :         case iterPosPrev:
     351           1 :                 // Already at the previous span.
     352           1 :                 if i.iterSpan == nil {
     353           1 :                         i.iterPos = iterPosCurr
     354           1 :                         return nil
     355           1 :                 }
     356           1 :                 return i.defragmentBackward()
     357           1 :         case iterPosCurr:
     358           1 :                 // iterPosCurr is only used when the iter is exhausted or when the iterator
     359           1 :                 // is at an empty span.
     360           1 :                 if invariants.Enabled && i.iterSpan != nil && !i.iterSpan.Empty() {
     361           0 :                         panic("pebble: invariant violation: iterPosCurr with valid iterSpan")
     362             :                 }
     363             : 
     364           1 :                 i.iterSpan = i.iter.Prev()
     365           1 :                 if i.iterSpan == nil {
     366           1 :                         return nil
     367           1 :                 }
     368           1 :                 return i.defragmentBackward()
     369           1 :         case iterPosNext:
     370           1 :                 // Switching directions; The iterator is currently positioned over the
     371           1 :                 // first fragment of the next set of fragments. In the below diagram,
     372           1 :                 // the iterator is positioned over the first span that contributes to
     373           1 :                 // the defragmented z position. We want to be positioned over the last
     374           1 :                 // span that contributes to the x position.
     375           1 :                 //
     376           1 :                 //   x x x y y y z z z
     377           1 :                 //       ^       ^
     378           1 :                 //      new     old
     379           1 :                 //
     380           1 :                 // Prev once to move onto y, defragment backward to land on the last x
     381           1 :                 // position.
     382           1 :                 i.iterSpan = i.iter.Prev()
     383           1 :                 if invariants.Enabled && i.iterSpan == nil {
     384           0 :                         panic("pebble: invariant violation: no previous span while switching directions")
     385             :                 }
     386             :                 // We're now positioned on the last span that was defragmented into the
     387             :                 // current iterator position. Skip over the rest of the current iterator
     388             :                 // position's constitutent fragments. In the above example, this would
     389             :                 // land on the last 'x'.
     390           1 :                 i.defragmentBackward()
     391           1 : 
     392           1 :                 // Now that we're positioned over the last of the prev set of
     393           1 :                 // fragments, defragment backward.
     394           1 :                 if i.iterSpan == nil {
     395           1 :                         i.iterPos = iterPosCurr
     396           1 :                         return nil
     397           1 :                 }
     398           1 :                 return i.defragmentBackward()
     399           0 :         default:
     400           0 :                 panic("unreachable")
     401             :         }
     402             : }
     403             : 
     404             : // checkEqual checks the two spans for logical equivalence. It uses the passed-in
     405             : // DefragmentMethod and ensures both spans are NOT empty; not defragmenting empty
     406             : // spans is an optimization that lets us load fewer sstable blocks.
     407           1 : func (i *DefragmentingIter) checkEqual(left, right *Span) bool {
     408           1 :         return (!left.Empty() && !right.Empty()) && i.method.ShouldDefragment(i.equal, i.iterSpan, &i.curr)
     409           1 : }
     410             : 
     411             : // defragmentForward defragments spans in the forward direction, starting from
     412             : // i.iter's current position. The span at the current position must be non-nil,
     413             : // but may be Empty().
     414           1 : func (i *DefragmentingIter) defragmentForward() *Span {
     415           1 :         if i.iterSpan.Empty() {
     416           1 :                 // An empty span will never be equal to another span; see checkEqual for
     417           1 :                 // why. To avoid loading non-empty range keys further ahead by calling Next,
     418           1 :                 // return early.
     419           1 :                 i.iterPos = iterPosCurr
     420           1 :                 return i.iterSpan
     421           1 :         }
     422           1 :         i.saveCurrent()
     423           1 : 
     424           1 :         i.iterPos = iterPosNext
     425           1 :         i.iterSpan = i.iter.Next()
     426           1 :         for i.iterSpan != nil {
     427           1 :                 if !i.equal(i.curr.End, i.iterSpan.Start) {
     428           1 :                         // Not a continuation.
     429           1 :                         break
     430             :                 }
     431           1 :                 if !i.checkEqual(i.iterSpan, &i.curr) {
     432           1 :                         // Not a continuation.
     433           1 :                         break
     434             :                 }
     435           1 :                 i.keyBuf = append(i.keyBuf[:0], i.iterSpan.End...)
     436           1 :                 i.curr.End = i.keyBuf
     437           1 :                 i.keysBuf = i.reduce(i.keysBuf, i.iterSpan.Keys)
     438           1 :                 i.iterSpan = i.iter.Next()
     439             :         }
     440           1 :         i.curr.Keys = i.keysBuf
     441           1 :         return &i.curr
     442             : }
     443             : 
     444             : // defragmentBackward defragments spans in the backward direction, starting from
     445             : // i.iter's current position. The span at the current position must be non-nil,
     446             : // but may be Empty().
     447           1 : func (i *DefragmentingIter) defragmentBackward() *Span {
     448           1 :         if i.iterSpan.Empty() {
     449           1 :                 // An empty span will never be equal to another span; see checkEqual for
     450           1 :                 // why. To avoid loading non-empty range keys further ahead by calling Next,
     451           1 :                 // return early.
     452           1 :                 i.iterPos = iterPosCurr
     453           1 :                 return i.iterSpan
     454           1 :         }
     455           1 :         i.saveCurrent()
     456           1 : 
     457           1 :         i.iterPos = iterPosPrev
     458           1 :         i.iterSpan = i.iter.Prev()
     459           1 :         for i.iterSpan != nil {
     460           1 :                 if !i.equal(i.curr.Start, i.iterSpan.End) {
     461           1 :                         // Not a continuation.
     462           1 :                         break
     463             :                 }
     464           1 :                 if !i.checkEqual(i.iterSpan, &i.curr) {
     465           1 :                         // Not a continuation.
     466           1 :                         break
     467             :                 }
     468           1 :                 i.keyBuf = append(i.keyBuf[:0], i.iterSpan.Start...)
     469           1 :                 i.curr.Start = i.keyBuf
     470           1 :                 i.keysBuf = i.reduce(i.keysBuf, i.iterSpan.Keys)
     471           1 :                 i.iterSpan = i.iter.Prev()
     472             :         }
     473           1 :         i.curr.Keys = i.keysBuf
     474           1 :         return &i.curr
     475             : }
     476             : 
     477           1 : func (i *DefragmentingIter) saveCurrent() {
     478           1 :         i.currBuf.Reset()
     479           1 :         i.keysBuf = i.keysBuf[:0]
     480           1 :         i.keyBuf = i.keyBuf[:0]
     481           1 :         if i.iterSpan == nil {
     482           0 :                 return
     483           0 :         }
     484           1 :         i.curr = Span{
     485           1 :                 Start:     i.saveBytes(i.iterSpan.Start),
     486           1 :                 End:       i.saveBytes(i.iterSpan.End),
     487           1 :                 KeysOrder: i.iterSpan.KeysOrder,
     488           1 :         }
     489           1 :         for j := range i.iterSpan.Keys {
     490           1 :                 i.keysBuf = append(i.keysBuf, Key{
     491           1 :                         Trailer: i.iterSpan.Keys[j].Trailer,
     492           1 :                         Suffix:  i.saveBytes(i.iterSpan.Keys[j].Suffix),
     493           1 :                         Value:   i.saveBytes(i.iterSpan.Keys[j].Value),
     494           1 :                 })
     495           1 :         }
     496           1 :         i.curr.Keys = i.keysBuf
     497             : }
     498             : 
     499           1 : func (i *DefragmentingIter) saveBytes(b []byte) []byte {
     500           1 :         if b == nil {
     501           1 :                 return nil
     502           1 :         }
     503           1 :         i.currBuf, b = i.currBuf.Copy(b)
     504           1 :         return b
     505             : }

Generated by: LCOV version 1.14