LCOV - code coverage report
Current view: top level - pebble - compaction_iter.go (source / functions) Hit Total Coverage
Test: 2023-09-12 08:17Z 1efa535d - tests only.lcov Lines: 724 780 92.8 %
Date: 2023-09-12 08:18:13 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "encoding/binary"
      10             :         "fmt"
      11             :         "io"
      12             :         "sort"
      13             :         "strconv"
      14             : 
      15             :         "github.com/cockroachdb/errors"
      16             :         "github.com/cockroachdb/pebble/internal/base"
      17             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      18             :         "github.com/cockroachdb/pebble/internal/invariants"
      19             :         "github.com/cockroachdb/pebble/internal/keyspan"
      20             :         "github.com/cockroachdb/pebble/internal/rangekey"
      21             : )
      22             : 
      23             : // compactionIter provides a forward-only iterator that encapsulates the logic
      24             : // for collapsing entries during compaction. It wraps an internal iterator and
      25             : // collapses entries that are no longer necessary because they are shadowed by
      26             : // newer entries. The simplest example of this is when the internal iterator
      27             : // contains two keys: a.PUT.2 and a.PUT.1. Instead of returning both entries,
      28             : // compactionIter collapses the second entry because it is no longer
      29             : // necessary. The high-level structure for compactionIter is to iterate over
      30             : // its internal iterator and output 1 entry for every user-key. There are four
      31             : // complications to this story.
      32             : //
      33             : // 1. Eliding Deletion Tombstones
      34             : //
      35             : // Consider the entries a.DEL.2 and a.PUT.1. These entries collapse to
      36             : // a.DEL.2. Do we have to output the entry a.DEL.2? Only if a.DEL.2 possibly
      37             : // shadows an entry at a lower level. If we're compacting to the base-level in
      38             : // the LSM tree then a.DEL.2 is definitely not shadowing an entry at a lower
      39             : // level and can be elided.
      40             : //
      41             : // We can do slightly better than only eliding deletion tombstones at the base
      42             : // level by observing that we can elide a deletion tombstone if there are no
      43             : // sstables that contain the entry's key. This check is performed by
      44             : // elideTombstone.
      45             : //
      46             : // 2. Merges
      47             : //
      48             : // The MERGE operation merges the value for an entry with the existing value
      49             : // for an entry. The logical value of an entry can be composed of a series of
      50             : // merge operations. When compactionIter sees a MERGE, it scans forward in its
      51             : // internal iterator collapsing MERGE operations for the same key until it
      52             : // encounters a SET or DELETE operation. For example, the keys a.MERGE.4,
      53             : // a.MERGE.3, a.MERGE.2 will be collapsed to a.MERGE.4 and the values will be
      54             : // merged using the specified Merger.
      55             : //
      56             : // An interesting case here occurs when MERGE is combined with SET. Consider
      57             : // the entries a.MERGE.3 and a.SET.2. The collapsed key will be a.SET.3. The
      58             : // reason that the kind is changed to SET is because the SET operation acts as
      59             : // a barrier preventing further merging. This can be seen better in the
      60             : // scenario a.MERGE.3, a.SET.2, a.MERGE.1. The entry a.MERGE.1 may be at a lower
      61             : // (older) level and not involved in the compaction. If the compaction of
      62             : // a.MERGE.3 and a.SET.2 produced a.MERGE.3, a subsequent compaction with
      63             : // a.MERGE.1 would merge the values together incorrectly.
      64             : //
      65             : // 3. Snapshots
      66             : //
      67             : // Snapshots are lightweight point-in-time views of the DB state. At its core,
      68             : // a snapshot is a sequence number along with a guarantee from Pebble that it
      69             : // will maintain the view of the database at that sequence number. Part of this
      70             : // guarantee is relatively straightforward to achieve. When reading from the
      71             : // database Pebble will ignore sequence numbers that are larger than the
      72             : // snapshot sequence number. The primary complexity with snapshots occurs
      73             : // during compaction: the collapsing of entries that are shadowed by newer
      74             : // entries is at odds with the guarantee that Pebble will maintain the view of
      75             : // the database at the snapshot sequence number. Rather than collapsing entries
      76             : // up to the next user key, compactionIter can only collapse entries up to the
      77             : // next snapshot boundary. That is, every snapshot boundary potentially causes
      78             : // another entry for the same user-key to be emitted. Another way to view this
      79             : // is that snapshots define stripes and entries are collapsed within stripes,
      80             : // but not across stripes. Consider the following scenario:
      81             : //
      82             : //      a.PUT.9
      83             : //      a.DEL.8
      84             : //      a.PUT.7
      85             : //      a.DEL.6
      86             : //      a.PUT.5
      87             : //
      88             : // In the absence of snapshots these entries would be collapsed to
      89             : // a.PUT.9. What if there is a snapshot at sequence number 7? The entries can
      90             : // be divided into two stripes and collapsed within the stripes:
      91             : //
      92             : //      a.PUT.9        a.PUT.9
      93             : //      a.DEL.8  --->
      94             : //      a.PUT.7
      95             : //      --             --
      96             : //      a.DEL.6  --->  a.DEL.6
      97             : //      a.PUT.5
      98             : //
      99             : // All of the rules described earlier still apply, but they are confined to
     100             : // operate within a snapshot stripe. Snapshots only affect compaction when the
     101             : // snapshot sequence number lies within the range of sequence numbers being
     102             : // compacted. In the above example, a snapshot at sequence number 10 or at
     103             : // sequence number 5 would not have any effect.
     104             : //
     105             : // 4. Range Deletions
     106             : //
     107             : // Range deletions provide the ability to delete all of the keys (and values)
     108             : // in a contiguous range. Range deletions are stored indexed by their start
     109             : // key. The end key of the range is stored in the value. In order to support
     110             : // lookup of the range deletions which overlap with a particular key, the range
     111             : // deletion tombstones need to be fragmented whenever they overlap. This
     112             : // fragmentation is performed by keyspan.Fragmenter. The fragments are then
     113             : // subject to the rules for snapshots. For example, consider the two range
     114             : // tombstones [a,e)#1 and [c,g)#2:
     115             : //
     116             : //      2:     c-------g
     117             : //      1: a-------e
     118             : //
     119             : // These tombstones will be fragmented into:
     120             : //
     121             : //      2:     c---e---g
     122             : //      1: a---c---e
     123             : //
     124             : // Do we output the fragment [c,e)#1? Since it is covered by [c,e)#2 the answer
     125             : // depends on whether it is in a new snapshot stripe.
     126             : //
     127             : // In addition to the fragmentation of range tombstones, compaction also needs
     128             : // to take the range tombstones into consideration when outputting normal
     129             : // keys. Just as with point deletions, a range deletion covering an entry can
     130             : // cause the entry to be elided.
     131             : //
     132             : // A note on the stability of keys and values.
     133             : //
     134             : // The stability guarantees of keys and values returned by the iterator tree
     135             : // that backs a compactionIter are nuanced and care must be taken when
     136             : // referencing any returned items.
     137             : //
     138             : // Keys and values returned by exported functions (i.e. First, Next, etc.) have
     139             : // lifetimes that fall into two categories:
     140             : //
     141             : // Lifetime valid for duration of compaction. Range deletion keys and values are
     142             : // stable for the duration of the compaction, due to the way in which a
     143             : // compactionIter is typically constructed (i.e. via (*compaction).newInputIter,
     144             : // which wraps the iterator over the range deletion block in a noCloseIter,
     145             : // preventing the release of the backing memory until the compaction is
     146             : // finished).
     147             : //
     148             : // Lifetime limited to duration of sstable block liveness. Point keys (SET, DEL,
     149             : // etc.) and values must be cloned / copied following the return from the
     150             : // exported function, and before a subsequent call to Next advances the iterator
     151             : // and mutates the contents of the returned key and value.
     152             : type compactionIter struct {
     153             :         equal Equal
     154             :         merge Merge
     155             :         iter  internalIterator
     156             :         err   error
     157             :         // `key.UserKey` is set to `keyBuf` as a result of saving `i.iterKey.UserKey`
     158             :         // and `key.Trailer` is set to `i.iterKey.Trailer`. This is the
     159             :         // case on return from all public methods -- these methods return `key`.
     160             :         // Additionally, it is the internal state when the code is moving to the
     161             :         // next key so it can determine whether the user key has changed from
     162             :         // the previous key.
     163             :         key InternalKey
     164             :         // keyTrailer is updated when `i.key` is updated and holds the key's
     165             :         // original trailer (eg, before any sequence-number zeroing or changes to
     166             :         // key kind).
     167             :         keyTrailer  uint64
     168             :         value       []byte
     169             :         valueCloser io.Closer
     170             :         // Temporary buffer used for storing the previous user key in order to
     171             :         // determine when iteration has advanced to a new user key and thus a new
     172             :         // snapshot stripe.
     173             :         keyBuf []byte
     174             :         // Temporary buffer used for storing the previous value, which may be an
     175             :         // unsafe, i.iter-owned slice that could be altered when the iterator is
     176             :         // advanced.
     177             :         valueBuf []byte
     178             :         // Is the current entry valid?
     179             :         valid            bool
     180             :         iterKey          *InternalKey
     181             :         iterValue        []byte
     182             :         iterStripeChange stripeChangeType
     183             :         // `skip` indicates whether the remaining skippable entries in the current
     184             :         // snapshot stripe should be skipped or processed. An example of a non-
     185             :         // skippable entry is a range tombstone as we need to return it from the
     186             :         // `compactionIter`, even if a key covering its start key has already been
     187             :         // seen in the same stripe. `skip` has no effect when `pos == iterPosNext`.
     188             :         //
     189             :         // TODO(jackson): If we use keyspan.InterleavingIter for range deletions,
     190             :         // like we do for range keys, the only remaining 'non-skippable' key is
     191             :         // the invalid key. We should be able to simplify this logic and remove this
     192             :         // field.
     193             :         skip bool
     194             :         // `pos` indicates the iterator position at the top of `Next()`. Its type's
     195             :         // (`iterPos`) values take on the following meanings in the context of
     196             :         // `compactionIter`.
     197             :         //
     198             :         // - `iterPosCurForward`: the iterator is at the last key returned.
     199             :         // - `iterPosNext`: the iterator has already been advanced to the next
     200             :         //   candidate key. For example, this happens when processing merge operands,
     201             :         //   where we advance the iterator all the way into the next stripe or next
     202             :         //   user key to ensure we've seen all mergeable operands.
     203             :         // - `iterPosPrev`: this is invalid as compactionIter is forward-only.
     204             :         pos iterPos
     205             :         // `snapshotPinned` indicates whether the last point key returned by the
     206             :         // compaction iterator was only returned because an open snapshot prevents
     207             :         // its elision. This field only applies to point keys, and not to range
     208             :         // deletions or range keys.
     209             :         //
     210             :         // For MERGE, it is possible that doing the merge is interrupted even when
     211             :         // the next point key is in the same stripe. This can happen if the loop in
     212             :         // mergeNext gets interrupted by sameStripeNonSkippable.
     213             :         // sameStripeNonSkippable occurs due to RANGEDELs that sort before
     214             :         // SET/MERGE/DEL with the same seqnum, so the RANGEDEL does not necessarily
     215             :         // delete the subsequent SET/MERGE/DEL keys.
     216             :         snapshotPinned bool
     217             :         // forceObsoleteDueToRangeDel is set to true in a subset of the cases that
     218             :         // snapshotPinned is true. This value is true when the point is obsolete due
     219             :         // to a RANGEDEL but could not be deleted due to a snapshot.
     220             :         //
     221             :         // NB: it may seem that the additional cases that snapshotPinned captures
     222             :         // are harmless in that they can also be used to mark a point as obsolete
     223             :         // (it is merely a duplication of some logic that happens in
     224             :         // Writer.AddWithForceObsolete), but that is not quite accurate as of this
     225             :         // writing -- snapshotPinned originated in stats collection and for a
     226             :         // sequence MERGE, SET, where the MERGE cannot merge with the (older) SET
     227             :         // due to a snapshot, the snapshotPinned value for the SET is true.
     228             :         //
     229             :         // TODO(sumeer,jackson): improve the logic of snapshotPinned and reconsider
     230             :         // whether we need forceObsoleteDueToRangeDel.
     231             :         forceObsoleteDueToRangeDel bool
     232             :         // The index of the snapshot for the current key within the snapshots slice.
     233             :         curSnapshotIdx    int
     234             :         curSnapshotSeqNum uint64
     235             :         // The snapshot sequence numbers that need to be maintained. These sequence
     236             :         // numbers define the snapshot stripes (see the Snapshots description
     237             :         // above). The sequence numbers are in ascending order.
     238             :         snapshots []uint64
     239             :         // frontiers holds a heap of user keys that affect compaction behavior when
     240             :         // they're exceeded. Before a new key is returned, the compaction iterator
     241             :         // advances the frontier, notifying any code that subscribed to be notified
     242             :         // when a key was reached. The primary use today is within the
     243             :         // implementation of compactionOutputSplitters in compaction.go. Many of
     244             :         // these splitters wait for the compaction iterator to call Advance(k) when
     245             :         // it's returning a new key. If the key that they're waiting for is
     246             :         // surpassed, these splitters update internal state recording that they
     247             :         // should request a compaction split next time they're asked in
     248             :         // [shouldSplitBefore].
     249             :         frontiers frontiers
     250             :         // Reference to the range deletion tombstone fragmenter (e.g.,
     251             :         // `compaction.rangeDelFrag`).
     252             :         rangeDelFrag *keyspan.Fragmenter
     253             :         rangeKeyFrag *keyspan.Fragmenter
     254             :         // The fragmented tombstones.
     255             :         tombstones []keyspan.Span
     256             :         // The fragmented range keys.
     257             :         rangeKeys []keyspan.Span
     258             :         // Byte allocator for the tombstone keys.
     259             :         alloc               bytealloc.A
     260             :         allowZeroSeqNum     bool
     261             :         elideTombstone      func(key []byte) bool
     262             :         elideRangeTombstone func(start, end []byte) bool
     263             :         // The on-disk format major version. This informs the types of keys that
     264             :         // may be written to disk during a compaction.
     265             :         formatVersion FormatMajorVersion
     266             :         stats         struct {
     267             :                 // count of DELSIZED keys that were missized.
     268             :                 countMissizedDels uint64
     269             :         }
     270             : }
     271             : 
     272             : func newCompactionIter(
     273             :         cmp Compare,
     274             :         equal Equal,
     275             :         formatKey base.FormatKey,
     276             :         merge Merge,
     277             :         iter internalIterator,
     278             :         snapshots []uint64,
     279             :         rangeDelFrag *keyspan.Fragmenter,
     280             :         rangeKeyFrag *keyspan.Fragmenter,
     281             :         allowZeroSeqNum bool,
     282             :         elideTombstone func(key []byte) bool,
     283             :         elideRangeTombstone func(start, end []byte) bool,
     284             :         formatVersion FormatMajorVersion,
     285           1 : ) *compactionIter {
     286           1 :         i := &compactionIter{
     287           1 :                 equal:               equal,
     288           1 :                 merge:               merge,
     289           1 :                 iter:                iter,
     290           1 :                 snapshots:           snapshots,
     291           1 :                 frontiers:           frontiers{cmp: cmp},
     292           1 :                 rangeDelFrag:        rangeDelFrag,
     293           1 :                 rangeKeyFrag:        rangeKeyFrag,
     294           1 :                 allowZeroSeqNum:     allowZeroSeqNum,
     295           1 :                 elideTombstone:      elideTombstone,
     296           1 :                 elideRangeTombstone: elideRangeTombstone,
     297           1 :                 formatVersion:       formatVersion,
     298           1 :         }
     299           1 :         i.rangeDelFrag.Cmp = cmp
     300           1 :         i.rangeDelFrag.Format = formatKey
     301           1 :         i.rangeDelFrag.Emit = i.emitRangeDelChunk
     302           1 :         i.rangeKeyFrag.Cmp = cmp
     303           1 :         i.rangeKeyFrag.Format = formatKey
     304           1 :         i.rangeKeyFrag.Emit = i.emitRangeKeyChunk
     305           1 :         return i
     306           1 : }
     307             : 
     308           1 : func (i *compactionIter) First() (*InternalKey, []byte) {
     309           1 :         if i.err != nil {
     310           0 :                 return nil, nil
     311           0 :         }
     312           1 :         var iterValue LazyValue
     313           1 :         i.iterKey, iterValue = i.iter.First()
     314           1 :         i.iterValue, _, i.err = iterValue.Value(nil)
     315           1 :         if i.err != nil {
     316           0 :                 return nil, nil
     317           0 :         }
     318           1 :         if i.iterKey != nil {
     319           1 :                 i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(i.iterKey.SeqNum(), i.snapshots)
     320           1 :         }
     321           1 :         i.pos = iterPosNext
     322           1 :         i.iterStripeChange = newStripeNewKey
     323           1 :         return i.Next()
     324             : }
     325             : 
     326           1 : func (i *compactionIter) Next() (*InternalKey, []byte) {
     327           1 :         if i.err != nil {
     328           0 :                 return nil, nil
     329           0 :         }
     330             : 
     331             :         // Close the closer for the current value if one was open.
     332           1 :         if i.closeValueCloser() != nil {
     333           0 :                 return nil, nil
     334           0 :         }
     335             : 
     336             :         // Prior to this call to `Next()` we are in one of three situations with
     337             :         // respect to `iterKey` and related state:
     338             :         //
     339             :         // - `!skip && pos == iterPosNext`: `iterKey` is already at the next key.
     340             :         // - `!skip && pos == iterPosCurForward`: We are at the key that has been returned.
     341             :         //   To move forward we advance by one key, even if that lands us in the same
     342             :         //   snapshot stripe.
     343             :         // - `skip && pos == iterPosCurForward`: We are at the key that has been returned.
     344             :         //   To move forward we skip skippable entries in the stripe.
     345           1 :         if i.pos == iterPosCurForward {
     346           1 :                 if i.skip {
     347           1 :                         i.skipInStripe()
     348           1 :                 } else {
     349           1 :                         i.nextInStripe()
     350           1 :                 }
     351             :         }
     352             : 
     353           1 :         i.pos = iterPosCurForward
     354           1 :         i.valid = false
     355           1 : 
     356           1 :         for i.iterKey != nil {
     357           1 :                 // If we entered a new snapshot stripe with the same key, any key we
     358           1 :                 // return on this iteration is only returned because the open snapshot
     359           1 :                 // prevented it from being elided or merged with the key returned for
     360           1 :                 // the previous stripe. Mark it as pinned so that the compaction loop
     361           1 :                 // can correctly populate output tables' pinned statistics. We might
     362           1 :                 // also set snapshotPinned=true down below if we observe that the key is
     363           1 :                 // deleted by a range deletion in a higher stripe or that this key is a
     364           1 :                 // tombstone that could be elided if only it were in the last snapshot
     365           1 :                 // stripe.
     366           1 :                 i.snapshotPinned = i.iterStripeChange == newStripeSameKey
     367           1 : 
     368           1 :                 if i.iterKey.Kind() == InternalKeyKindRangeDelete || rangekey.IsRangeKey(i.iterKey.Kind()) {
     369           1 :                         // Return the span so the compaction can use it for file truncation and add
     370           1 :                         // it to the relevant fragmenter. We do not set `skip` to true before
     371           1 :                         // returning as there may be a forthcoming point key with the same user key
     372           1 :                         // and sequence number. Such a point key must be visible (i.e., not skipped
     373           1 :                         // over) since we promise point keys are not deleted by range tombstones at
     374           1 :                         // the same sequence number.
     375           1 :                         //
     376           1 :                         // Although, note that `skip` may already be true before reaching here
     377           1 :                         // due to an earlier key in the stripe. Then it is fine to leave it set
     378           1 :                         // to true, as the earlier key must have had a higher sequence number.
     379           1 :                         //
     380           1 :                         // NOTE: there is a subtle invariant violation here in that calling
     381           1 :                         // saveKey and returning a reference to the temporary slice violates
     382           1 :                         // the stability guarantee for range deletion keys. A potential
     383           1 :                         // remediation could return the original iterKey and iterValue
     384           1 :                         // directly, as the backing memory is guaranteed to be stable until
     385           1 :                         // the compaction completes. The violation here is only minor in
     386           1 :                         // that the caller immediately clones the range deletion InternalKey
     387           1 :                         // when passing the key to the deletion fragmenter (see the
     388           1 :                         // call-site in compaction.go).
     389           1 :                         // TODO(travers): address this violation by removing the call to
     390           1 :                         // saveKey and instead return the original iterKey and iterValue.
     391           1 :                         // This goes against the comment on i.key in the struct, and
     392           1 :                         // therefore warrants some investigation.
     393           1 :                         i.saveKey()
     394           1 :                         // TODO(jackson): Handle tracking pinned statistics for range keys
     395           1 :                         // and range deletions. This would require updating
     396           1 :                         // emitRangeDelChunk and rangeKeyCompactionTransform to update
     397           1 :                         // statistics when they apply their own snapshot striping logic.
     398           1 :                         i.snapshotPinned = false
     399           1 :                         i.value = i.iterValue
     400           1 :                         i.valid = true
     401           1 :                         return &i.key, i.value
     402           1 :                 }
     403             : 
     404           1 :                 if cover := i.rangeDelFrag.Covers(*i.iterKey, i.curSnapshotSeqNum); cover == keyspan.CoversVisibly {
     405           1 :                         // A pending range deletion deletes this key. Skip it.
     406           1 :                         i.saveKey()
     407           1 :                         i.skipInStripe()
     408           1 :                         continue
     409           1 :                 } else if cover == keyspan.CoversInvisibly {
     410           1 :                         // i.iterKey would be deleted by a range deletion if there weren't
     411           1 :                         // any open snapshots. Mark it as pinned.
     412           1 :                         //
     413           1 :                         // NB: there are multiple places in this file where we call
     414           1 :                         // i.rangeDelFrag.Covers and this is the only one where we are writing
     415           1 :                         // to i.snapshotPinned. Those other cases occur in mergeNext where the
     416           1 :                         // caller is deciding whether the value should be merged or not, and the
     417           1 :                         // key is in the same snapshot stripe. Hence, snapshotPinned is by
     418           1 :                         // definition false in those cases.
     419           1 :                         i.snapshotPinned = true
     420           1 :                         i.forceObsoleteDueToRangeDel = true
     421           1 :                 } else {
     422           1 :                         i.forceObsoleteDueToRangeDel = false
     423           1 :                 }
     424             : 
     425           1 :                 switch i.iterKey.Kind() {
     426           1 :                 case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
     427           1 :                         if i.elideTombstone(i.iterKey.UserKey) {
     428           1 :                                 if i.curSnapshotIdx == 0 {
     429           1 :                                         // If we're at the last snapshot stripe and the tombstone
     430           1 :                                         // can be elided skip skippable keys in the same stripe.
     431           1 :                                         i.saveKey()
     432           1 :                                         i.skipInStripe()
     433           1 :                                         continue
     434           1 :                                 } else {
     435           1 :                                         // We're not at the last snapshot stripe, so the tombstone
     436           1 :                                         // can NOT yet be elided. Mark it as pinned, so that it's
     437           1 :                                         // included in table statistics appropriately.
     438           1 :                                         i.snapshotPinned = true
     439           1 :                                 }
     440             :                         }
     441             : 
     442           1 :                         switch i.iterKey.Kind() {
     443           1 :                         case InternalKeyKindDelete:
     444           1 :                                 i.saveKey()
     445           1 :                                 i.value = i.iterValue
     446           1 :                                 i.valid = true
     447           1 :                                 i.skip = true
     448           1 :                                 return &i.key, i.value
     449             : 
     450           1 :                         case InternalKeyKindDeleteSized:
     451           1 :                                 // We may skip subsequent keys because of this tombstone. Scan
     452           1 :                                 // ahead to see just how much data this tombstone drops and if
     453           1 :                                 // the tombstone's value should be updated accordingly.
     454           1 :                                 return i.deleteSizedNext()
     455             : 
     456           1 :                         case InternalKeyKindSingleDelete:
     457           1 :                                 if i.singleDeleteNext() {
     458           1 :                                         return &i.key, i.value
     459           1 :                                 }
     460           1 :                                 continue
     461             :                         }
     462             : 
     463           1 :                 case InternalKeyKindSet, InternalKeyKindSetWithDelete:
     464           1 :                         // The key we emit for this entry is a function of the current key
     465           1 :                         // kind, and whether this entry is followed by a DEL/SINGLEDEL
     466           1 :                         // entry. setNext() does the work to move the iterator forward,
     467           1 :                         // preserving the original value, and potentially mutating the key
     468           1 :                         // kind.
     469           1 :                         i.setNext()
     470           1 :                         return &i.key, i.value
     471             : 
     472           1 :                 case InternalKeyKindMerge:
     473           1 :                         // Record the snapshot index before mergeNext as merging
     474           1 :                         // advances the iterator, adjusting curSnapshotIdx.
     475           1 :                         origSnapshotIdx := i.curSnapshotIdx
     476           1 :                         var valueMerger ValueMerger
     477           1 :                         valueMerger, i.err = i.merge(i.iterKey.UserKey, i.iterValue)
     478           1 :                         var change stripeChangeType
     479           1 :                         if i.err == nil {
     480           1 :                                 change = i.mergeNext(valueMerger)
     481           1 :                         }
     482           1 :                         var needDelete bool
     483           1 :                         if i.err == nil {
     484           1 :                                 // includesBase is true whenever we've transformed the MERGE record
     485           1 :                                 // into a SET.
     486           1 :                                 includesBase := i.key.Kind() == InternalKeyKindSet
     487           1 :                                 i.value, needDelete, i.valueCloser, i.err = finishValueMerger(valueMerger, includesBase)
     488           1 :                         }
     489           1 :                         if i.err == nil {
     490           1 :                                 if needDelete {
     491           1 :                                         i.valid = false
     492           1 :                                         if i.closeValueCloser() != nil {
     493           0 :                                                 return nil, nil
     494           0 :                                         }
     495           1 :                                         continue
     496             :                                 }
     497             :                                 // A non-skippable entry does not necessarily cover later merge
     498             :                                 // operands, so we must not zero the current merge result's seqnum.
     499             :                                 //
     500             :                                 // For example, suppose the forthcoming two keys are a range
     501             :                                 // tombstone, `[a, b)#3`, and a merge operand, `a#3`. Recall that
     502             :                                 // range tombstones do not cover point keys at the same seqnum, so
     503             :                                 // `a#3` is not deleted. The range tombstone will be seen first due
     504             :                                 // to its larger value type. Since it is a non-skippable key, the
     505             :                                 // current merge will not include `a#3`. If we zeroed the current
     506             :                                 // merge result's seqnum, then it would conflict with the upcoming
     507             :                                 // merge including `a#3`, whose seqnum will also be zeroed.
     508           1 :                                 if change != sameStripeNonSkippable {
     509           1 :                                         i.maybeZeroSeqnum(origSnapshotIdx)
     510           1 :                                 }
     511           1 :                                 return &i.key, i.value
     512             :                         }
     513           0 :                         if i.err != nil {
     514           0 :                                 i.valid = false
     515           0 :                                 i.err = base.MarkCorruptionError(i.err)
     516           0 :                         }
     517           0 :                         return nil, nil
     518             : 
     519           1 :                 default:
     520           1 :                         i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
     521           1 :                         i.valid = false
     522           1 :                         return nil, nil
     523             :                 }
     524             :         }
     525             : 
     526           1 :         return nil, nil
     527             : }
     528             : // closeValueCloser closes and clears i.valueCloser, if one is set. The Close error is stored in i.err (marking the iterator invalid when non-nil) and returned.
     529           1 : func (i *compactionIter) closeValueCloser() error {
     530           1 :         if i.valueCloser == nil {
     531           1 :                 return nil // nothing to close; i.err is left untouched
     532           1 :         }
     533             : 
     534           0 :         i.err = i.valueCloser.Close() // NB: overwrites i.err, even when Close returns nil
     535           0 :         i.valueCloser = nil
     536           0 :         if i.err != nil {
     537           0 :                 i.valid = false
     538           0 :         }
     539           0 :         return i.err
     540             : }
     541             : 
     542             : // snapshotIndex returns the index of the first sequence number in snapshots
     543             : // which is strictly greater than seq, along with that sequence number. If no snapshot is greater than seq, it returns len(snapshots) and InternalKeySeqNumMax. The binary search requires snapshots to be sorted in ascending order.
     544           1 : func snapshotIndex(seq uint64, snapshots []uint64) (int, uint64) {
     545           1 :         index := sort.Search(len(snapshots), func(i int) bool {
     546           1 :                 return snapshots[i] > seq // strict comparison: a snapshot equal to seq does not match
     547           1 :         })
     548           1 :         if index >= len(snapshots) { // no snapshot is greater than seq
     549           1 :                 return index, InternalKeySeqNumMax
     550           1 :         }
     551           1 :         return index, snapshots[index]
     552             : }
     553             : 
     554             : // skipInStripe skips over skippable keys in the same stripe and user key. It sets i.skip and calls nextInStripe until the result is no longer sameStripeSkippable.
     555           1 : func (i *compactionIter) skipInStripe() {
     556           1 :         i.skip = true
     557           1 :         for i.nextInStripe() == sameStripeSkippable { // empty body: nextInStripe does the advancing
     558           1 :         }
     559             :         // Reset skip if we landed outside the original stripe. Otherwise, we landed
     560             :         // in the same stripe on a non-skippable key. In that case we should preserve
     561             :         // `i.skip == true` such that later keys in the stripe will continue to be
     562             :         // skipped.
     563           1 :         if i.iterStripeChange == newStripeNewKey || i.iterStripeChange == newStripeSameKey {
     564           1 :                 i.skip = false
     565           1 :         }
     566             : }
     567             : // iterNext advances i.iter, storing the new key in i.iterKey and its retrieved value in i.iterValue. It returns false when i.iter.Next returns a nil key or when retrieving the value fails (the error is left in i.err).
     568           1 : func (i *compactionIter) iterNext() bool {
     569           1 :         var iterValue LazyValue
     570           1 :         i.iterKey, iterValue = i.iter.Next()
     571           1 :         i.iterValue, _, i.err = iterValue.Value(nil) // NB: overwrites both i.iterValue and i.err
     572           1 :         if i.err != nil {
     573           0 :                 i.iterKey = nil // report the failure to callers as "no key"
     574           0 :         }
     575           1 :         return i.iterKey != nil
     576             : }
     577             : 
     578             : // stripeChangeType indicates how the snapshot stripe changed relative to the
     579             : // previous key. If no change, it also indicates whether the current entry is
     580             : // skippable. If the snapshot stripe changed, it also indicates whether the new
     581             : // stripe was entered because the iterator progressed onto an entirely new key
     582             : // or entered a new stripe within the same key.
     583             : type stripeChangeType int
     584             : 
     585             : const (
     586             :         newStripeNewKey stripeChangeType = iota // a different user key; also returned by nextInStripeHelper when iterNext yields no key
     587             :         newStripeSameKey // same user key, different snapshot stripe
     588             :         sameStripeSkippable // same user key and stripe; the entry may be skipped
     589             :         sameStripeNonSkippable // same user key and stripe, but the entry must not be skipped (RANGEDEL or invalid kind)
     590             : )
     591             : 
     592             : // nextInStripe advances the iterator and returns a stripeChangeType
     593             : // indicating how its state changed. The result is also recorded in i.iterStripeChange.
     594             : //
     595             : // Calls to nextInStripe must be preceded by a call to saveKey to retain a
     596             : // temporary reference to the original key, so that forward iteration can
     597             : // proceed with a reference to the original key. Care should be taken to avoid
     598             : // overwriting or mutating the saved key or value before they have been returned
     599             : // to the caller of the exported function (i.e. the caller of Next, First, etc.)
     600           1 : func (i *compactionIter) nextInStripe() stripeChangeType {
     601           1 :         i.iterStripeChange = i.nextInStripeHelper()
     602           1 :         return i.iterStripeChange
     603           1 : }
     604             : 
     605             : // nextInStripeHelper is an internal helper for nextInStripe; callers should use
     606             : // nextInStripe and not call nextInStripeHelper. It advances via iterNext, compares the new key against the saved key (i.key, i.keyTrailer), and updates i.curSnapshotIdx and i.curSnapshotSeqNum for the new key.
     607           1 : func (i *compactionIter) nextInStripeHelper() stripeChangeType {
     608           1 :         if !i.iterNext() { // no further key, or iterNext recorded an error in i.err
     609           1 :                 return newStripeNewKey
     610           1 :         }
     611           1 :         key := i.iterKey
     612           1 : 
     613           1 :         // NB: The below conditional is an optimization to avoid a user key
     614           1 :         // comparison in many cases. Internal keys with the same user key are
     615           1 :         // ordered in (strictly) descending order by trailer. If the new key has a
     616           1 :         // greater or equal trailer, or the previous key had a zero sequence number,
     617           1 :         // the new key must have a new user key.
     618           1 :         //
     619           1 :         // A couple things make these cases common:
     620           1 :         // - Sequence-number zeroing ensures ~all of the keys in L6 have a zero
     621           1 :         //   sequence number.
     622           1 :         // - Ingested sstables' keys all adopt the same sequence number.
     623           1 :         if i.keyTrailer <= base.InternalKeyZeroSeqnumMaxTrailer || key.Trailer >= i.keyTrailer {
     624           1 :                 if invariants.Enabled && i.equal(i.key.UserKey, key.UserKey) { // invariants builds only: verify the shortcut with a real user key comparison
     625           0 :                         prevKey := i.key
     626           0 :                         prevKey.Trailer = i.keyTrailer
     627           0 :                         panic(fmt.Sprintf("pebble: invariant violation: %s and %s out of order", key, prevKey))
     628             :                 }
     629           1 :                 i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
     630           1 :                 return newStripeNewKey
     631           1 :         } else if !i.equal(i.key.UserKey, key.UserKey) { // slow path: explicit user key comparison
     632           1 :                 i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
     633           1 :                 return newStripeNewKey
     634           1 :         }
     635           1 :         origSnapshotIdx := i.curSnapshotIdx // same user key: remember the previous stripe to detect a stripe change
     636           1 :         i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
     637           1 :         switch key.Kind() {
     638           1 :         case InternalKeyKindRangeDelete:
     639           1 :                 // Range tombstones need to be exposed by the compactionIter to the upper level
     640           1 :                 // `compaction` object, so return them regardless of whether they are in the same
     641           1 :                 // snapshot stripe.
     642           1 :                 if i.curSnapshotIdx == origSnapshotIdx {
     643           1 :                         return sameStripeNonSkippable
     644           1 :                 }
     645           1 :                 return newStripeSameKey
     646           0 :         case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
     647           0 :                 // Range keys are interleaved at the max sequence number for a given user
     648           0 :                 // key, so we should not see any more range keys in this stripe.
     649           0 :                 panic("unreachable")
     650           1 :         case InternalKeyKindInvalid: // handled like RANGEDEL: never skippable within the same stripe
     651           1 :                 if i.curSnapshotIdx == origSnapshotIdx {
     652           1 :                         return sameStripeNonSkippable
     653           1 :                 }
     654           0 :                 return newStripeSameKey
     655             :         }
     656           1 :         if i.curSnapshotIdx == origSnapshotIdx { // every other kind is skippable within the same stripe
     657           1 :                 return sameStripeSkippable
     658           1 :         }
     659           1 :         return newStripeSameKey
     660             : }
     661             : // setNext handles a SET/SETWITHDEL entry: it saves the current key and value as the entry to emit, calls maybeZeroSeqnum, and, unless it can return early, scans forward in the stripe, rewriting the saved key to SETWITHDEL when it meets a DEL/SINGLEDEL/DELSIZED or a non-skippable key.
     662           1 : func (i *compactionIter) setNext() {
     663           1 :         // Save the current key.
     664           1 :         i.saveKey()
     665           1 :         i.value = i.iterValue
     666           1 :         i.valid = true
     667           1 :         i.maybeZeroSeqnum(i.curSnapshotIdx)
     668           1 : 
     669           1 :         // There are two cases where we can early return and skip the remaining
     670           1 :         // records in the stripe:
     671           1 :         // - If the DB's format version predates SETWITHDEL (i.formatVersion < FormatSetWithDelete).
     672           1 :         // - If this key is already a SETWITHDEL.
     673           1 :         if i.formatVersion < FormatSetWithDelete ||
     674           1 :                 i.iterKey.Kind() == InternalKeyKindSetWithDelete {
     675           1 :                 i.skip = true
     676           1 :                 return
     677           1 :         }
     678             : 
     679             :         // We are iterating forward. Save the current value by copying it into i.valueBuf, since iterNext overwrites i.iterValue.
     680           1 :         i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
     681           1 :         i.value = i.valueBuf
     682           1 : 
     683           1 :         // Else, we continue to loop through entries in the stripe looking for a
     684           1 :         // DEL. Note that we may stop *before* encountering a DEL, if one exists.
     685           1 :         for {
     686           1 :                 switch i.nextInStripe() {
     687           1 :                 case newStripeNewKey, newStripeSameKey:
     688           1 :                         i.pos = iterPosNext
     689           1 :                         return
     690           1 :                 case sameStripeNonSkippable:
     691           1 :                         i.pos = iterPosNext
     692           1 :                         // We iterated onto a key that we cannot skip. We can
     693           1 :                         // conservatively transform the original SET into a SETWITHDEL
     694           1 :                         // as an indication that there *may* still be a DEL/SINGLEDEL
     695           1 :                         // under this SET, even if we did not actually encounter one.
     696           1 :                         //
     697           1 :                         // This is safe to do, as:
     698           1 :                         //
     699           1 :                         // - in the case that there *is not* actually a DEL/SINGLEDEL
     700           1 :                         // under this entry, any SINGLEDEL above this now-transformed
     701           1 :                         // SETWITHDEL will become a DEL when the two encounter in a
     702           1 :                         // compaction. The DEL will eventually be elided in a
     703           1 :                         // subsequent compaction. The cost for ensuring correctness is
     704           1 :                         // that this entry is kept around for an additional compaction
     705           1 :                         // cycle(s).
     706           1 :                         //
     707           1 :                         // - in the case there *is* indeed a DEL/SINGLEDEL under us
     708           1 :                         // (but in a different stripe or sstable), then we will have
     709           1 :                         // already done the work to transform the SET into a
     710           1 :                         // SETWITHDEL, and we will skip any additional iteration when
     711           1 :                         // this entry is encountered again in a subsequent compaction.
     712           1 :                         //
     713           1 :                         // Ideally, this codepath would be smart enough to handle the
     714           1 :                         // case of SET <- RANGEDEL <- ... <- DEL/SINGLEDEL <- ....
     715           1 :                         // This requires preserving any RANGEDEL entries we encounter
     716           1 :                         // along the way, then emitting the original (possibly
     717           1 :                         // transformed) key, followed by the RANGEDELs. This requires
     718           1 :                         // a sizable refactoring of the existing code, as nextInStripe
     719           1 :                         // currently returns a sameStripeNonSkippable when it
     720           1 :                         // encounters a RANGEDEL.
     721           1 :                         // TODO(travers): optimize to handle the RANGEDEL case if it
     722           1 :                         // turns out to be a performance problem.
     723           1 :                         i.key.SetKind(InternalKeyKindSetWithDelete)
     724           1 : 
     725           1 :                         // By setting i.skip=true, we are saying that after the
     726           1 :                         // non-skippable key is emitted (which is likely a RANGEDEL),
     727           1 :                         // the remaining point keys that share the same user key as this
     728           1 :                         // saved key should be skipped.
     729           1 :                         i.skip = true
     730           1 :                         return
     731           1 :                 case sameStripeSkippable:
     732           1 :                         // We're still in the same stripe. If this is a
     733           1 :                         // DEL/SINGLEDEL/DELSIZED, we stop looking and emit a SETWITHDEL.
     734           1 :                         // Subsequent keys are eligible for skipping. Any other kind falls through and the loop keeps scanning.
     735           1 :                         if i.iterKey.Kind() == InternalKeyKindDelete ||
     736           1 :                                 i.iterKey.Kind() == InternalKeyKindSingleDelete ||
     737           1 :                                 i.iterKey.Kind() == InternalKeyKindDeleteSized {
     738           1 :                                 i.key.SetKind(InternalKeyKindSetWithDelete)
     739           1 :                                 i.skip = true
     740           1 :                                 return
     741           1 :                         }
     742           0 :                 default:
     743           0 :                         panic("pebble: unexpected stripeChangeType: " + strconv.Itoa(int(i.iterStripeChange)))
     744             :                 }
     745             :         }
     746             : }
     747             : // mergeNext saves the current key and folds older entries from the same user key and snapshot stripe into valueMerger. It returns i.iterStripeChange when nextInStripe lands on anything other than a skippable same-stripe key, and sameStripeSkippable when it stops at a tombstone, a SET/SETWITHDEL, a visibly range-deleted entry, or an error (i.err set, i.valid false).
     748           1 : func (i *compactionIter) mergeNext(valueMerger ValueMerger) stripeChangeType {
     749           1 :         // Save the current key.
     750           1 :         i.saveKey()
     751           1 :         i.valid = true
     752           1 : 
     753           1 :         // Loop looking for older values in the current snapshot stripe and merge
     754           1 :         // them.
     755           1 :         for {
     756           1 :                 if i.nextInStripe() != sameStripeSkippable {
     757           1 :                         i.pos = iterPosNext
     758           1 :                         return i.iterStripeChange
     759           1 :                 }
     760           1 :                 key := i.iterKey
     761           1 :                 switch key.Kind() {
     762           1 :                 case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
     763           1 :                         // We've hit a deletion tombstone. Return everything up to this point and
     764           1 :                         // then skip entries until the next snapshot stripe. We change the kind
     765           1 :                         // of the result key to a Set so that it shadows keys in lower
     766           1 :                         // levels. That is, MERGE+DEL -> SET.
     767           1 :                         // We do the same for SingleDelete since SingleDelete is only
     768           1 :                         // permitted (with deterministic behavior) for keys that have been
     769           1 :                         // set once since the last SingleDelete/Delete, so everything
     770           1 :                         // older is acceptable to shadow. Note that this is slightly
     771           1 :                         // different from singleDeleteNext() which implements stricter
     772           1 :                         // semantics in terms of applying the SingleDelete to the single
     773           1 :                         // next Set. But those stricter semantics are not observable to
     774           1 :                         // the end-user since Iterator interprets SingleDelete as Delete.
     775           1 :                         // We could do something more complicated here and consume only a
     776           1 :                         // single Set, and then merge in any following Sets, but that is
     777           1 :                         // complicated wrt code and unnecessary given the narrow permitted
     778           1 :                         // use of SingleDelete.
     779           1 :                         i.key.SetKind(InternalKeyKindSet)
     780           1 :                         i.skip = true
     781           1 :                         return sameStripeSkippable
     782             : 
     783           1 :                 case InternalKeyKindSet, InternalKeyKindSetWithDelete:
     784           1 :                         if i.rangeDelFrag.Covers(*key, i.curSnapshotSeqNum) == keyspan.CoversVisibly {
     785           1 :                                 // We change the kind of the result key to a Set so that it shadows
     786           1 :                                 // keys in lower levels. That is, MERGE+RANGEDEL -> SET. This isn't
     787           1 :                                 // strictly necessary, but provides consistency with the behavior of
     788           1 :                                 // MERGE+DEL.
     789           1 :                                 i.key.SetKind(InternalKeyKindSet)
     790           1 :                                 i.skip = true
     791           1 :                                 return sameStripeSkippable
     792           1 :                         }
     793             : 
     794             :                         // We've hit a Set or SetWithDel value. Merge with the existing
     795             :                         // value and return. We change the kind of the resulting key to a
     796             :                         // Set so that it shadows keys in lower levels. That is:
     797             :                         // MERGE + (SET*) -> SET.
     798           1 :                         i.err = valueMerger.MergeOlder(i.iterValue)
     799           1 :                         if i.err != nil {
     800           0 :                                 i.valid = false
     801           0 :                                 return sameStripeSkippable // the failure is reported through i.err
     802           0 :                         }
     803           1 :                         i.key.SetKind(InternalKeyKindSet)
     804           1 :                         i.skip = true
     805           1 :                         return sameStripeSkippable
     806             : 
     807           1 :                 case InternalKeyKindMerge:
     808           1 :                         if i.rangeDelFrag.Covers(*key, i.curSnapshotSeqNum) == keyspan.CoversVisibly {
     809           1 :                                 // We change the kind of the result key to a Set so that it shadows
     810           1 :                                 // keys in lower levels. That is, MERGE+RANGEDEL -> SET. This isn't
     811           1 :                                 // strictly necessary, but provides consistency with the behavior of
     812           1 :                                 // MERGE+DEL.
     813           1 :                                 i.key.SetKind(InternalKeyKindSet)
     814           1 :                                 i.skip = true
     815           1 :                                 return sameStripeSkippable
     816           1 :                         }
     817             : 
     818             :                         // We've hit another Merge value. Merge with the existing value and
     819             :                         // continue looping.
     820           1 :                         i.err = valueMerger.MergeOlder(i.iterValue)
     821           1 :                         if i.err != nil {
     822           0 :                                 i.valid = false
     823           0 :                                 return sameStripeSkippable // the failure is reported through i.err
     824           0 :                         }
     825             : 
     826           0 :                 default:
     827           0 :                         i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
     828           0 :                         i.valid = false
     829           0 :                         return sameStripeSkippable
     830             :                 }
     831             :         }
     832             : }
     833             : // singleDeleteNext saves the current SINGLEDEL and scans the rest of its stripe. It returns true when i.key/i.value hold an entry to emit (the SINGLEDEL itself, or the DEL it was converted to) and false when nothing should be emitted: the SINGLEDEL met a SET, or an invalid key kind was found (i.err set).
     834           1 : func (i *compactionIter) singleDeleteNext() bool {
     835           1 :         // Save the current key.
     836           1 :         i.saveKey()
     837           1 :         i.value = i.iterValue
     838           1 :         i.valid = true
     839           1 : 
     840           1 :         // Loop until we find a key to be passed to the next level.
     841           1 :         for {
     842           1 :                 if i.nextInStripe() != sameStripeSkippable {
     843           1 :                         i.pos = iterPosNext
     844           1 :                         return true
     845           1 :                 }
     846             : 
     847           1 :                 key := i.iterKey
     848           1 :                 switch key.Kind() {
     849           1 :                 case InternalKeyKindDelete, InternalKeyKindMerge, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
     850           1 :                         // We've hit a Delete, DeleteSized, Merge, SetWithDelete, transform
     851           1 :                         // the SingleDelete into a full Delete.
     852           1 :                         i.key.SetKind(InternalKeyKindDelete)
     853           1 :                         i.skip = true
     854           1 :                         return true
     855             : 
     856           1 :                 case InternalKeyKindSet:
     857           1 :                         i.nextInStripe() // step past the SET; neither it nor the SINGLEDEL is emitted
     858           1 :                         i.valid = false
     859           1 :                         return false
     860             : 
     861           1 :                 case InternalKeyKindSingleDelete:
     862           1 :                         continue // an older SINGLEDEL in the same stripe: keep scanning
     863             : 
     864           0 :                 default:
     865           0 :                         i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
     866           0 :                         i.valid = false
     867           0 :                         return false
     868             :                 }
     869             :         }
     870             : }
     871             : 
