LCOV - code coverage report
Current view: top level - pebble/internal/keyspan - fragmenter.go (source / functions) Hit Total Coverage
Test: 2024-06-06 08:16Z 8fb46650 - tests + meta.lcov Lines: 134 155 86.5 %
Date: 2024-06-06 08:17:16 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package keyspan
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "sort"
      10             : 
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/invariants"
      13             : )
      14             : 
      15             : type spansByStartKey struct {
      16             :         cmp base.Compare
      17             :         buf []Span
      18             : }
      19             : 
      20           1 : func (v *spansByStartKey) Len() int { return len(v.buf) }
      21           1 : func (v *spansByStartKey) Less(i, j int) bool {
      22           1 :         return v.cmp(v.buf[i].Start, v.buf[j].Start) < 0
      23           1 : }
      24           1 : func (v *spansByStartKey) Swap(i, j int) {
      25           1 :         v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
      26           1 : }
      27             : 
      28             : type spansByEndKey struct {
      29             :         cmp base.Compare
      30             :         buf []Span
      31             : }
      32             : 
      33           2 : func (v *spansByEndKey) Len() int { return len(v.buf) }
      34           2 : func (v *spansByEndKey) Less(i, j int) bool {
      35           2 :         return v.cmp(v.buf[i].End, v.buf[j].End) < 0
      36           2 : }
      37           2 : func (v *spansByEndKey) Swap(i, j int) {
      38           2 :         v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
      39           2 : }
      40             : 
      41             : // keysBySeqNumKind sorts spans by the start key's sequence number in
      42             : // descending order. If two spans have equal sequence number, they're compared
      43             : // by key kind in descending order. This ordering matches the ordering of
      44             : // base.InternalCompare among keys with matching user keys.
      45             : type keysBySeqNumKind []Key
      46             : 
      47           2 : func (v *keysBySeqNumKind) Len() int           { return len(*v) }
      48           2 : func (v *keysBySeqNumKind) Less(i, j int) bool { return (*v)[i].Trailer > (*v)[j].Trailer }
      49           2 : func (v *keysBySeqNumKind) Swap(i, j int)      { (*v)[i], (*v)[j] = (*v)[j], (*v)[i] }
      50             : 
      51             : // Sort the spans by start key. This is the ordering required by the
      52             : // Fragmenter. Usually spans are naturally sorted by their start key,
      53             : // but that isn't true for range deletion tombstones in the legacy
      54             : // range-del-v1 block format.
      55           1 : func Sort(cmp base.Compare, spans []Span) {
      56           1 :         sorter := spansByStartKey{
      57           1 :                 cmp: cmp,
      58           1 :                 buf: spans,
      59           1 :         }
      60           1 :         sort.Sort(&sorter)
      61           1 : }
      62             : 
