LCOV - code coverage report
Current view: top level - pebble/internal/keyspan - fragmenter.go (source / functions) Hit Total Coverage
Test: 2024-10-09 08:16Z e1b330bd - tests + meta.lcov Lines: 108 133 81.2 %
Date: 2024-10-09 08:17:34 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package keyspan
       6             : 
       7             : import (
       8             :         "fmt"
       9             : 
      10             :         "github.com/cockroachdb/pebble/internal/base"
      11             :         "github.com/cockroachdb/pebble/internal/invariants"
      12             : )
      13             : 
      14             : // Fragmenter fragments a set of spans such that overlapping spans are
      15             : // split at their overlap points. The fragmented spans are output to the
      16             : // supplied Output function.
      17             : type Fragmenter struct {
      18             :         Cmp    base.Compare
      19             :         Format base.FormatKey
      20             :         // Emit is called to emit a fragmented span and its keys. Every key defined
      21             :         // within the emitted Span applies to the entirety of the Span's key span.
      22             :         // Keys are ordered in decreasing order of their sequence numbers, and if
      23             :         // equal, decreasing order of key kind.
      24             :         Emit func(Span)
      25             :         // pending contains the list of pending fragments that have not been
      26             :         // flushed to the block writer. Note that the spans have not been
      27             :         // fragmented on the end keys yet. That happens as the spans are
      28             :         // flushed. All pending spans have the same Start.
      29             :         pending []Span
      30             :         // doneBuf is used to buffer completed span fragments when flushing to a
      31             :         // specific key (e.g. TruncateAndFlushTo). It is cached in the Fragmenter to
      32             :         // allow reuse.
      33             :         doneBuf []Span
      34             :         // flushBuf is used to sort keys by (seqnum,kind) before emitting.
      35             :         flushBuf []Key
      36             :         // flushedKey is the key that fragments have been flushed up to. Any
      37             :         // additional spans added to the fragmenter must have a start key >=
      38             :         // flushedKey. A nil value indicates flushedKey has not been set.
      39             :         flushedKey []byte
      40             :         finished   bool
      41             : }
      42             : 
      43           0 : func (f *Fragmenter) checkInvariants(buf []Span) {
      44           0 :         for i := 1; i < len(buf); i++ {
      45           0 :                 if f.Cmp(buf[i].Start, buf[i].End) >= 0 {
      46           0 :                         panic(fmt.Sprintf("pebble: empty pending span invariant violated: %s", buf[i]))
      47             :                 }
      48           0 :                 if f.Cmp(buf[i-1].Start, buf[i].Start) != 0 {
      49           0 :                         panic(fmt.Sprintf("pebble: pending span invariant violated: %s %s",
      50           0 :                                 f.Format(buf[i-1].Start), f.Format(buf[i].Start)))
      51             :                 }
      52             :         }
      53             : }
      54             : 
      55             : // Add adds a span to the fragmenter. Spans may overlap and the
      56             : // fragmenter will internally split them. The spans must be presented in
      57             : // increasing start key order. That is, Add must be called with a series
      58             : // of spans like:
      59             : //
      60             : //      a---e
      61             : //        c---g
      62             : //        c-----i
      63             : //               j---n
      64             : //               j-l
      65             : //
      66             : // We need to fragment the spans at overlap points. In the above
      67             : // example, we'd create:
      68             : //
      69             : //      a-c-e
      70             : //        c-e-g
      71             : //        c-e-g-i
      72             : //               j-l-n
      73             : //               j-l
      74             : //
      75             : // The fragments need to be output sorted by start key, and for equal start
      76             : // keys, sorted by descending sequence number. This last part requires a mild
      77             : // bit of care as the fragments are not created in descending sequence number
      78             : // order.
      79             : //
      80             : // Once a start key has been seen, we know that we'll never see a smaller
      81             : // start key and can thus flush all of the fragments that lie before that
      82             : // start key.
      83             : //
      84             : // Walking through the example above, we start with:
      85             : //
      86             : //      a---e
      87             : //
      88             : // Next we add [c,g) resulting in:
      89             : //
      90             : //      a-c-e
      91             : //        c---g
      92             : //
      93             : // The fragment [a,c) is flushed leaving the pending spans as:
      94             : //
      95             : //      c-e
      96             : //      c---g
      97             : //
      98             : // The next span is [c,i):
      99             : //
     100             : //      c-e
     101             : //      c---g
     102             : //      c-----i
     103             : //
     104             : // No fragments are flushed. The next span is [j,n):
     105             : //
     106             : //      c-e
     107             : //      c---g
     108             : //      c-----i
     109             : //             j---n
     110             : //
     111             : // The fragments [c,e), [c,g) and [c,i) are flushed. We sort these fragments
     112             : // by their end key, then split the fragments on the end keys:
     113             : //
     114             : //      c-e
     115             : //      c-e-g
     116             : //      c-e---i
     117             : //
     118             : // The [c,e) fragments all get flushed leaving:
     119             : //
     120             : //      e-g
     121             : //      e---i
     122             : //
     123             : // This process continues until there are no more fragments to flush.
     124             : //
     125             : // WARNING: the slices backing Start, End, Keys, Key.Suffix and Key.Value are
     126             : // all retained after this method returns and should not be modified. This is
     127             : // safe for spans that are added from a memtable or batch. It is partially
     128             : // unsafe for a span read from an sstable. Specifically, the Keys slice of a
     129             : // Span returned during sstable iteration is only valid until the next iterator
     130             : // operation. The stability of the user keys depends on whether the block is
     131             : // prefix compressed, and in practice Pebble never prefix compresses range
     132             : // deletion and range key blocks, so these keys are stable. Because of this key
     133             : // stability, typically callers only need to perform a shallow clone of the Span
     134             : // before Add-ing it to the fragmenter.
     135             : //
     136             : // Add requires the provided span's keys are sorted in InternalKeyTrailer descending order.
     137           2 : func (f *Fragmenter) Add(s Span) {
     138           2 :         if f.finished {
     139           0 :                 panic("pebble: span fragmenter already finished")
     140           2 :         } else if s.KeysOrder != ByTrailerDesc {
     141           0 :                 panic("pebble: span keys unexpectedly not in trailer descending order")
     142             :         }
     143           2 :         if f.flushedKey != nil {
     144           2 :                 switch c := f.Cmp(s.Start, f.flushedKey); {
     145           0 :                 case c < 0:
     146           0 :                         panic(fmt.Sprintf("pebble: start key (%s) < flushed key (%s)",
     147           0 :                                 f.Format(s.Start), f.Format(f.flushedKey)))
     148             :                 }
     149             :         }
     150           2 :         if f.Cmp(s.Start, s.End) >= 0 {
     151           1 :                 // An empty span, we can ignore it.
     152           1 :                 return
     153           1 :         }
     154           2 :         if invariants.RaceEnabled {
     155           0 :                 f.checkInvariants(f.pending)
     156           0 :                 defer func() { f.checkInvariants(f.pending) }()
     157             :         }
     158             : 
     159           2 :         if len(f.pending) > 0 {
     160           2 :                 // Since all of the pending spans have the same start key, we only need
     161           2 :                 // to compare against the first one.
     162           2 :                 switch c := f.Cmp(f.pending[0].Start, s.Start); {
     163           1 :                 case c > 0:
     164           1 :                         panic(fmt.Sprintf("pebble: keys must be added in order: %s > %s",
     165           1 :                                 f.Format(f.pending[0].Start), f.Format(s.Start)))
     166           2 :                 case c == 0:
     167           2 :                         // The new span has the same start key as the existing pending
     168           2 :                         // spans. Add it to the pending buffer.
     169           2 :                         f.pending = append(f.pending, s)
     170           2 :                         return
     171             :                 }
     172             : 
     173             :                 // At this point we know that the new start key is greater than the pending
     174             :                 // spans start keys.
     175           2 :                 f.truncateAndFlush(s.Start)
     176             :         }
     177             : 
     178           2 :         f.pending = append(f.pending, s)
     179             : }
     180             : 
     181             : // Empty returns true if all fragments added so far have finished flushing.
     182           0 : func (f *Fragmenter) Empty() bool {
     183           0 :         return f.finished || len(f.pending) == 0
     184           0 : }
     185             : 
     186             : // Start returns the start key of the first span in the pending buffer, or nil
     187             : // if there are no pending spans. The start key of all pending spans is the same
     188             : // as that of the first one.
     189           1 : func (f *Fragmenter) Start() []byte {
     190           1 :         if len(f.pending) > 0 {
     191           1 :                 return f.pending[0].Start
     192           1 :         }
     193           1 :         return nil
     194             : }
     195             : 
     196             : // Truncate truncates all pending spans up to key (exclusive), flushes them, and
     197             : // retains any spans that continue onward for future flushes.
     198           0 : func (f *Fragmenter) Truncate(key []byte) {
     199           0 :         if len(f.pending) > 0 {
     200           0 :                 f.truncateAndFlush(key)
     201           0 :         }
     202             : }
     203             : 
     204             : // Flushes all pending spans up to key (exclusive).
     205             : //
     206             : // WARNING: The specified key is stored without making a copy, so all callers
     207             : // must ensure it is safe.
     208           2 : func (f *Fragmenter) truncateAndFlush(key []byte) {
     209           2 :         f.flushedKey = append(f.flushedKey[:0], key...)
     210           2 :         done := f.doneBuf[:0]
     211           2 :         pending := f.pending
     212           2 :         f.pending = f.pending[:0]
     213           2 : 
     214           2 :         // pending and f.pending share the same underlying storage. As we iterate
     215           2 :         // over pending we append to f.pending, but only one entry is appended in
     216           2 :         // each iteration, after we have read the entry being overwritten.
     217           2 :         for _, s := range pending {
     218           2 :                 if f.Cmp(key, s.End) < 0 {
     219           2 :                         //   s: a--+--e
     220           2 :                         // new:    c------
     221           2 :                         if f.Cmp(s.Start, key) < 0 {
     222           2 :                                 done = append(done, Span{
     223           2 :                                         Start: s.Start,
     224           2 :                                         End:   key,
     225           2 :                                         Keys:  s.Keys,
     226           2 :                                 })
     227           2 :                         }
     228           2 :                         f.pending = append(f.pending, Span{
     229           2 :                                 Start: key,
     230           2 :                                 End:   s.End,
     231           2 :                                 Keys:  s.Keys,
     232           2 :                         })
     233           2 :                 } else {
     234           2 :                         //   s: a-----e
     235           2 :                         // new:       e----
     236           2 :                         done = append(done, s)
     237           2 :                 }
     238             :         }
     239             : 
     240           2 :         f.doneBuf = done[:0]
     241           2 :         f.flush(done, nil)
     242             : }
     243             : 
     244             : // flush a group of range spans to the block. The spans are required to all have
     245             : // the same start key. We flush all span fragments until startKey > lastKey. If
     246             : // lastKey is nil, all span fragments are flushed. The specification of a
     247             : // non-nil lastKey occurs for range deletion tombstones during compaction where
     248             : // we want to flush (but not truncate) all range tombstones that start at or
     249             : // before the first key in the next sstable. Consider:
     250             : //
     251             : //      a---e#10
     252             : //      a------h#9
     253             : //
     254             : // If a compaction splits the sstables at key c we want the first sstable to
     255             : // contain the tombstones [a,e)#10 and [a,e)#9. Fragmentation would naturally
     256             : // produce a tombstone [e,h)#9, but we don't need to output that tombstone to
     257             : // the first sstable.
     258           2 : func (f *Fragmenter) flush(buf []Span, lastKey []byte) {
     259           2 :         if invariants.RaceEnabled {
     260           0 :                 f.checkInvariants(buf)
     261           0 :         }
     262             : 
     263             :         // Sort the spans by end key. This will allow us to walk over the spans and
     264             :         // easily determine the next split point (the smallest end-key).
     265           2 :         SortSpansByEndKey(f.Cmp, buf)
     266           2 : 
     267           2 :         // Loop over the spans, splitting by end key.
     268           2 :         for len(buf) > 0 {
     269           2 :                 // A prefix of spans will end at split. remove represents the count of
     270           2 :                 // that prefix.
     271           2 :                 remove := 1
     272           2 :                 split := buf[0].End
     273           2 :                 f.flushBuf = append(f.flushBuf[:0], buf[0].Keys...)
     274           2 : 
     275           2 :                 for i := 1; i < len(buf); i++ {
     276           2 :                         if f.Cmp(split, buf[i].End) == 0 {
     277           2 :                                 remove++
     278           2 :                         }
     279           2 :                         f.flushBuf = append(f.flushBuf, buf[i].Keys...)
     280             :                 }
     281             : 
     282           2 :                 SortKeysByTrailer(f.flushBuf)
     283           2 : 
     284           2 :                 f.Emit(Span{
     285           2 :                         Start: buf[0].Start,
     286           2 :                         End:   split,
     287           2 :                         // Copy the sorted keys to a new slice.
     288           2 :                         //
     289           2 :                         // This allocation is an unfortunate side effect of the Fragmenter and
     290           2 :                         // the expectation that the spans it produces are available in-memory
     291           2 :                         // indefinitely.
     292           2 :                         //
     293           2 :                         // Eventually, we should be able to replace the fragmenter with the
     294           2 :                         // keyspanimpl.MergingIter which will perform just-in-time
     295           2 :                         // fragmentation, and only guaranteeing the memory lifetime for the
     296           2 :                         // current span. The MergingIter fragments while only needing to
     297           2 :                         // access one Span per level. It only accesses the Span at the
     298           2 :                         // current position for each level. During compactions, we can write
     299           2 :                         // these spans to sstables without retaining previous Spans.
     300           2 :                         Keys: append([]Key(nil), f.flushBuf...),
     301           2 :                 })
     302           2 : 
     303           2 :                 if lastKey != nil && f.Cmp(split, lastKey) > 0 {
     304           0 :                         break
     305             :                 }
     306             : 
     307             :                 // Adjust the start key for every remaining span.
     308           2 :                 buf = buf[remove:]
     309           2 :                 for i := range buf {
     310           2 :                         buf[i].Start = split
     311           2 :                 }
     312             :         }
     313             : }
     314             : 
     315             : // Finish flushes any remaining fragments to the output. It is an error to call
     316             : // this if any other spans will be added.
     317           2 : func (f *Fragmenter) Finish() {
     318           2 :         if f.finished {
     319           0 :                 panic("pebble: span fragmenter already finished")
     320             :         }
     321           2 :         f.flush(f.pending, nil)
     322           2 :         f.finished = true
     323             : }

Generated by: LCOV version 1.14