// deleteSizedNext processes a DELSIZED point tombstone. Unlike ordinary DELs,
// these tombstones carry a value that's a varint indicating the size of the
// entry (len(key)+len(value)) that the tombstone is expected to delete.
//
// When a DELSIZED tombstone is encountered, we skip ahead to see which keys,
// if any, are elided as a result of the tombstone.
//
// It returns the saved key (possibly converted to a DEL) together with the
// value to emit for it. If the tombstone's value is not a well-formed varint,
// i.err is set, i.valid is cleared and (nil, nil) is returned.
func (i *compactionIter) deleteSizedNext() (*base.InternalKey, []byte) {
	i.saveKey()
	i.valid = true
	i.skip = true

	// The DELSIZED tombstone may have no value at all. This happens when the
	// tombstone has already deleted the key that the user originally predicted.
	// In this case, we still peek forward in case there's another DELSIZED key
	// with a lower sequence number, in which case we'll adopt its value.
	if len(i.iterValue) == 0 {
		i.value = i.valueBuf[:0]
	} else {
		// Copy the value into valueBuf; the iterator is advanced below while
		// i.value is still in use.
		i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
		i.value = i.valueBuf
	}

	// Loop through all the keys within this stripe that are skippable.
	i.pos = iterPosNext
	for i.nextInStripe() == sameStripeSkippable {
		switch i.iterKey.Kind() {
		case InternalKeyKindDelete, InternalKeyKindDeleteSized:
			// We encountered a tombstone (DEL, or DELSIZED) that's deleted by
			// the original DELSIZED tombstone. This can happen in two cases:
			//
			// (1) These tombstones were intended to delete two distinct values,
			//     and this DELSIZED has already dropped the relevant key. For
			//     example:
			//
			//     a.DELSIZED.9   a.SET.7   a.DELSIZED.5   a.SET.4
			//
			//     If a.DELSIZED.9 has already deleted a.SET.7, its size has
			//     already been zeroed out. In this case, we want to adopt the
			//     value of the DELSIZED with the lower sequence number, in
			//     case the a.SET.4 key has not yet been elided.
			//
			// (2) This DELSIZED was missized. The user thought they were
			//     deleting a key with this user key, but this user key had
			//     already been deleted.
			//
			// We can differentiate these two cases by examining the length of
			// the DELSIZED's value. A DELSIZED's value holds the size of both
			// the user key and value that it intends to delete. For any user
			// key with a length > 1, a DELSIZED that has not deleted a key must
			// have a value with a length > 1.
			//
			// We treat both cases the same functionally, adopting the identity
			// of the lower-sequence numbered tombstone. However in the second
			// case, we also increment the stat counting missized tombstones.
			if len(i.value) > 0 {
				// The original DELSIZED key was missized. The key that the user
				// thought they were deleting does not exist.
				i.stats.countMissizedDels++
			}
			// Adopt the value of the older tombstone.
			i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
			i.value = i.valueBuf
			if i.iterKey.Kind() == InternalKeyKindDelete {
				// Convert the DELSIZED to a DEL—The DEL we're eliding may not
				// have deleted the key(s) it was intended to yet. The ordinary
				// DEL compaction heuristics are better suited at that, plus we
				// don't want to count it as a missized DEL. We early exit in
				// this case, after skipping the remainder of the snapshot
				// stripe.
				i.key.SetKind(i.iterKey.Kind())
				i.skipInStripe()
				return &i.key, i.value
			}
			// Continue, in case we uncover another DELSIZED or a key this
			// DELSIZED deletes.
		default:
			// If the DELSIZED is value-less, it already deleted the key that it
			// was intended to delete. This is possible with a sequence like:
			//
			//      DELSIZED.8     SET.7     SET.3
			//
			// The DELSIZED only describes the size of the SET.7, which in this
			// case has already been elided. We don't count it as a missizing,
			// instead converting the DELSIZED to a DEL. Skip the remainder of
			// the snapshot stripe and return.
			if len(i.value) == 0 {
				i.key.SetKind(InternalKeyKindDelete)
				i.skipInStripe()
				return &i.key, i.value
			}
			// The deleted key is not a DEL, DELSIZED, and the DELSIZED in i.key
			// has a positive size.
			expectedSize, n := binary.Uvarint(i.value)
			if n != len(i.value) {
				// The value must consist of exactly one varint; anything else
				// is reported as corruption.
				i.err = base.CorruptionErrorf("DELSIZED holds invalid value: %x", errors.Safe(i.value))
				i.valid = false
				return nil, nil
			}
			elidedSize := uint64(len(i.iterKey.UserKey)) + uint64(len(i.iterValue))
			if elidedSize != expectedSize {
				// The original DELSIZED key was missized. It's unclear what to
				// do. The user-provided size was wrong, so it's unlikely to be
				// accurate or meaningful. We could:
				//
				//   1. return the DELSIZED with the original user-provided size unmodified
				//   2. return the DELSIZED with a zeroed size to reflect that a key was
				//   elided, even if it wasn't the anticipated size.
				//   3. subtract the elided size from the estimate and re-encode.
				//   4. convert the DELSIZED into a value-less DEL, so that
				//      ordinary DEL heuristics apply.
				//
				// We opt for (4) under the rationale that we can't rely on the
				// user-provided size for accuracy, so ordinary DEL heuristics
				// are safer.
				i.key.SetKind(InternalKeyKindDelete)
				i.stats.countMissizedDels++
			}
			// NB: We remove the value regardless of whether the key was sized
			// appropriately. The size encoded is 'consumed' the first time it
			// meets a key that it deletes.
			i.value = i.valueBuf[:0]
		}
	}
	// Reset skip if we landed outside the original stripe. Otherwise, we landed
	// in the same stripe on a non-skippable key. In that case we should preserve
	// `i.skip == true` such that later keys in the stripe will continue to be
	// skipped.
	if i.iterStripeChange == newStripeNewKey || i.iterStripeChange == newStripeSameKey {
		i.skip = false
	}
	return &i.key, i.value
}
    1003             : 
    1004           1 : func (i *compactionIter) saveKey() {
    1005           1 :         i.keyBuf = append(i.keyBuf[:0], i.iterKey.UserKey...)
    1006           1 :         i.key.UserKey = i.keyBuf
    1007           1 :         i.key.Trailer = i.iterKey.Trailer
    1008           1 :         i.keyTrailer = i.iterKey.Trailer
    1009           1 :         i.frontiers.Advance(i.key.UserKey)
    1010           1 : }
    1011             : 
    1012           1 : func (i *compactionIter) cloneKey(key []byte) []byte {
    1013           1 :         i.alloc, key = i.alloc.Copy(key)
    1014           1 :         return key
    1015           1 : }
    1016             : 
    1017           1 : func (i *compactionIter) Key() InternalKey {
    1018           1 :         return i.key
    1019           1 : }
    1020             : 
    1021           1 : func (i *compactionIter) Value() []byte {
    1022           1 :         return i.value
    1023           1 : }
    1024             : 
    1025           1 : func (i *compactionIter) Valid() bool {
    1026           1 :         return i.valid
    1027           1 : }
    1028             : 
    1029           1 : func (i *compactionIter) Error() error {
    1030           1 :         return i.err
    1031           1 : }
    1032             : 
    1033           1 : func (i *compactionIter) Close() error {
    1034           1 :         err := i.iter.Close()
    1035           1 :         if i.err == nil {
    1036           1 :                 i.err = err
    1037           1 :         }
    1038             : 
    1039             :         // Close the closer for the current value if one was open.
    1040           1 :         if i.valueCloser != nil {
    1041           0 :                 i.err = firstError(i.err, i.valueCloser.Close())
    1042           0 :                 i.valueCloser = nil
    1043           0 :         }
    1044             : 
    1045           1 :         return i.err
    1046             : }
    1047             : 
    1048             : // Tombstones returns a list of pending range tombstones in the fragmenter
    1049             : // up to the specified key, or all pending range tombstones if key = nil.
    1050           1 : func (i *compactionIter) Tombstones(key []byte) []keyspan.Span {
    1051           1 :         if key == nil {
    1052           1 :                 i.rangeDelFrag.Finish()
    1053           1 :         } else {
    1054           1 :                 // The specified end key is exclusive; no versions of the specified
    1055           1 :                 // user key (including range tombstones covering that key) should
    1056           1 :                 // be flushed yet.
    1057           1 :                 i.rangeDelFrag.TruncateAndFlushTo(key)
    1058           1 :         }
    1059           1 :         tombstones := i.tombstones
    1060           1 :         i.tombstones = nil
    1061           1 :         return tombstones
    1062             : }
    1063             : 
    1064             : // RangeKeys returns a list of pending fragmented range keys up to the specified
    1065             : // key, or all pending range keys if key = nil.
    1066           1 : func (i *compactionIter) RangeKeys(key []byte) []keyspan.Span {
    1067           1 :         if key == nil {
    1068           1 :                 i.rangeKeyFrag.Finish()
    1069           1 :         } else {
    1070           1 :                 // The specified end key is exclusive; no versions of the specified
    1071           1 :                 // user key (including range tombstones covering that key) should
    1072           1 :                 // be flushed yet.
    1073           1 :                 i.rangeKeyFrag.TruncateAndFlushTo(key)
    1074           1 :         }
    1075           1 :         rangeKeys := i.rangeKeys
    1076           1 :         i.rangeKeys = nil
    1077           1 :         return rangeKeys
    1078             : }
    1079             : 
    1080           1 : func (i *compactionIter) emitRangeDelChunk(fragmented keyspan.Span) {
    1081           1 :         // Apply the snapshot stripe rules, keeping only the latest tombstone for
    1082           1 :         // each snapshot stripe.
    1083           1 :         currentIdx := -1
    1084           1 :         keys := fragmented.Keys[:0]
    1085           1 :         for _, k := range fragmented.Keys {
    1086           1 :                 idx, _ := snapshotIndex(k.SeqNum(), i.snapshots)
    1087           1 :                 if currentIdx == idx {
    1088           1 :                         continue
    1089             :                 }
    1090           1 :                 if idx == 0 && i.elideRangeTombstone(fragmented.Start, fragmented.End) {
    1091           1 :                         // This is the last snapshot stripe and the range tombstone
    1092           1 :                         // can be elided.
    1093           1 :                         break
    1094             :                 }
    1095             : 
    1096           1 :                 keys = append(keys, k)
    1097           1 :                 if idx == 0 {
    1098           1 :                         // This is the last snapshot stripe.
    1099           1 :                         break
    1100             :                 }
    1101           1 :                 currentIdx = idx
    1102             :         }
    1103           1 :         if len(keys) > 0 {
    1104           1 :                 i.tombstones = append(i.tombstones, keyspan.Span{
    1105           1 :                         Start: fragmented.Start,
    1106           1 :                         End:   fragmented.End,
    1107           1 :                         Keys:  keys,
    1108           1 :                 })
    1109           1 :         }
    1110             : }
    1111             : 
    1112           1 : func (i *compactionIter) emitRangeKeyChunk(fragmented keyspan.Span) {
    1113           1 :         // Elision of snapshot stripes happens in rangeKeyCompactionTransform, so no need to
    1114           1 :         // do that here.
    1115           1 :         if len(fragmented.Keys) > 0 {
    1116           1 :                 i.rangeKeys = append(i.rangeKeys, fragmented)
    1117           1 :         }
    1118             : }
    1119             : 
    1120             : // maybeZeroSeqnum attempts to set the seqnum for the current key to 0. Doing
    1121             : // so improves compression and enables an optimization during forward iteration
    1122             : // to skip some key comparisons. The seqnum for an entry can be zeroed if the
    1123             : // entry is on the bottom snapshot stripe and on the bottom level of the LSM.
    1124           1 : func (i *compactionIter) maybeZeroSeqnum(snapshotIdx int) {
    1125           1 :         if !i.allowZeroSeqNum {
    1126           1 :                 // TODO(peter): allowZeroSeqNum applies to the entire compaction. We could
    1127           1 :                 // make the determination on a key by key basis, similar to what is done
    1128           1 :                 // for elideTombstone. Need to add a benchmark for compactionIter to verify
    1129           1 :                 // that isn't too expensive.
    1130           1 :                 return
    1131           1 :         }
    1132           1 :         if snapshotIdx > 0 {
    1133           1 :                 // This is not the last snapshot
    1134           1 :                 return
    1135           1 :         }
    1136           1 :         i.key.SetSeqNum(base.SeqNumZero)
    1137             : }
    1138             : 