// Fragmenter fragments a set of spans such that overlapping spans are split
// at their overlap points. Spans are supplied through Add in non-decreasing
// start key order. The resulting fragments are passed to the Emit function.
// Finish flushes any fragments that are still pending.
type Fragmenter struct {
	// Cmp orders user keys. Format renders keys in panic messages.
	Cmp    base.Compare
	Format base.FormatKey
	// Emit is called to emit a fragmented span and its keys. Every key defined
	// within the emitted Span applies to the entirety of the Span's key span.
	// Keys are ordered in decreasing order of their sequence numbers, and if
	// equal, decreasing order of key kind.
	Emit func(Span)
	// pending contains the spans that have not yet been fragmented and
	// emitted. The spans have not been fragmented on their end keys yet; that
	// happens as the spans are flushed. All pending spans have the same Start.
	pending []Span
	// doneBuf is used to buffer completed span fragments when flushing up to a
	// specific key (see truncateAndFlush). It is cached in the Fragmenter to
	// allow reuse of its backing array.
	doneBuf []Span
	// sortBuf is used to sort fragments by end key when flushing.
	sortBuf spansByEndKey
	// flushBuf is used to sort keys by (seqnum,kind) before emitting.
	flushBuf keysBySeqNumKind
	// flushedKey is the key that fragments have been flushed up to. Any
	// additional spans added to the fragmenter must have a start key >=
	// flushedKey. A nil value indicates flushedKey has not been set.
	flushedKey []byte
	finished   bool // set by Finish; Add and Finish panic once it is true
}
      93             : 
      94           0 : func (f *Fragmenter) checkInvariants(buf []Span) {
      95           0 :         for i := 1; i < len(buf); i++ {
      96           0 :                 if f.Cmp(buf[i].Start, buf[i].End) >= 0 {
      97           0 :                         panic(fmt.Sprintf("pebble: empty pending span invariant violated: %s", buf[i]))
      98             :                 }
      99           0 :                 if f.Cmp(buf[i-1].Start, buf[i].Start) != 0 {
     100           0 :                         panic(fmt.Sprintf("pebble: pending span invariant violated: %s %s",
     101           0 :                                 f.Format(buf[i-1].Start), f.Format(buf[i].Start)))
     102             :                 }
     103             :         }
     104             : }
     105             : 
     106             : // Add adds a span to the fragmenter. Spans may overlap and the
     107             : // fragmenter will internally split them. The spans must be presented in
     108             : // increasing start key order. That is, Add must be called with a series
     109             : // of spans like:
     110             : //
     111             : //      a---e
     112             : //        c---g
     113             : //        c-----i
     114             : //               j---n
     115             : //               j-l
     116             : //
     117             : // We need to fragment the spans at overlap points. In the above
     118             : // example, we'd create:
     119             : //
     120             : //      a-c-e
     121             : //        c-e-g
     122             : //        c-e-g-i
     123             : //               j-l-n
     124             : //               j-l
     125             : //
     126             : // The fragments need to be output sorted by start key, and for equal start
     127             : // keys, sorted by descending sequence number. This last part requires a mild
     128             : // bit of care as the fragments are not created in descending sequence number
     129             : // order.
     130             : //
     131             : // Once a start key has been seen, we know that we'll never see a smaller
     132             : // start key and can thus flush all of the fragments that lie before that
     133             : // start key.
     134             : //
     135             : // Walking through the example above, we start with:
     136             : //
     137             : //      a---e
     138             : //
     139             : // Next we add [c,g) resulting in:
     140             : //
     141             : //      a-c-e
     142             : //        c---g
     143             : //
     144             : // The fragment [a,c) is flushed leaving the pending spans as:
     145             : //
     146             : //      c-e
     147             : //      c---g
     148             : //
     149             : // The next span is [c,i):
     150             : //
     151             : //      c-e
     152             : //      c---g
     153             : //      c-----i
     154             : //
     155             : // No fragments are flushed. The next span is [j,n):
     156             : //
     157             : //      c-e
     158             : //      c---g
     159             : //      c-----i
     160             : //             j---n
     161             : //
     162             : // The fragments [c,e), [c,g) and [c,i) are flushed. We sort these fragments
     163             : // by their end key, then split the fragments on the end keys:
     164             : //
     165             : //      c-e
     166             : //      c-e-g
     167             : //      c-e---i
     168             : //
     169             : // The [c,e) fragments all get flushed leaving:
     170             : //
     171             : //      e-g
     172             : //      e---i
     173             : //
     174             : // This process continues until there are no more fragments to flush.
     175             : //
     176             : // WARNING: the slices backing Start, End, Keys, Key.Suffix and Key.Value are
     177             : // all retained after this method returns and should not be modified. This is
     178             : // safe for spans that are added from a memtable or batch. It is partially
     179             : // unsafe for a span read from an sstable. Specifically, the Keys slice of a
     180             : // Span returned during sstable iteration is only valid until the next iterator
     181             : // operation. The stability of the user keys depend on whether the block is
     182             : // prefix compressed, and in practice Pebble never prefix compresses range
     183             : // deletion and range key blocks, so these keys are stable. Because of this key
     184             : // stability, typically callers only need to perform a shallow clone of the Span
     185             : // before Add-ing it to the fragmenter.
     186             : //
     187             : // Add requires the provided span's keys are sorted in Trailer descending order.
     188           2 : func (f *Fragmenter) Add(s Span) {
     189           2 :         if f.finished {
     190           0 :                 panic("pebble: span fragmenter already finished")
     191           2 :         } else if s.KeysOrder != ByTrailerDesc {
     192           0 :                 panic("pebble: span keys unexpectedly not in trailer descending order")
     193             :         }
     194           2 :         if f.flushedKey != nil {
     195           2 :                 switch c := f.Cmp(s.Start, f.flushedKey); {
     196           0 :                 case c < 0:
     197           0 :                         panic(fmt.Sprintf("pebble: start key (%s) < flushed key (%s)",
     198           0 :                                 f.Format(s.Start), f.Format(f.flushedKey)))
     199             :                 }
     200             :         }
     201           2 :         if f.Cmp(s.Start, s.End) >= 0 {
     202           1 :                 // An empty span, we can ignore it.
     203           1 :                 return
     204           1 :         }
     205           2 :         if invariants.RaceEnabled {
     206           0 :                 f.checkInvariants(f.pending)
     207           0 :                 defer func() { f.checkInvariants(f.pending) }()
     208             :         }
     209             : 
     210           2 :         if len(f.pending) > 0 {
     211           2 :                 // Since all of the pending spans have the same start key, we only need
     212           2 :                 // to compare against the first one.
     213           2 :                 switch c := f.Cmp(f.pending[0].Start, s.Start); {
     214           1 :                 case c > 0:
     215           1 :                         panic(fmt.Sprintf("pebble: keys must be added in order: %s > %s",
     216           1 :                                 f.Format(f.pending[0].Start), f.Format(s.Start)))
     217           2 :                 case c == 0:
     218           2 :                         // The new span has the same start key as the existing pending
     219           2 :                         // spans. Add it to the pending buffer.
     220           2 :                         f.pending = append(f.pending, s)
     221           2 :                         return
     222             :                 }
     223             : 
     224             :                 // At this point we know that the new start key is greater than the pending
     225             :                 // spans start keys.
     226           2 :                 f.truncateAndFlush(s.Start)
     227             :         }
     228             : 
     229           2 :         f.pending = append(f.pending, s)
     230             : }
     231             : 
     232             : // Empty returns true if all fragments added so far have finished flushing.
     233           0 : func (f *Fragmenter) Empty() bool {
     234           0 :         return f.finished || len(f.pending) == 0
     235           0 : }
     236             : 
     237             : // Start returns the start key of the first span in the pending buffer, or nil
     238             : // if there are no pending spans. The start key of all pending spans is the same
     239             : // as that of the first one.
     240           1 : func (f *Fragmenter) Start() []byte {
     241           1 :         if len(f.pending) > 0 {
     242           1 :                 return f.pending[0].Start
     243           1 :         }
     244           1 :         return nil
     245             : }
     246             : 
     247             : // Flushes all pending spans up to key (exclusive).
     248             : //
     249             : // WARNING: The specified key is stored without making a copy, so all callers
     250             : // must ensure it is safe.
     251           2 : func (f *Fragmenter) truncateAndFlush(key []byte) {
     252           2 :         f.flushedKey = append(f.flushedKey[:0], key...)
     253           2 :         done := f.doneBuf[:0]
     254           2 :         pending := f.pending
     255           2 :         f.pending = f.pending[:0]
     256           2 : 
     257           2 :         // pending and f.pending share the same underlying storage. As we iterate
     258           2 :         // over pending we append to f.pending, but only one entry is appended in
     259           2 :         // each iteration, after we have read the entry being overwritten.
     260           2 :         for _, s := range pending {
     261           2 :                 if f.Cmp(key, s.End) < 0 {
     262           2 :                         //   s: a--+--e
     263           2 :                         // new:    c------
     264           2 :                         if f.Cmp(s.Start, key) < 0 {
     265           2 :                                 done = append(done, Span{
     266           2 :                                         Start: s.Start,
     267           2 :                                         End:   key,
     268           2 :                                         Keys:  s.Keys,
     269           2 :                                 })
     270           2 :                         }
     271           2 :                         f.pending = append(f.pending, Span{
     272           2 :                                 Start: key,
     273           2 :                                 End:   s.End,
     274           2 :                                 Keys:  s.Keys,
     275           2 :                         })
     276           2 :                 } else {
     277           2 :                         //   s: a-----e
     278           2 :                         // new:       e----
     279           2 :                         done = append(done, s)
     280           2 :                 }
     281             :         }
     282             : 
     283           2 :         f.doneBuf = done[:0]
     284           2 :         f.flush(done, nil)
     285             : }
     286             : 
     287             : // flush a group of range spans to the block. The spans are required to all have
     288             : // the same start key. We flush all span fragments until startKey > lastKey. If
     289             : // lastKey is nil, all span fragments are flushed. The specification of a
     290             : // non-nil lastKey occurs for range deletion tombstones during compaction where
     291             : // we want to flush (but not truncate) all range tombstones that start at or
     292             : // before the first key in the next sstable. Consider:
     293             : //
     294             : //      a---e#10
     295             : //      a------h#9
     296             : //
     297             : // If a compaction splits the sstables at key c we want the first sstable to
     298             : // contain the tombstones [a,e)#10 and [a,e)#9. Fragmentation would naturally
     299             : // produce a tombstone [e,h)#9, but we don't need to output that tombstone to
     300             : // the first sstable.
     301           2 : func (f *Fragmenter) flush(buf []Span, lastKey []byte) {
     302           2 :         if invariants.RaceEnabled {
     303           0 :                 f.checkInvariants(buf)
     304           0 :         }
     305             : 
     306             :         // Sort the spans by end key. This will allow us to walk over the spans and
     307             :         // easily determine the next split point (the smallest end-key).
     308           2 :         f.sortBuf.cmp = f.Cmp
     309           2 :         f.sortBuf.buf = buf
     310           2 :         sort.Sort(&f.sortBuf)
     311           2 : 
     312           2 :         // Loop over the spans, splitting by end key.
     313           2 :         for len(buf) > 0 {
     314           2 :                 // A prefix of spans will end at split. remove represents the count of
     315           2 :                 // that prefix.
     316           2 :                 remove := 1
     317           2 :                 split := buf[0].End
     318           2 :                 f.flushBuf = append(f.flushBuf[:0], buf[0].Keys...)
     319           2 : 
     320           2 :                 for i := 1; i < len(buf); i++ {
     321           2 :                         if f.Cmp(split, buf[i].End) == 0 {
     322           2 :                                 remove++
     323           2 :                         }
     324           2 :                         f.flushBuf = append(f.flushBuf, buf[i].Keys...)
     325             :                 }
     326             : 
     327           2 :                 sort.Sort(&f.flushBuf)
     328           2 : 
     329           2 :                 f.Emit(Span{
     330           2 :                         Start: buf[0].Start,
     331           2 :                         End:   split,
     332           2 :                         // Copy the sorted keys to a new slice.
     333           2 :                         //
     334           2 :                         // This allocation is an unfortunate side effect of the Fragmenter and
     335           2 :                         // the expectation that the spans it produces are available in-memory
     336           2 :                         // indefinitely.
     337           2 :                         //
     338           2 :                         // Eventually, we should be able to replace the fragmenter with the
     339           2 :                         // keyspanimpl.MergingIter which will perform just-in-time
     340           2 :                         // fragmentation, and only guaranteeing the memory lifetime for the
     341           2 :                         // current span. The MergingIter fragments while only needing to
     342           2 :                         // access one Span per level. It only accesses the Span at the
     343           2 :                         // current position for each level. During compactions, we can write
     344           2 :                         // these spans to sstables without retaining previous Spans.
     345           2 :                         Keys: append([]Key(nil), f.flushBuf...),
     346           2 :                 })
     347           2 : 
     348           2 :                 if lastKey != nil && f.Cmp(split, lastKey) > 0 {
     349           0 :                         break
     350             :                 }
     351             : 
     352             :                 // Adjust the start key for every remaining span.
     353           2 :                 buf = buf[remove:]
     354           2 :                 for i := range buf {
     355           2 :                         buf[i].Start = split
     356           2 :                 }
     357             :         }
     358             : }
     359             : 
     360             : // Finish flushes any remaining fragments to the output. It is an error to call
     361             : // this if any other spans will be added.
     362           2 : func (f *Fragmenter) Finish() {
     363           2 :         if f.finished {
     364           0 :                 panic("pebble: span fragmenter already finished")
     365             :         }
     366           2 :         f.flush(f.pending, nil)
     367           2 :         f.finished = true
     368             : }

Generated by: LCOV version 1.14