LCOV - code coverage report
Current view: top level - pebble/internal/keyspan - fragmenter.go (source / functions) Hit Total Coverage
Test: 2023-10-18 08:17Z 5807b591 - tests + meta.lcov Lines: 183 205 89.3 %
Date: 2023-10-18 08:18:06 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package keyspan
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "sort"
      10             : 
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/invariants"
      13             : )
      14             : 
      15             : type spansByStartKey struct {
      16             :         cmp base.Compare
      17             :         buf []Span
      18             : }
      19             : 
      20           1 : func (v *spansByStartKey) Len() int { return len(v.buf) }
      21           1 : func (v *spansByStartKey) Less(i, j int) bool {
      22           1 :         return v.cmp(v.buf[i].Start, v.buf[j].Start) < 0
      23           1 : }
      24           1 : func (v *spansByStartKey) Swap(i, j int) {
      25           1 :         v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
      26           1 : }
      27             : 
      28             : type spansByEndKey struct {
      29             :         cmp base.Compare
      30             :         buf []Span
      31             : }
      32             : 
      33           2 : func (v *spansByEndKey) Len() int { return len(v.buf) }
      34           2 : func (v *spansByEndKey) Less(i, j int) bool {
      35           2 :         return v.cmp(v.buf[i].End, v.buf[j].End) < 0
      36           2 : }
      37           2 : func (v *spansByEndKey) Swap(i, j int) {
      38           2 :         v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
      39           2 : }
      40             : 
      41             : // keysBySeqNumKind sorts keys by their sequence number in
      42             : // descending order. If two keys have equal sequence number, they're compared
      43             : // by key kind in descending order. This ordering matches the ordering of
      44             : // base.InternalCompare among keys with matching user keys.
      45             : type keysBySeqNumKind []Key
      46             : 
      47           2 : func (v *keysBySeqNumKind) Len() int           { return len(*v) }
      48           2 : func (v *keysBySeqNumKind) Less(i, j int) bool { return (*v)[i].Trailer > (*v)[j].Trailer }
      49           2 : func (v *keysBySeqNumKind) Swap(i, j int)      { (*v)[i], (*v)[j] = (*v)[j], (*v)[i] }
      50             : 
      51             : // Sort the spans by start key. This is the ordering required by the
      52             : // Fragmenter. Usually spans are naturally sorted by their start key,
      53             : // but that isn't true for range deletion tombstones in the legacy
      54             : // range-del-v1 block format.
      55           1 : func Sort(cmp base.Compare, spans []Span) {
      56           1 :         sorter := spansByStartKey{
      57           1 :                 cmp: cmp,
      58           1 :                 buf: spans,
      59           1 :         }
      60           1 :         sort.Sort(&sorter)
      61           1 : }
      62             : 
      63             : // Fragmenter fragments a set of spans such that overlapping spans are
      64             : // split at their overlap points. The fragmented spans are output to the
      65             : // supplied Emit function.
      66             : type Fragmenter struct {
      67             :         Cmp    base.Compare
      68             :         Format base.FormatKey
      69             :         // Emit is called to emit a fragmented span and its keys. Every key defined
      70             :         // within the emitted Span applies to the entirety of the Span's key span.
      71             :         // Keys are ordered in decreasing order of their sequence numbers, and if
      72             :         // equal, decreasing order of key kind.
      73             :         Emit func(Span)
      74             :         // pending contains the list of pending fragments that have not been
      75             :         // flushed to the block writer. Note that the spans have not been
      76             :         // fragmented on the end keys yet. That happens as the spans are
      77             :         // flushed. All pending spans have the same Start.
      78             :         pending []Span
      79             :         // doneBuf is used to buffer completed span fragments when flushing to a
      80             :         // specific key (e.g. TruncateAndFlushTo). It is cached in the Fragmenter to
      81             :         // allow reuse.
      82             :         doneBuf []Span
      83             :         // sortBuf is used to sort fragments by end key when flushing.
      84             :         sortBuf spansByEndKey
      85             :         // flushBuf is used to sort keys by (seqnum,kind) before emitting.
      86             :         flushBuf keysBySeqNumKind
      87             :         // flushedKey is the key that fragments have been flushed up to. Any
      88             :         // additional spans added to the fragmenter must have a start key >=
      89             :         // flushedKey. A nil value indicates flushedKey has not been set.
      90             :         flushedKey []byte
      91             :         finished   bool
      92             : }
      93             : 
      94           0 : func (f *Fragmenter) checkInvariants(buf []Span) {
      95           0 :         for i := 1; i < len(buf); i++ {
      96           0 :                 if f.Cmp(buf[i].Start, buf[i].End) >= 0 {
      97           0 :                         panic(fmt.Sprintf("pebble: empty pending span invariant violated: %s", buf[i]))
      98             :                 }
      99           0 :                 if f.Cmp(buf[i-1].Start, buf[i].Start) != 0 {
     100           0 :                         panic(fmt.Sprintf("pebble: pending span invariant violated: %s %s",
     101           0 :                                 f.Format(buf[i-1].Start), f.Format(buf[i].Start)))
     102             :                 }
     103             :         }
     104             : }
     105             : 
     106             : // Add adds a span to the fragmenter. Spans may overlap and the
     107             : // fragmenter will internally split them. The spans must be presented in
     108             : // increasing start key order. That is, Add must be called with a series
     109             : // of spans like:
     110             : //
     111             : //      a---e
     112             : //        c---g
     113             : //        c-----i
     114             : //               j---n
     115             : //               j-l
     116             : //
     117             : // We need to fragment the spans at overlap points. In the above
     118             : // example, we'd create:
     119             : //
     120             : //      a-c-e
     121             : //        c-e-g
     122             : //        c-e-g-i
     123             : //               j-l-n
     124             : //               j-l
     125             : //
     126             : // The fragments need to be output sorted by start key, and for equal start
     127             : // keys, sorted by descending sequence number. This last part requires a mild
     128             : // bit of care as the fragments are not created in descending sequence number
     129             : // order.
     130             : //
     131             : // Once a start key has been seen, we know that we'll never see a smaller
     132             : // start key and can thus flush all of the fragments that lie before that
     133             : // start key.
     134             : //
     135             : // Walking through the example above, we start with:
     136             : //
     137             : //      a---e
     138             : //
     139             : // Next we add [c,g) resulting in:
     140             : //
     141             : //      a-c-e
     142             : //        c---g
     143             : //
     144             : // The fragment [a,c) is flushed leaving the pending spans as:
     145             : //
     146             : //      c-e
     147             : //      c---g
     148             : //
     149             : // The next span is [c,i):
     150             : //
     151             : //      c-e
     152             : //      c---g
     153             : //      c-----i
     154             : //
     155             : // No fragments are flushed. The next span is [j,n):
     156             : //
     157             : //      c-e
     158             : //      c---g
     159             : //      c-----i
     160             : //             j---n
     161             : //
     162             : // The fragments [c,e), [c,g) and [c,i) are flushed. We sort these fragments
     163             : // by their end key, then split the fragments on the end keys:
     164             : //
     165             : //      c-e
     166             : //      c-e-g
     167             : //      c-e---i
     168             : //
     169             : // The [c,e) fragments all get flushed leaving:
     170             : //
     171             : //      e-g
     172             : //      e---i
     173             : //
     174             : // This process continues until there are no more fragments to flush.
     175             : //
     176             : // WARNING: the slices backing Start, End, Keys, Key.Suffix and Key.Value are
     177             : // all retained after this method returns and should not be modified. This is
     178             : // safe for spans that are added from a memtable or batch. It is partially
     179             : // unsafe for a span read from an sstable. Specifically, the Keys slice of a
     180             : // Span returned during sstable iteration is only valid until the next iterator
     181             : // operation. The stability of the user keys depends on whether the block is
     182             : // prefix compressed, and in practice Pebble never prefix compresses range
     183             : // deletion and range key blocks, so these keys are stable. Because of this key
     184             : // stability, typically callers only need to perform a shallow clone of the Span
     185             : // before Add-ing it to the fragmenter.
     186             : //
     187             : // Add requires the provided span's keys are sorted in Trailer descending order.
     188           2 : func (f *Fragmenter) Add(s Span) {
     189           2 :         if f.finished {
     190           0 :                 panic("pebble: span fragmenter already finished")
     191           2 :         } else if s.KeysOrder != ByTrailerDesc {
     192           0 :                 panic("pebble: span keys unexpectedly not in trailer descending order")
     193             :         }
     194           2 :         if f.flushedKey != nil {
     195           2 :                 switch c := f.Cmp(s.Start, f.flushedKey); {
     196           1 :                 case c < 0:
     197           1 :                         panic(fmt.Sprintf("pebble: start key (%s) < flushed key (%s)",
     198           1 :                                 f.Format(s.Start), f.Format(f.flushedKey)))
     199             :                 }
     200             :         }
     201           2 :         if f.Cmp(s.Start, s.End) >= 0 {
     202           2 :                 // An empty span, we can ignore it.
     203           2 :                 return
     204           2 :         }
     205           2 :         if invariants.RaceEnabled {
     206           0 :                 f.checkInvariants(f.pending)
     207           0 :                 defer func() { f.checkInvariants(f.pending) }()
     208             :         }
     209             : 
     210           2 :         if len(f.pending) > 0 {
     211           2 :                 // Since all of the pending spans have the same start key, we only need
     212           2 :                 // to compare against the first one.
     213           2 :                 switch c := f.Cmp(f.pending[0].Start, s.Start); {
     214           1 :                 case c > 0:
     215           1 :                         panic(fmt.Sprintf("pebble: keys must be added in order: %s > %s",
     216           1 :                                 f.Format(f.pending[0].Start), f.Format(s.Start)))
     217           2 :                 case c == 0:
     218           2 :                         // The new span has the same start key as the existing pending
     219           2 :                         // spans. Add it to the pending buffer.
     220           2 :                         f.pending = append(f.pending, s)
     221           2 :                         return
     222             :                 }
     223             : 
     224             :                 // At this point we know that the new start key is greater than the pending
     225             :                 // spans start keys.
     226           2 :                 f.truncateAndFlush(s.Start)
     227             :         }
     228             : 
     229           2 :         f.pending = append(f.pending, s)
     230             : }
     231             : 
     232             : // Cover is returned by Fragmenter.Covers and describes a span's relationship to
     233             : // a key at a particular snapshot.
     234             : type Cover int8
     235             : 
     236             : const (
     237             :         // NoCover indicates the tested key does not fall within the span's bounds,
     238             :         // or the span contains no keys with sequence numbers higher than the key's.
     239             :         NoCover Cover = iota
     240             :         // CoversInvisibly indicates the tested key does fall within the span's
     241             :         // bounds and the span contains at least one key with a higher sequence
     242             :         // number, but none visible at the provided snapshot.
     243             :         CoversInvisibly
     244             :         // CoversVisibly indicates the tested key does fall within the span's
     245             : // bounds, and the span contains at least one key with a sequence number
     246             :         // higher than the key's sequence number that is visible at the provided
     247             :         // snapshot.
     248             :         CoversVisibly
     249             : )
     250             : 
     251             : // Covers returns an enum indicating whether the specified key is covered by one
     252             : // of the pending keys. The provided key must be consistent with the ordering of
     253             : // the spans. That is, it is invalid to specify a key here that is out of order
     254             : // with the span start keys passed to Add.
     255           2 : func (f *Fragmenter) Covers(key base.InternalKey, snapshot uint64) Cover {
     256           2 :         if f.finished {
     257           0 :                 panic("pebble: span fragmenter already finished")
     258             :         }
     259           2 :         if len(f.pending) == 0 {
     260           2 :                 return NoCover
     261           2 :         }
     262             : 
     263           2 :         if f.Cmp(f.pending[0].Start, key.UserKey) > 0 {
     264           1 :                 panic(fmt.Sprintf("pebble: keys must be in order: %s > %s",
     265           1 :                         f.Format(f.pending[0].Start), key.Pretty(f.Format)))
     266             :         }
     267             : 
     268           2 :         cover := NoCover
     269           2 :         seqNum := key.SeqNum()
     270           2 :         for _, s := range f.pending {
     271           2 :                 if f.Cmp(key.UserKey, s.End) < 0 {
     272           2 :                         // NB: A range deletion tombstone does not delete a point operation
     273           2 :                         // at the same sequence number, and broadly a span is not considered
     274           2 :                         // to cover a point operation at the same sequence number.
     275           2 : 
     276           2 :                         for i := range s.Keys {
     277           2 :                                 if kseq := s.Keys[i].SeqNum(); kseq > seqNum {
     278           2 :                                         // This key from the span has a higher sequence number than
     279           2 :                                         // `key`. It covers `key`, although the span's key might not
     280           2 :                                         // be visible if its snapshot is too high.
     281           2 :                                         //
     282           2 :                                         // Batch keys are always visible.
     283           2 :                                         if kseq < snapshot || kseq&base.InternalKeySeqNumBatch != 0 {
     284           2 :                                                 return CoversVisibly
     285           2 :                                         }
     286             :                                         // s.Keys[i] is not visible.
     287           2 :                                         cover = CoversInvisibly
     288             :                                 }
     289             :                         }
     290             :                 }
     291             :         }
     292           2 :         return cover
     293             : }
     294             : 
     295             : // Empty returns true if all fragments added so far have finished flushing.
     296           2 : func (f *Fragmenter) Empty() bool {
     297           2 :         return f.finished || len(f.pending) == 0
     298           2 : }
     299             : 
     300             : // TruncateAndFlushTo flushes all of the fragments with a start key <= key,
     301             : // truncating spans to the specified end key. Used during compaction to force
     302             : // emitting of spans which straddle an sstable boundary. Consider
     303             : // the scenario:
     304             : //
     305             : //      a---------k#10
     306             : //           f#8
     307             : //           f#7
     308             : //
     309             : // Let's say the next user key after f is g. Calling TruncateAndFlushTo(g) will
     310             : // flush this span:
     311             : //
     312             : //      a-------g#10
     313             : //           f#8
     314             : //           f#7
     315             : //
     316             : // And leave this one in f.pending:
     317             : //
     318             : //      g----k#10
     319             : //
     320             : // WARNING: The fragmenter could hold on to the specified end key. Ensure it's
     321             : // a safe byte slice that could outlast the current sstable output, and one
     322             : // that will never be modified.
     323           2 : func (f *Fragmenter) TruncateAndFlushTo(key []byte) {
     324           2 :         if f.finished {
     325           0 :                 panic("pebble: span fragmenter already finished")
     326             :         }
     327           2 :         if f.flushedKey != nil {
     328           2 :                 switch c := f.Cmp(key, f.flushedKey); {
     329           1 :                 case c < 0:
     330           1 :                         panic(fmt.Sprintf("pebble: start key (%s) < flushed key (%s)",
     331           1 :                                 f.Format(key), f.Format(f.flushedKey)))
     332             :                 }
     333             :         }
     334           2 :         if invariants.RaceEnabled {
     335           0 :                 f.checkInvariants(f.pending)
     336           0 :                 defer func() { f.checkInvariants(f.pending) }()
     337             :         }
     338           2 :         if len(f.pending) > 0 {
     339           2 :                 // Since all of the pending spans have the same start key, we only need
     340           2 :                 // to compare against the first one.
     341           2 :                 switch c := f.Cmp(f.pending[0].Start, key); {
     342           0 :                 case c > 0:
     343           0 :                         panic(fmt.Sprintf("pebble: keys must be added in order: %s > %s",
     344           0 :                                 f.Format(f.pending[0].Start), f.Format(key)))
     345           1 :                 case c == 0:
     346           1 :                         return
     347             :                 }
     348             :         }
     349           2 :         f.truncateAndFlush(key)
     350             : }
     351             : 
     352             : // Start returns the start key of the first span in the pending buffer, or nil
     353             : // if there are no pending spans. The start key of all pending spans is the same
     354             : // as that of the first one.
     355           2 : func (f *Fragmenter) Start() []byte {
     356           2 :         if len(f.pending) > 0 {
     357           2 :                 return f.pending[0].Start
     358           2 :         }
     359           2 :         return nil
     360             : }
     361             : 
     362             : // Flushes all pending spans up to key (exclusive).
     363             : //
     364             : // WARNING: The specified key is stored without making a copy, so all callers
     365             : // must ensure it is safe.
     366           2 : func (f *Fragmenter) truncateAndFlush(key []byte) {
     367           2 :         f.flushedKey = append(f.flushedKey[:0], key...)
     368           2 :         done := f.doneBuf[:0]
     369           2 :         pending := f.pending
     370           2 :         f.pending = f.pending[:0]
     371           2 : 
     372           2 :         // pending and f.pending share the same underlying storage. As we iterate
     373           2 :         // over pending we append to f.pending, but only one entry is appended in
     374           2 :         // each iteration, after we have read the entry being overwritten.
     375           2 :         for _, s := range pending {
     376           2 :                 if f.Cmp(key, s.End) < 0 {
     377           2 :                         //   s: a--+--e
     378           2 :                         // new:    c------
     379           2 :                         if f.Cmp(s.Start, key) < 0 {
     380           2 :                                 done = append(done, Span{
     381           2 :                                         Start: s.Start,
     382           2 :                                         End:   key,
     383           2 :                                         Keys:  s.Keys,
     384           2 :                                 })
     385           2 :                         }
     386           2 :                         f.pending = append(f.pending, Span{
     387           2 :                                 Start: key,
     388           2 :                                 End:   s.End,
     389           2 :                                 Keys:  s.Keys,
     390           2 :                         })
     391           2 :                 } else {
     392           2 :                         //   s: a-----e
     393           2 :                         // new:       e----
     394           2 :                         done = append(done, s)
     395           2 :                 }
     396             :         }
     397             : 
     398           2 :         f.doneBuf = done[:0]
     399           2 :         f.flush(done, nil)
     400             : }
     401             : 
     402             : // flush a group of range spans to the block. The spans are required to all have
     403             : // the same start key. We flush all span fragments until startKey > lastKey. If
     404             : // lastKey is nil, all span fragments are flushed. The specification of a
     405             : // non-nil lastKey occurs for range deletion tombstones during compaction where
     406             : // we want to flush (but not truncate) all range tombstones that start at or
     407             : // before the first key in the next sstable. Consider:
     408             : //
     409             : //      a---e#10
     410             : //      a------h#9
     411             : //
     412             : // If a compaction splits the sstables at key c we want the first sstable to
     413             : // contain the tombstones [a,e)#10 and [a,e)#9. Fragmentation would naturally
     414             : // produce a tombstone [e,h)#9, but we don't need to output that tombstone to
     415             : // the first sstable.
     416           2 : func (f *Fragmenter) flush(buf []Span, lastKey []byte) {
     417           2 :         if invariants.RaceEnabled {
     418           0 :                 f.checkInvariants(buf)
     419           0 :         }
     420             : 
     421             :         // Sort the spans by end key. This will allow us to walk over the spans and
     422             :         // easily determine the next split point (the smallest end-key).
     423           2 :         f.sortBuf.cmp = f.Cmp
     424           2 :         f.sortBuf.buf = buf
     425           2 :         sort.Sort(&f.sortBuf)
     426           2 : 
     427           2 :         // Loop over the spans, splitting by end key.
     428           2 :         for len(buf) > 0 {
     429           2 :                 // A prefix of spans will end at split. remove represents the count of
     430           2 :                 // that prefix.
     431           2 :                 remove := 1
     432           2 :                 split := buf[0].End
     433           2 :                 f.flushBuf = append(f.flushBuf[:0], buf[0].Keys...)
     434           2 : 
     435           2 :                 for i := 1; i < len(buf); i++ {
     436           2 :                         if f.Cmp(split, buf[i].End) == 0 {
     437           2 :                                 remove++
     438           2 :                         }
     439           2 :                         f.flushBuf = append(f.flushBuf, buf[i].Keys...)
     440             :                 }
     441             : 
     442           2 :                 sort.Sort(&f.flushBuf)
     443           2 : 
     444           2 :                 f.Emit(Span{
     445           2 :                         Start: buf[0].Start,
     446           2 :                         End:   split,
     447           2 :                         // Copy the sorted keys to a new slice.
     448           2 :                         //
     449           2 :                         // This allocation is an unfortunate side effect of the Fragmenter and
     450           2 :                         // the expectation that the spans it produces are available in-memory
     451           2 :                         // indefinitely.
     452           2 :                         //
     453           2 :                         // Eventually, we should be able to replace the fragmenter with the
     454           2 :                         // keyspan.MergingIter which will perform just-in-time
     455           2 :                         // fragmentation, and only guaranteeing the memory lifetime for the
     456           2 :                         // current span. The MergingIter fragments while only needing to
     457           2 :                         // access one Span per level. It only accesses the Span at the
     458           2 :                         // current position for each level. During compactions, we can write
     459           2 :                         // these spans to sstables without retaining previous Spans.
     460           2 :                         Keys: append([]Key(nil), f.flushBuf...),
     461           2 :                 })
     462           2 : 
     463           2 :                 if lastKey != nil && f.Cmp(split, lastKey) > 0 {
     464           0 :                         break
     465             :                 }
     466             : 
     467             :                 // Adjust the start key for every remaining span.
     468           2 :                 buf = buf[remove:]
     469           2 :                 for i := range buf {
     470           2 :                         buf[i].Start = split
     471           2 :                 }
     472             :         }
     473             : }
     474             : 
     475             : // Finish flushes any remaining fragments to the output. It is an error to call
     476             : // this if any other spans will be added.
     477           2 : func (f *Fragmenter) Finish() {
     478           2 :         if f.finished {
     479           0 :                 panic("pebble: span fragmenter already finished")
     480             :         }
     481           2 :         f.flush(f.pending, nil)
     482           2 :         f.finished = true
     483             : }

Generated by: LCOV version 1.14