// A frontier is used to monitor a compaction's progression across the user
// keyspace.
//
// A frontier holds a user key boundary that it's concerned with in its `key`
// field. If/when the compaction iterator returns an InternalKey with a user key
// _k_ such that k ≥ frontier.key, the compaction iterator invokes the
// frontier's `reached` function, passing _k_ as its argument.
//
// The `reached` function returns a new value to use as the key. If `reached`
// returns nil, the frontier is forgotten and its `reached` method will not be
// invoked again, unless the user calls [Update] to set a new key.
//
// A frontier's key may be updated outside the context of a `reached`
// invocation at any time, through its Update method.
type frontier struct {
	// container points to the containing *frontiers that was passed to Init
	// when the frontier was initialized.
	container *frontiers

	// key holds the frontier's current key. If nil, this frontier is inactive
	// and its reached func will not be invoked. The value of this key may only
	// be updated by the `frontiers` type, or the Update method.
	key []byte

	// reached is invoked to inform a frontier that its key has been reached.
	// It's invoked with the user key that reached the limit. The `key` argument
	// is guaranteed to be ≥ the frontier's key.
	//
	// After reached is invoked, the frontier's key is updated to the return
	// value of `reached`. Nota bene, the frontier is permitted to update its
	// key to a user key ≤ the argument `key`.
	//
	// If a frontier is set to key k1, and reached(k2) is invoked (k2 ≥ k1), the
	// frontier will receive reached(k2) calls until it returns nil or a key
	// `k3` such that k2 < k3. This property is useful for frontiers that use
	// `reached` invocations to drive iteration through collections of keys that
	// may contain multiple keys that are both < k2 and ≥ k1.
	reached func(key []byte) (next []byte)
}
    1178             : 
    1179             : // Init initializes the frontier with the provided key and reached callback.
    1180             : // The frontier is attached to the provided *frontiers and the provided reached
    1181             : // func will be invoked when the *frontiers is advanced to a key ≥ this
    1182             : // frontier's key.
    1183             : func (f *frontier) Init(
    1184             :         frontiers *frontiers, initialKey []byte, reached func(key []byte) (next []byte),
    1185           1 : ) {
    1186           1 :         *f = frontier{
    1187           1 :                 container: frontiers,
    1188           1 :                 key:       initialKey,
    1189           1 :                 reached:   reached,
    1190           1 :         }
    1191           1 :         if initialKey != nil {
    1192           1 :                 f.container.push(f)
    1193           1 :         }
    1194             : }
    1195             : 
    1196             : // String implements fmt.Stringer.
    1197           1 : func (f *frontier) String() string {
    1198           1 :         return string(f.key)
    1199           1 : }
    1200             : 
    1201             : // Update replaces the existing frontier's key with the provided key. The
    1202             : // frontier's reached func will be invoked when the new key is reached.
    1203           1 : func (f *frontier) Update(key []byte) {
    1204           1 :         c := f.container
    1205           1 :         prevKeyIsNil := f.key == nil
    1206           1 :         f.key = key
    1207           1 :         if prevKeyIsNil {
    1208           1 :                 if key != nil {
    1209           1 :                         c.push(f)
    1210           1 :                 }
    1211           1 :                 return
    1212             :         }
    1213             : 
    1214             :         // Find the frontier within the heap (it must exist within the heap because
    1215             :         // f.key was != nil). If the frontier key is now nil, remove it from the
    1216             :         // heap. Otherwise, fix up its position.
    1217           1 :         for i := 0; i < len(c.items); i++ {
    1218           1 :                 if c.items[i] == f {
    1219           1 :                         if key != nil {
    1220           1 :                                 c.fix(i)
    1221           1 :                         } else {
    1222           1 :                                 n := c.len() - 1
    1223           1 :                                 c.swap(i, n)
    1224           1 :                                 c.down(i, n)
    1225           1 :                                 c.items = c.items[:n]
    1226           1 :                         }
    1227           1 :                         return
    1228             :                 }
    1229             :         }
    1230           0 :         panic("unreachable")
    1231             : }
    1232             : 
// frontiers is used to track progression of a task (eg, compaction) across the
// keyspace. Clients that want to be informed when the task advances to a key ≥
// some frontier may register a frontier, providing a callback. The task calls
// `Advance(k)` with each user key encountered, which invokes the `reached` func
// on all tracked frontiers with `key`s ≤ k.
//
// Internally, frontiers is implemented as a simple heap.
type frontiers struct {
	// cmp orders user keys; it is used to order the heap and to decide in
	// Advance whether a frontier has been reached.
	cmp Compare
	// items is a binary min-heap of the active frontiers, ordered by key
	// according to cmp, so items[0] holds the smallest key.
	items []*frontier
}
    1244             : 
    1245             : // String implements fmt.Stringer.
    1246           1 : func (f *frontiers) String() string {
    1247           1 :         var buf bytes.Buffer
    1248           1 :         for i := 0; i < len(f.items); i++ {
    1249           1 :                 if i > 0 {
    1250           1 :                         fmt.Fprint(&buf, ", ")
    1251           1 :                 }
    1252           1 :                 fmt.Fprintf(&buf, "%s: %q", f.items[i], f.items[i].key)
    1253             :         }
    1254           1 :         return buf.String()
    1255             : }
    1256             : 
    1257             : // Advance notifies all member frontiers with keys ≤ k.
    1258           1 : func (f *frontiers) Advance(k []byte) {
    1259           1 :         for len(f.items) > 0 && f.cmp(k, f.items[0].key) >= 0 {
    1260           1 :                 // This frontier has been reached. Invoke the closure and update with
    1261           1 :                 // the next frontier.
    1262           1 :                 f.items[0].key = f.items[0].reached(k)
    1263           1 :                 if f.items[0].key == nil {
    1264           1 :                         // This was the final frontier that this user was concerned with.
    1265           1 :                         // Remove it from the heap.
    1266           1 :                         f.pop()
    1267           1 :                 } else {
    1268           1 :                         // Fix up the heap root.
    1269           1 :                         f.fix(0)
    1270           1 :                 }
    1271             :         }
    1272             : }
    1273             : 
    1274           1 : func (f *frontiers) len() int {
    1275           1 :         return len(f.items)
    1276           1 : }
    1277             : 
    1278           1 : func (f *frontiers) less(i, j int) bool {
    1279           1 :         return f.cmp(f.items[i].key, f.items[j].key) < 0
    1280           1 : }
    1281             : 
    1282           1 : func (f *frontiers) swap(i, j int) {
    1283           1 :         f.items[i], f.items[j] = f.items[j], f.items[i]
    1284           1 : }
    1285             : 
    1286             : // fix, up and down are copied from the go stdlib.
    1287             : 
    1288           1 : func (f *frontiers) fix(i int) {
    1289           1 :         if !f.down(i, f.len()) {
    1290           1 :                 f.up(i)
    1291           1 :         }
    1292             : }
    1293             : 
    1294           1 : func (f *frontiers) push(ff *frontier) {
    1295           1 :         n := len(f.items)
    1296           1 :         f.items = append(f.items, ff)
    1297           1 :         f.up(n)
    1298           1 : }
    1299             : 
    1300           1 : func (f *frontiers) pop() *frontier {
    1301           1 :         n := f.len() - 1
    1302           1 :         f.swap(0, n)
    1303           1 :         f.down(0, n)
    1304           1 :         item := f.items[n]
    1305           1 :         f.items = f.items[:n]
    1306           1 :         return item
    1307           1 : }
    1308             : 
    1309           1 : func (f *frontiers) up(j int) {
    1310           1 :         for {
    1311           1 :                 i := (j - 1) / 2 // parent
    1312           1 :                 if i == j || !f.less(j, i) {
    1313           1 :                         break
    1314             :                 }
    1315           1 :                 f.swap(i, j)
    1316           1 :                 j = i
    1317             :         }
    1318             : }
    1319             : 
    1320           1 : func (f *frontiers) down(i0, n int) bool {
    1321           1 :         i := i0
    1322           1 :         for {
    1323           1 :                 j1 := 2*i + 1
    1324           1 :                 if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
    1325           1 :                         break
    1326             :                 }
    1327           1 :                 j := j1 // left child
    1328           1 :                 if j2 := j1 + 1; j2 < n && f.less(j2, j1) {
    1329           1 :                         j = j2 // = 2*i + 2  // right child
    1330           1 :                 }
    1331           1 :                 if !f.less(j, i) {
    1332           1 :                         break
    1333             :                 }
    1334           1 :                 f.swap(i, j)
    1335           1 :                 i = j
    1336             :         }
    1337           1 :         return i > i0
    1338             : }

Generated by: LCOV version 1.14