LCOV - code coverage report
Current view: top level - pebble - compaction_iter.go (source / functions) Hit Total Coverage
Test: 2024-03-31 08:15Z 1c7bcd1c - tests only.lcov Lines: 689 800 86.1 %
Date: 2024-03-31 08:16:11 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "encoding/binary"
       9             :         "io"
      10             :         "sort"
      11             :         "strconv"
      12             : 
      13             :         "github.com/cockroachdb/errors"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      16             :         "github.com/cockroachdb/pebble/internal/compact"
      17             :         "github.com/cockroachdb/pebble/internal/keyspan"
      18             :         "github.com/cockroachdb/pebble/internal/rangekey"
      19             :         "github.com/cockroachdb/redact"
      20             : )
      21             : 
      22             : // compactionIter provides a forward-only iterator that encapsulates the logic
      23             : // for collapsing entries during compaction. It wraps an internal iterator and
      24             : // collapses entries that are no longer necessary because they are shadowed by
      25             : // newer entries. The simplest example of this is when the internal iterator
      26             : // contains two keys: a.PUT.2 and a.PUT.1. Instead of returning both entries,
      27             : // compactionIter collapses the second entry because it is no longer
      28             : // necessary. The high-level structure for compactionIter is to iterate over
      29             : // its internal iterator and output 1 entry for every user-key. There are four
      30             : // complications to this story.
      31             : //
      32             : // 1. Eliding Deletion Tombstones
      33             : //
      34             : // Consider the entries a.DEL.2 and a.PUT.1. These entries collapse to
      35             : // a.DEL.2. Do we have to output the entry a.DEL.2? Only if a.DEL.2 possibly
      36             : // shadows an entry at a lower level. If we're compacting to the base-level in
      37             : // the LSM tree then a.DEL.2 is definitely not shadowing an entry at a lower
      38             : // level and can be elided.
      39             : //
      40             : // We can do slightly better than only eliding deletion tombstones at the base
      41             : // level by observing that we can elide a deletion tombstone if there are no
      42             : // sstables that contain the entry's key. This check is performed by
      43             : // elideTombstone.
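The elision check described above can be sketched as a predicate with the same shape as the elideTombstone field (func(key []byte) bool). This is a minimal illustration, not Pebble's implementation: tableBounds and newElideTombstone are hypothetical names, and bytes.Compare stands in for the DB's configured Compare.

```go
package main

import (
	"bytes"
	"fmt"
)

// tableBounds is a hypothetical stand-in for the user-key bounds of one
// sstable (both bounds inclusive).
type tableBounds struct {
	smallest, largest []byte
}

// newElideTombstone returns a predicate that reports true when no table in
// the levels below the compaction output could contain key. In that case a
// point tombstone for key shadows nothing and may be dropped.
// Assumption: bytes.Compare is used in place of the DB's Compare.
func newElideTombstone(lowerLevels [][]tableBounds) func(key []byte) bool {
	return func(key []byte) bool {
		for _, level := range lowerLevels {
			for _, t := range level {
				if bytes.Compare(key, t.smallest) >= 0 && bytes.Compare(key, t.largest) <= 0 {
					// A lower-level table may hold an older entry for key.
					return false
				}
			}
		}
		return true
	}
}

func main() {
	lower := [][]tableBounds{
		{{smallest: []byte("c"), largest: []byte("f")}},
		{{smallest: []byte("m"), largest: []byte("p")}},
	}
	elide := newElideTombstone(lower)
	fmt.Println(elide([]byte("a")), elide([]byte("d")))
}
```

The check is conservative: overlapping a table's bounds does not prove the key is present, only that the tombstone might still be needed.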
      44             : //
      45             : // 2. Merges
      46             : //
      47             : // The MERGE operation merges the value for an entry with the existing value
      48             : // for an entry. The logical value of an entry can be composed of a series of
      49             : // merge operations. When compactionIter sees a MERGE, it scans forward in its
      50             : // internal iterator collapsing MERGE operations for the same key until it
      51             : // encounters a SET or DELETE operation. For example, the keys a.MERGE.4,
      52             : // a.MERGE.3, a.MERGE.2 will be collapsed to a.MERGE.4 and the values will be
      53             : // merged using the specified Merger.
      54             : //
      55             : // An interesting case here occurs when MERGE is combined with SET. Consider
      56             : // the entries a.MERGE.3 and a.SET.2. The collapsed key will be a.SET.3. The
      57             : // reason that the kind is changed to SET is because the SET operation acts as
      58             : // a barrier preventing further merging. This can be seen better in the
      59             : // scenario a.MERGE.3, a.SET.2, a.MERGE.1. The entry a.MERGE.1 may be at a lower
      60             : // (older) level and not involved in the compaction. If the compaction of
      61             : // a.MERGE.3 and a.SET.2 produced a.MERGE.3, a subsequent compaction with
      62             : // a.MERGE.1 would merge the values together incorrectly.
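The merge-collapsing rule above can be sketched on a simplified model of one user key's entries, ordered newest first. The types and the collapseMerge helper are hypothetical and not part of Pebble. The handling of a DELETE (emitting a SET, since nothing older can contribute) is an assumption of this sketch.

```go
package main

import "fmt"

type kind int

const (
	kindDelete kind = iota
	kindSet
	kindMerge
)

// entry is a simplified model of one internal key for a single user key.
type entry struct {
	kind   kind
	seqNum uint64
	value  string
}

// collapseMerge folds entries (newest first, entries[0] must be a MERGE)
// into a single entry. mergeFn combines an older value with a newer one.
// The result keeps the newest sequence number.
func collapseMerge(entries []entry, mergeFn func(older, newer string) string) entry {
	out := entries[0]
	for _, e := range entries[1:] {
		switch e.kind {
		case kindMerge:
			out.value = mergeFn(e.value, out.value)
		case kindSet:
			// The SET is a barrier: fold it in and emit a SET so that
			// later compactions do not merge with anything older.
			out.value = mergeFn(e.value, out.value)
			out.kind = kindSet
			return out
		case kindDelete:
			// Assumption of this sketch: nothing older than the tombstone
			// can contribute, so the result is also emitted as a SET.
			out.kind = kindSet
			return out
		}
	}
	return out
}

func main() {
	concat := func(older, newer string) string { return older + newer }
	got := collapseMerge([]entry{{kindMerge, 3, "c"}, {kindSet, 2, "b"}}, concat)
	fmt.Println(got.kind == kindSet, got.seqNum, got.value) // true 3 bc
}
```

With only MERGE entries (a.MERGE.4, a.MERGE.3, a.MERGE.2) the sketch returns a MERGE at sequence number 4 holding the combined value.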
      63             : //
      64             : // 3. Snapshots
      65             : //
      66             : // Snapshots are lightweight point-in-time views of the DB state. At its core,
      67             : // a snapshot is a sequence number along with a guarantee from Pebble that it
      68             : // will maintain the view of the database at that sequence number. Part of this
      69             : // guarantee is relatively straightforward to achieve. When reading from the
      70             : // database Pebble will ignore sequence numbers that are larger than the
      71             : // snapshot sequence number. The primary complexity with snapshots occurs
      72             : // during compaction: the collapsing of entries that are shadowed by newer
      73             : // entries is at odds with the guarantee that Pebble will maintain the view of
      74             : // the database at the snapshot sequence number. Rather than collapsing entries
      75             : // up to the next user key, compactionIter can only collapse entries up to the
      76             : // next snapshot boundary. That is, every snapshot boundary potentially causes
      77             : // another entry for the same user-key to be emitted. Another way to view this
      78             : // is that snapshots define stripes and entries are collapsed within stripes,
      79             : // but not across stripes. Consider the following scenario:
      80             : //
      81             : //      a.PUT.9
      82             : //      a.DEL.8
      83             : //      a.PUT.7
      84             : //      a.DEL.6
      85             : //      a.PUT.5
      86             : //
      87             : // In the absence of snapshots these entries would be collapsed to
      88             : // a.PUT.9. What if there is a snapshot at sequence number 7? The entries can
      89             : // be divided into two stripes and collapsed within the stripes:
      90             : //
      91             : //      a.PUT.9        a.PUT.9
      92             : //      a.DEL.8  --->
      93             : //      a.PUT.7
      94             : //      --             --
      95             : //      a.DEL.6  --->  a.DEL.6
      96             : //      a.PUT.5
      97             : //
      98             : // All of the rules described earlier still apply, but they are confined to
      99             : // operate within a snapshot stripe. Snapshots only affect compaction when the
     100             : // snapshot sequence number lies within the range of sequence numbers being
     101             : // compacted. In the above example, a snapshot at sequence number 10 or at
     102             : // sequence number 5 would not have any effect.
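The striping rule above can be sketched on the same example. This is a simplified model that only handles shadowing of point keys (no merges, tombstone elision, or range deletions). The pointKey type and both helpers are hypothetical. The boundary placement (a snapshot at s separates sequence numbers below s from those at or above s) is chosen to match the diagram above.

```go
package main

import (
	"fmt"
	"sort"
)

// pointKey is a simplified model of an internal point key.
type pointKey struct {
	userKey string
	kind    string
	seqNum  uint64
}

// stripeIndex returns the number of snapshots that are <= seqNum, which
// identifies the stripe containing seqNum. snapshots must be ascending.
func stripeIndex(seqNum uint64, snapshots []uint64) int {
	return sort.Search(len(snapshots), func(i int) bool {
		return snapshots[i] > seqNum
	})
}

// collapseStripes keeps only the newest entry for each (user key, stripe)
// pair. keys must be sorted by user key, then by descending sequence number.
func collapseStripes(keys []pointKey, snapshots []uint64) []pointKey {
	var out []pointKey
	prevKey, prevStripe := "", -1
	for i, k := range keys {
		s := stripeIndex(k.seqNum, snapshots)
		if i > 0 && k.userKey == prevKey && s == prevStripe {
			continue // shadowed by a newer entry in the same stripe
		}
		out = append(out, k)
		prevKey, prevStripe = k.userKey, s
	}
	return out
}

func main() {
	keys := []pointKey{
		{"a", "PUT", 9}, {"a", "DEL", 8}, {"a", "PUT", 7},
		{"a", "DEL", 6}, {"a", "PUT", 5},
	}
	for _, k := range collapseStripes(keys, []uint64{7}) {
		fmt.Printf("%s.%s.%d\n", k.userKey, k.kind, k.seqNum)
	}
	// Prints:
	// a.PUT.9
	// a.DEL.6
}
```

Passing []uint64{10} or []uint64{5} instead places every entry in a single stripe, so only a.PUT.9 survives.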
     103             : //
     104             : // 4. Range Deletions
     105             : //
     106             : // Range deletions provide the ability to delete all of the keys (and values)
     107             : // in a contiguous range. Range deletions are stored indexed by their start
     108             : // key. The end key of the range is stored in the value. In order to support
     109             : // lookup of the range deletions which overlap with a particular key, the range
     110             : // deletion tombstones need to be fragmented whenever they overlap. This
     111             : // fragmentation is performed by keyspan.Fragmenter. The fragments are then
     112             : // subject to the rules for snapshots. For example, consider the two range
     113             : // tombstones [a,e)#1 and [c,g)#2:
     114             : //
     115             : //      2:     c-------g
     116             : //      1: a-------e
     117             : //
     118             : // These tombstones will be fragmented into:
     119             : //
     120             : //      2:     c---e---g
     121             : //      1: a---c---e
     122             : //
     123             : // Do we output the fragment [c,e)#1? Since it is covered by [c,e)#2 the answer
     124             : // depends on whether it is in a new snapshot stripe.
     125             : //
     126             : // In addition to the fragmentation of range tombstones, compaction also needs
     127             : // to take the range tombstones into consideration when outputting normal
     128             : // keys. Just as with point deletions, a range deletion covering an entry can
     129             : // cause the entry to be elided.
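The fragmentation of [a,e)#1 and [c,g)#2 shown above can be reproduced with a small batch sketch. This is not how keyspan.Fragmenter works internally (the sketch makes no claim about its algorithm). The span type and fragment function are hypothetical, and plain string comparison stands in for the DB's Compare.

```go
package main

import (
	"fmt"
	"sort"
)

// span is a simplified range tombstone covering [start, end) at seqNum.
type span struct {
	start, end string
	seqNum     uint64
}

// fragment splits overlapping spans at every start and end key, so that any
// two output fragments either share identical bounds or do not overlap.
// Fragments with the same bounds are ordered by descending sequence number.
func fragment(spans []span) []span {
	boundSet := map[string]struct{}{}
	for _, s := range spans {
		boundSet[s.start] = struct{}{}
		boundSet[s.end] = struct{}{}
	}
	bounds := make([]string, 0, len(boundSet))
	for b := range boundSet {
		bounds = append(bounds, b)
	}
	sort.Strings(bounds)

	var out []span
	for i := 0; i+1 < len(bounds); i++ {
		lo, hi := bounds[i], bounds[i+1]
		var frags []span
		for _, s := range spans {
			if s.start <= lo && hi <= s.end {
				frags = append(frags, span{lo, hi, s.seqNum})
			}
		}
		sort.Slice(frags, func(a, b int) bool { return frags[a].seqNum > frags[b].seqNum })
		out = append(out, frags...)
	}
	return out
}

func main() {
	for _, f := range fragment([]span{{"a", "e", 1}, {"c", "g", 2}}) {
		fmt.Printf("[%s,%s)#%d\n", f.start, f.end, f.seqNum)
	}
	// Prints:
	// [a,c)#1
	// [c,e)#2
	// [c,e)#1
	// [e,g)#2
}
```

Once fragments share exact bounds, the snapshot-stripe rule can be applied per bounds pair: [c,e)#1 is kept only if a snapshot separates it from [c,e)#2.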
     130             : //
     131             : // A note on the stability of keys and values.
     132             : //
     133             : // The stability guarantees of keys and values returned by the iterator tree
     134             : // that backs a compactionIter are nuanced, and care must be taken when
     135             : // referencing any returned items.
     136             : //
     137             : // Keys and values returned by exported functions (i.e. First, Next, etc.) have
     138             : // lifetimes that fall into two categories:
     139             : //
     140             : // Lifetime valid for duration of compaction. Range deletion keys and values are
     141             : // stable for the duration of the compaction, due to the way in which a
     142             : // compactionIter is typically constructed (i.e. via (*compaction).newInputIter,
     143             : // which wraps the iterator over the range deletion block in a noCloseIter,
     144             : // preventing the release of the backing memory until the compaction is
     145             : // finished).
     146             : //
     147             : // Lifetime limited to duration of sstable block liveness. Point keys (SET, DEL,
     148             : // etc.) and values must be cloned / copied following the return from the
     149             : // exported function, and before a subsequent call to Next advances the iterator
     150             : // and mutates the contents of the returned key and value.
     151             : type compactionIter struct {
     152             :         equal Equal
     153             :         merge Merge
     154             :         iter  internalIterator
     155             :         err   error
     156             :         // `key.UserKey` points into `keyBuf`, which holds a saved copy of
     157             :         // `i.iterKey.UserKey`, and `key.Trailer` is set to `i.iterKey.Trailer`.
     158             :         // This is the case on return from all public methods -- these methods
     159             :         // return `key`. It is also the internal state while the code moves to
     160             :         // the next key, so that it can determine whether the user key has
     161             :         // changed from the previous key.
     162             :         key InternalKey
     163             :         // keyTrailer is updated when `i.key` is updated and holds the key's
     164             :         // original trailer (eg, before any sequence-number zeroing or changes to
     165             :         // key kind).
     166             :         keyTrailer  uint64
     167             :         value       []byte
     168             :         valueCloser io.Closer
     169             :         // Temporary buffer used for storing the previous user key in order to
     170             :         // determine when iteration has advanced to a new user key and thus a new
     171             :         // snapshot stripe.
     172             :         keyBuf []byte
     173             :         // Temporary buffer used for storing the previous value, which may be an
     174             :         // unsafe, i.iter-owned slice that could be altered when the iterator is
     175             :         // advanced.
     176             :         valueBuf []byte
     177             :         // Is the current entry valid?
     178             :         valid            bool
     179             :         iterKey          *InternalKey
     180             :         iterValue        []byte
     181             :         iterStripeChange stripeChangeType
     182             :         // `skip` indicates whether the remaining entries in the current snapshot
     183             :         // stripe should be skipped or processed. `skip` has no effect when `pos ==
     184             :         // iterPosNext`.
     185             :         skip bool
     186             :         // `pos` indicates the iterator position at the top of `Next()`. Its type's
     187             :         // (`iterPos`) values take on the following meanings in the context of
     188             :         // `compactionIter`.
     189             :         //
     190             :         // - `iterPosCurForward`: the iterator is at the last key returned.
     191             :         // - `iterPosNext`: the iterator has already been advanced to the next
     192             :         //   candidate key. For example, this happens when processing merge operands,
     193             :         //   where we advance the iterator all the way into the next stripe or next
     194             :         //   user key to ensure we've seen all mergeable operands.
     195             :         // - `iterPosPrev`: this is invalid as compactionIter is forward-only.
     196             :         pos iterPos
     197             :         // `snapshotPinned` indicates whether the last point key returned by the
     198             :         // compaction iterator was only returned because an open snapshot prevents
     199             :         // its elision. This field only applies to point keys, and not to range
     200             :         // deletions or range keys.
     201             :         snapshotPinned bool
     202             :         // forceObsoleteDueToRangeDel is set to true in a subset of the cases that
     203             :         // snapshotPinned is true. This value is true when the point is obsolete due
     204             :         // to a RANGEDEL but could not be deleted due to a snapshot.
     205             :         //
     206             :         // NB: it may seem that the additional cases that snapshotPinned captures
     207             :         // are harmless in that they can also be used to mark a point as obsolete
     208             :         // (it is merely a duplication of some logic that happens in
     209             :         // Writer.AddWithForceObsolete), but that is not quite accurate as of this
     210             :         // writing -- snapshotPinned originated in stats collection and for a
     211             :         // sequence MERGE, SET, where the MERGE cannot merge with the (older) SET
     212             :         // due to a snapshot, the snapshotPinned value for the SET is true.
     213             :         //
     214             :         // TODO(sumeer,jackson): improve the logic of snapshotPinned and reconsider
     215             :         // whether we need forceObsoleteDueToRangeDel.
     216             :         forceObsoleteDueToRangeDel bool
     217             :         // The index of the snapshot for the current key within the snapshots slice.
     218             :         curSnapshotIdx    int
     219             :         curSnapshotSeqNum uint64
     220             :         // The snapshot sequence numbers that need to be maintained. These sequence
     221             :         // numbers define the snapshot stripes (see the Snapshots description
     222             :         // above). The sequence numbers are in ascending order.
     223             :         snapshots []uint64
     224             :         // frontiers holds a heap of user keys that affect compaction behavior when
     225             :         // they're exceeded. Before a new key is returned, the compaction iterator
     226             :         // advances the frontier, notifying any code that subscribed to be notified
     227             :         // when a key was reached. The primary use today is within the
     228             :         // implementation of compactionOutputSplitters in compaction.go. Many of
     229             :         // these splitters wait for the compaction iterator to call Advance(k) when
     230             :         // it's returning a new key. If the key that they're waiting for is
     231             :         // surpassed, these splitters update internal state recording that they
     232             :         // should request a compaction split next time they're asked in
     233             :         // [shouldSplitBefore].
     234             :         frontiers compact.Frontiers
     235             :         // Reference to the range deletion tombstone fragmenter (e.g.,
     236             :         // `compaction.rangeDelFrag`).
     237             :         // TODO(jackson): We can eliminate range{Del,Key}Frag now that fragmentation
     238             :         // is performed upfront by keyspanimpl.MergingIters.
     239             :         rangeDelFrag *keyspan.Fragmenter
     240             :         rangeKeyFrag *keyspan.Fragmenter
     241             :         // The fragmented tombstones.
     242             :         tombstones []keyspan.Span
     243             :         // The fragmented range keys.
     244             :         rangeKeys []keyspan.Span
     245             :         // Byte allocator for the tombstone keys.
     246             :         alloc                                  bytealloc.A
     247             :         allowZeroSeqNum                        bool
     248             :         elideTombstone                         func(key []byte) bool
     249             :         elideRangeTombstone                    func(start, end []byte) bool
     250             :         ineffectualSingleDeleteCallback        func(userKey []byte)
     251             :         singleDeleteInvariantViolationCallback func(userKey []byte)
     252             :         // The on-disk format major version. This informs the types of keys that
     253             :         // may be written to disk during a compaction.
     254             :         formatVersion FormatMajorVersion
     255             :         stats         struct {
     256             :                 // count of DELSIZED keys that were missized.
     257             :                 countMissizedDels uint64
     258             :         }
     259             : }
     260             : 
     261             : func newCompactionIter(
     262             :         cmp Compare,
     263             :         equal Equal,
     264             :         formatKey base.FormatKey,
     265             :         merge Merge,
     266             :         iter internalIterator,
     267             :         snapshots []uint64,
     268             :         rangeDelFrag *keyspan.Fragmenter,
     269             :         rangeKeyFrag *keyspan.Fragmenter,
     270             :         allowZeroSeqNum bool,
     271             :         elideTombstone func(key []byte) bool,
     272             :         elideRangeTombstone func(start, end []byte) bool,
     273             :         ineffectualSingleDeleteCallback func(userKey []byte),
     274             :         singleDeleteInvariantViolationCallback func(userKey []byte),
     275             :         formatVersion FormatMajorVersion,
     276           1 : ) *compactionIter {
     277           1 :         i := &compactionIter{
     278           1 :                 equal:                                  equal,
     279           1 :                 merge:                                  merge,
     280           1 :                 iter:                                   iter,
     281           1 :                 snapshots:                              snapshots,
     282           1 :                 rangeDelFrag:                           rangeDelFrag,
     283           1 :                 rangeKeyFrag:                           rangeKeyFrag,
     284           1 :                 allowZeroSeqNum:                        allowZeroSeqNum,
     285           1 :                 elideTombstone:                         elideTombstone,
     286           1 :                 elideRangeTombstone:                    elideRangeTombstone,
     287           1 :                 ineffectualSingleDeleteCallback:        ineffectualSingleDeleteCallback,
     288           1 :                 singleDeleteInvariantViolationCallback: singleDeleteInvariantViolationCallback,
     289           1 :                 formatVersion:                          formatVersion,
     290           1 :         }
     291           1 :         i.frontiers.Init(cmp)
     292           1 :         i.rangeDelFrag.Cmp = cmp
     293           1 :         i.rangeDelFrag.Format = formatKey
     294           1 :         i.rangeDelFrag.Emit = i.emitRangeDelChunk
     295           1 :         i.rangeKeyFrag.Cmp = cmp
     296           1 :         i.rangeKeyFrag.Format = formatKey
     297           1 :         i.rangeKeyFrag.Emit = i.emitRangeKeyChunk
     298           1 :         return i
     299           1 : }
     300             : 
     301           1 : func (i *compactionIter) First() (*InternalKey, []byte) {
     302           1 :         if i.err != nil {
     303           0 :                 return nil, nil
     304           0 :         }
     305           1 :         var iterValue LazyValue
     306           1 :         i.iterKey, iterValue = i.iter.First()
     307           1 :         i.iterValue, _, i.err = iterValue.Value(nil)
     308           1 :         if i.err != nil {
     309           0 :                 return nil, nil
     310           0 :         }
     311           1 :         if i.iterKey != nil {
     312           1 :                 i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(i.iterKey.SeqNum(), i.snapshots)
     313           1 :         }
     314           1 :         i.pos = iterPosNext
     315           1 :         i.iterStripeChange = newStripeNewKey
     316           1 :         return i.Next()
     317             : }
     318             : 
     319           1 : func (i *compactionIter) Next() (*InternalKey, []byte) {
     320           1 :         if i.err != nil {
     321           0 :                 return nil, nil
     322           0 :         }
     323             : 
     324             :         // Close the closer for the current value if one was open.
     325           1 :         if i.closeValueCloser() != nil {
     326           0 :                 return nil, nil
     327           0 :         }
     328             : 
     329             :         // Prior to this call to `Next()` we are in one of three situations with
     330             :         // respect to `iterKey` and related state:
     331             :         //
     332             :         // - `!skip && pos == iterPosNext`: `iterKey` is already at the next key.
     333             :         // - `!skip && pos == iterPosCurForward`: We are at the key that has been returned.
     334             :         //   To move forward we advance by one key, even if that lands us in the same
     335             :         //   snapshot stripe.
     336             :         // - `skip && pos == iterPosCurForward`: We are at the key that has been returned.
     337             :         //   To move forward we skip skippable entries in the stripe.
     338           1 :         if i.pos == iterPosCurForward {
     339           1 :                 if i.skip {
     340           1 :                         i.skipInStripe()
     341           1 :                 } else {
     342           1 :                         i.nextInStripe()
     343           1 :                 }
     344           1 :         } else if i.skip {
     345           0 :                 panic(errors.AssertionFailedf("compaction iterator has skip=true, but iterator is at iterPosNext"))
     346             :         }
     347             : 
     348           1 :         i.pos = iterPosCurForward
     349           1 :         i.valid = false
     350           1 : 
     351           1 :         for i.iterKey != nil {
     352           1 :                 // If we entered a new snapshot stripe with the same key, any key we
     353           1 :                 // return on this iteration is only returned because the open snapshot
     354           1 :                 // prevented it from being elided or merged with the key returned for
     355           1 :                 // the previous stripe. Mark it as pinned so that the compaction loop
     356           1 :                 // can correctly populate output tables' pinned statistics. We might
     357           1 :                 // also set snapshotPinned=true down below if we observe that the key is
     358           1 :                 // deleted by a range deletion in a higher stripe or that this key is a
     359           1 :                 // tombstone that could be elided if only it were in the last snapshot
     360           1 :                 // stripe.
     361           1 :                 i.snapshotPinned = i.iterStripeChange == newStripeSameKey
     362           1 : 
     363           1 :                 if i.iterKey.Kind() == InternalKeyKindRangeDelete || rangekey.IsRangeKey(i.iterKey.Kind()) {
     364           1 :                         // Return the span so the compaction can use it for file truncation and add
     365           1 :                         // it to the relevant fragmenter. In the case of range deletions, we do not
     366           1 :                         // set `skip` to true before returning as there may be any number of point
     367           1 :                         // keys with the same user key and sequence numbers ≥ the range deletion's
     368           1 :                         // sequence number. Such point keys must be visible (i.e., not skipped
     369           1 :                         // over) since we promise point keys are not deleted by range tombstones at
     370           1 :                         // the same sequence number (or higher).
     371           1 :                         //
     372           1 :                         // Note that `skip` must already be false here, because range keys and range
     373           1 :                         // deletions are interleaved at the maximal sequence numbers and neither will
     374           1 :                         // set `skip`=true.
     375           1 :                         if i.skip {
     376           0 :                                 panic(errors.AssertionFailedf("pebble: compaction iterator: skip unexpectedly true"))
     377             :                         }
     378             : 
     379             :                         // NOTE: there is a subtle invariant violation here in that calling
     380             :                         // saveKey and returning a reference to the temporary slice violates
     381             :                         // the stability guarantee for range deletion keys. A potential
     382             :                         // remediation would be to return the original iterKey and iterValue
     383             :                         // directly, as the backing memory is guaranteed to be stable until
     384             :                         // the compaction completes. The violation here is only minor in
     385             :                         // that the caller immediately clones the range deletion InternalKey
     386             :                         // when passing the key to the deletion fragmenter (see the
     387             :                         // call-site in compaction.go).
     388             :                         // TODO(travers): address this violation by removing the call to
     389             :                         // saveKey and instead return the original iterKey and iterValue.
     390             :                         // This goes against the comment on i.key in the struct, and
     391             :                         // therefore warrants some investigation.
     392           1 :                         i.saveKey()
     393           1 :                         // TODO(jackson): Handle tracking pinned statistics for range keys
     394           1 :                         // and range deletions. This would require updating
     395           1 :                         // emitRangeDelChunk and rangeKeyCompactionTransform to update
     396           1 :                         // statistics when they apply their own snapshot striping logic.
     397           1 :                         i.snapshotPinned = false
     398           1 :                         i.value = i.iterValue
     399           1 :                         i.valid = true
     400           1 :                         return &i.key, i.value
     401             :                 }
     402             : 
     403             :                 // TODO(sumeer): we could avoid calling Covers if i.iterStripeChange ==
     404             :                 // sameStripeSameKey since that check has already been done in
     405             :                 // nextInStripeHelper. However, we also need to handle the case of
     406             :                 // CoversInvisibly below.
     407           1 :                 if cover := i.rangeDelFrag.Covers(*i.iterKey, i.curSnapshotSeqNum); cover == keyspan.CoversVisibly {
     408           1 :                         // A pending range deletion deletes this key. Skip it.
     409           1 :                         i.saveKey()
     410           1 :                         i.skipInStripe()
     411           1 :                         continue
     412           1 :                 } else if cover == keyspan.CoversInvisibly {
     413           1 :                         // i.iterKey would be deleted by a range deletion if there weren't
     414           1 :                         // any open snapshots. Mark it as pinned.
     415           1 :                         //
     416           1 :                         // NB: there are multiple places in this file where we call
     417           1 :                         // i.rangeDelFrag.Covers and this is the only one where we are writing
     418           1 :                         // to i.snapshotPinned. Those other cases occur in mergeNext where the
     419           1 :                         // caller is deciding whether the value should be merged or not, and the
     420           1 :                         // key is in the same snapshot stripe. Hence, snapshotPinned is by
     421           1 :                         // definition false in those cases.
     422           1 :                         i.snapshotPinned = true
     423           1 :                         i.forceObsoleteDueToRangeDel = true
     424           1 :                 } else {
     425           1 :                         i.forceObsoleteDueToRangeDel = false
     426           1 :                 }
     427             : 
     428           1 :                 switch i.iterKey.Kind() {
     429           1 :                 case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
     430           1 :                         if i.elideTombstone(i.iterKey.UserKey) {
     431           1 :                                 if i.curSnapshotIdx == 0 {
     432           1 :                                         // If we're at the last snapshot stripe and the tombstone
     433           1 :                                         // can be elided, skip skippable keys in the same stripe.
     434           1 :                                         i.saveKey()
     435           1 :                                         if i.key.Kind() == InternalKeyKindSingleDelete {
     436           1 :                                                 i.skipDueToSingleDeleteElision()
     437           1 :                                         } else {
     438           1 :                                                 i.skipInStripe()
     439           1 :                                                 if !i.skip && i.iterStripeChange != newStripeNewKey {
     440           0 :                                                         panic(errors.AssertionFailedf("pebble: skipInStripe in last stripe disabled skip without advancing to new key"))
     441             :                                                 }
     442             :                                         }
     443           1 :                                         if i.iterStripeChange == newStripeSameKey {
     444           0 :                                                 panic(errors.AssertionFailedf("pebble: skipInStripe in last stripe found a new stripe within the same key"))
     445             :                                         }
     446           1 :                                         continue
     447           1 :                                 } else {
     448           1 :                                         // We're not at the last snapshot stripe, so the tombstone
     449           1 :                                         // can NOT yet be elided. Mark it as pinned, so that it's
     450           1 :                                         // included in table statistics appropriately.
     451           1 :                                         i.snapshotPinned = true
     452           1 :                                 }
     453             :                         }
     454             : 
     455           1 :                         switch i.iterKey.Kind() {
     456           1 :                         case InternalKeyKindDelete:
     457           1 :                                 i.saveKey()
     458           1 :                                 i.value = i.iterValue
     459           1 :                                 i.valid = true
     460           1 :                                 i.skip = true
     461           1 :                                 return &i.key, i.value
     462             : 
     463           1 :                         case InternalKeyKindDeleteSized:
     464           1 :                                 // We may skip subsequent keys because of this tombstone. Scan
     465           1 :                                 // ahead to see just how much data this tombstone drops and if
     466           1 :                                 // the tombstone's value should be updated accordingly.
     467           1 :                                 return i.deleteSizedNext()
     468             : 
     469           1 :                         case InternalKeyKindSingleDelete:
     470           1 :                                 if i.singleDeleteNext() {
     471           1 :                                         return &i.key, i.value
     472           1 :                                 } else if i.err != nil {
     473           0 :                                         return nil, nil
     474           0 :                                 }
     475           1 :                                 continue
     476             : 
     477           0 :                         default:
     478           0 :                                 panic(errors.AssertionFailedf(
     479           0 :                                         "unexpected kind %s", redact.SafeString(i.iterKey.Kind().String())))
     480             :                         }
     481             : 
     482           1 :                 case InternalKeyKindSet, InternalKeyKindSetWithDelete:
     483           1 :                         // The key we emit for this entry is a function of the current key
     484           1 :                         // kind, and whether this entry is followed by a DEL/SINGLEDEL
     485           1 :                         // entry. setNext() does the work to move the iterator forward,
     486           1 :                         // preserving the original value, and potentially mutating the key
     487           1 :                         // kind.
     488           1 :                         i.setNext()
     489           1 :                         if i.err != nil {
     490           1 :                                 return nil, nil
     491           1 :                         }
     492           1 :                         return &i.key, i.value
     493             : 
     494           1 :                 case InternalKeyKindMerge:
     495           1 :                         // Record the snapshot index before mergeNext as merging
     496           1 :                         // advances the iterator, adjusting curSnapshotIdx.
     497           1 :                         origSnapshotIdx := i.curSnapshotIdx
     498           1 :                         var valueMerger ValueMerger
     499           1 :                         valueMerger, i.err = i.merge(i.iterKey.UserKey, i.iterValue)
     500           1 :                         if i.err == nil {
     501           1 :                                 i.mergeNext(valueMerger)
     502           1 :                         }
     503           1 :                         var needDelete bool
     504           1 :                         if i.err == nil {
     505           1 :                                 // includesBase is true whenever we've transformed the MERGE record
     506           1 :                                 // into a SET.
     507           1 :                                 var includesBase bool
     508           1 :                                 switch i.key.Kind() {
     509           1 :                                 case InternalKeyKindSet, InternalKeyKindSetWithDelete:
     510           1 :                                         includesBase = true
     511           1 :                                 case InternalKeyKindMerge:
     512           0 :                                 default:
     513           0 :                                         panic(errors.AssertionFailedf(
     514           0 :                                                 "unexpected kind %s", redact.SafeString(i.key.Kind().String())))
     515             :                                 }
     516           1 :                                 i.value, needDelete, i.valueCloser, i.err = finishValueMerger(valueMerger, includesBase)
     517             :                         }
     518           1 :                         if i.err == nil {
     519           1 :                                 if needDelete {
     520           0 :                                         i.valid = false
     521           0 :                                         if i.closeValueCloser() != nil {
     522           0 :                                                 return nil, nil
     523           0 :                                         }
     524           0 :                                         continue
     525             :                                 }
     526             : 
     527           1 :                                 i.maybeZeroSeqnum(origSnapshotIdx)
     528           1 :                                 return &i.key, i.value
     529             :                         }
     530           1 :                         if i.err != nil {
     531           1 :                                 i.valid = false
     532           1 :                                 // TODO(sumeer): why is MarkCorruptionError only being called for
     533           1 :                                 // MERGE?
     534           1 :                                 i.err = base.MarkCorruptionError(i.err)
     535           1 :                         }
     536           1 :                         return nil, nil
     537             : 
     538           1 :                 default:
     539           1 :                         i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
     540           1 :                         i.valid = false
     541           1 :                         return nil, nil
     542             :                 }
     543             :         }
     544             : 
     545           1 :         return nil, nil
     546             : }
     547             : 
     548           1 : func (i *compactionIter) closeValueCloser() error {
     549           1 :         if i.valueCloser == nil {
     550           1 :                 return nil
     551           1 :         }
     552             : 
     553           0 :         i.err = i.valueCloser.Close()
     554           0 :         i.valueCloser = nil
     555           0 :         if i.err != nil {
     556           0 :                 i.valid = false
     557           0 :         }
     558           0 :         return i.err
     559             : }
     560             : 
     561             : // snapshotIndex returns the index and value of the first sequence number in
     562             : // snapshots strictly greater than seq, or InternalKeySeqNumMax if none is.
     563           1 : func snapshotIndex(seq uint64, snapshots []uint64) (int, uint64) {
     564           1 :         index := sort.Search(len(snapshots), func(i int) bool {
     565           1 :                 return snapshots[i] > seq
     566           1 :         })
     567           1 :         if index >= len(snapshots) {
     568           1 :                 return index, InternalKeySeqNumMax
     569           1 :         }
     570           1 :         return index, snapshots[index]
     571             : }
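
The boundary behaviour of snapshotIndex is easy to misread, so the following is a minimal standalone sketch of the same search outside Pebble. The constant seqNumMax is only a stand-in for InternalKeySeqNumMax (its value here is an assumption chosen for illustration), and the snapshot list is made up.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// seqNumMax is a hypothetical stand-in for pebble's InternalKeySeqNumMax.
// The real constant may have a different value.
const seqNumMax uint64 = math.MaxUint64

// snapshotIndex mirrors the function in the listing: it finds the first
// snapshot whose sequence number is strictly greater than seq.
func snapshotIndex(seq uint64, snapshots []uint64) (int, uint64) {
	index := sort.Search(len(snapshots), func(i int) bool {
		return snapshots[i] > seq
	})
	if index >= len(snapshots) {
		// No snapshot is newer than seq: the key is in the newest stripe.
		return index, seqNumMax
	}
	return index, snapshots[index]
}

func main() {
	snapshots := []uint64{5, 10, 20} // must be sorted ascending
	for _, seq := range []uint64{3, 5, 10, 19, 20, 25} {
		idx, bound := snapshotIndex(seq, snapshots)
		fmt.Printf("seq=%d -> stripe index %d, bounding snapshot %d\n", seq, idx, bound)
	}
}
```

Note that a key whose sequence number equals a snapshot's sequence number lands in the stripe above that snapshot, because the comparison is strict.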
     572             : 
     573             : // skipInStripe skips over skippable keys in the same stripe and user key. It
     574             : // may set i.err, in which case i.iterKey will be nil.
     575           1 : func (i *compactionIter) skipInStripe() {
     576           1 :         i.skip = true
     577           1 :         // TODO(sumeer): we can avoid the overhead of calling i.rangeDelFrag.Covers,
     578           1 :         // in this case of nextInStripe, since we are skipping all of them anyway.
     579           1 :         for i.nextInStripe() == sameStripe {
     580           1 :                 if i.err != nil {
     581           0 :                         panic(i.err)
     582             :                 }
     583             :         }
     584             :         // We landed outside the original stripe, so reset skip.
     585           1 :         i.skip = false
     586             : }
     587             : 
     588           1 : func (i *compactionIter) iterNext() bool {
     589           1 :         var iterValue LazyValue
     590           1 :         i.iterKey, iterValue = i.iter.Next()
     591           1 :         i.iterValue, _, i.err = iterValue.Value(nil)
     592           1 :         if i.err != nil {
     593           0 :                 i.iterKey = nil
     594           0 :         }
     595           1 :         return i.iterKey != nil
     596             : }
     597             : 
     598             : // stripeChangeType indicates how the snapshot stripe changed relative to the
     599             : // previous key. If the snapshot stripe changed, it also indicates whether the
     600             : // new stripe was entered because the iterator progressed onto an entirely new
     601             : // key or entered a new stripe within the same key.
     602             : type stripeChangeType int
     603             : 
     604             : const (
     605             :         newStripeNewKey stripeChangeType = iota
     606             :         newStripeSameKey
     607             :         sameStripe
     608             : )
     609             : 
     610             : // nextInStripe advances the iterator and returns a stripeChangeType constant
     611             : // indicating how its state changed.
     612             : //
     613             : // All sameStripe keys that are covered by a RANGEDEL will be skipped and not
     614             : // returned.
     615             : //
     616             : // Calls to nextInStripe must be preceded by a call to saveKey to retain a
     617             : // temporary reference to the original key, so that forward iteration can
     618             : // proceed with a reference to the original key. Care should be taken to avoid
     619             : // overwriting or mutating the saved key or value before they have been returned
     620             : // to the caller of the exported function (i.e. the caller of Next, First, etc.)
     621             : //
     622             : // nextInStripe may set i.err, in which case the return value will be
     623             : // newStripeNewKey, and i.iterKey will be nil.
     624           1 : func (i *compactionIter) nextInStripe() stripeChangeType {
     625           1 :         i.iterStripeChange = i.nextInStripeHelper()
     626           1 :         return i.iterStripeChange
     627           1 : }
     628             : 
     629             : // nextInStripeHelper is an internal helper for nextInStripe; callers should use
     630             : // nextInStripe and not call nextInStripeHelper.
     631           1 : func (i *compactionIter) nextInStripeHelper() stripeChangeType {
     632           1 :         origSnapshotIdx := i.curSnapshotIdx
     633           1 :         for {
     634           1 :                 if !i.iterNext() {
     635           1 :                         return newStripeNewKey
     636           1 :                 }
     637           1 :                 key := i.iterKey
     638           1 : 
     639           1 :                 // Is this a new key? There are two cases:
     640           1 :                 //
     641           1 :                 // 1. The new key has a different user key.
     642           1 :                 // 2. The previous key was an interleaved range deletion or range key
     643           1 :                 //    boundary. These keys are interleaved in the same input iterator
     644           1 :                 //    stream as point keys, but they do not obey the ordinary sequence
     645           1 :                 //    number ordering within a user key. If the previous key was one
     646           1 :                 //    of these keys, we consider the new key a `newStripeNewKey` to
     647           1 :                 //    reflect that it's the beginning of a new stream of point keys.
     648           1 :                 if i.key.IsExclusiveSentinel() || !i.equal(i.key.UserKey, key.UserKey) {
     649           1 :                         i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
     650           1 :                         return newStripeNewKey
     651           1 :                 }
     652             : 
     653             :                 // If i.key and key have the same user key, then
     654             :                 //   1. i.key must not have had a zero sequence number (or it would've been the last
     655             :                 //      key with its user key).
     656             :                 //   2. i.key must have a strictly larger sequence number
     657             :                 // There's an exception in that either key may be a range delete. Range
     658             :                 // deletes may share a sequence number with a point key if the keys were
     659             :                 // ingested together. Range keys may also share the sequence number if they
     660             :                 // were ingested, but range keys are interleaved into the compaction
     661             :                 // iterator's input iterator at the maximal sequence number so their
     662             :                 // original sequence number will not be observed here.
     663           1 :                 if prevSeqNum := base.SeqNumFromTrailer(i.keyTrailer); (prevSeqNum == 0 || prevSeqNum <= key.SeqNum()) &&
     664           1 :                         i.key.Kind() != InternalKeyKindRangeDelete && key.Kind() != InternalKeyKindRangeDelete {
     665           0 :                         prevKey := i.key
     666           0 :                         prevKey.Trailer = i.keyTrailer
     667           0 :                         panic(errors.AssertionFailedf("pebble: invariant violation: %s and %s out of order", prevKey, key))
     668             :                 }
     669             : 
     670           1 :                 i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
     671           1 :                 switch key.Kind() {
     672             :                 case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
     673           0 :                         InternalKeyKindRangeDelete:
     674           0 :                         // Range tombstones and range keys are interleaved at the max
     675           0 :                         // sequence number for a given user key, and the first key after one
     676           0 :                         // is always considered a newStripeNewKey, so we should never reach
     677           0 :                         // this.
     678           0 :                         panic("unreachable")
     679             :                 case InternalKeyKindDelete, InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSingleDelete,
     680           1 :                         InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
     681             :                         // Fall through
     682           1 :                 default:
     683           1 :                         kind := i.iterKey.Kind()
     684           1 :                         i.iterKey = nil
     685           1 :                         i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(kind))
     686           1 :                         i.valid = false
     687           1 :                         return newStripeNewKey
     688             :                 }
     689           1 :                 if i.curSnapshotIdx == origSnapshotIdx {
     690           1 :                         // Same snapshot.
     691           1 :                         if i.rangeDelFrag.Covers(*i.iterKey, i.curSnapshotSeqNum) == keyspan.CoversVisibly {
     692           1 :                                 continue
     693             :                         }
     694           1 :                         return sameStripe
     695             :                 }
     696           1 :                 return newStripeSameKey
     697             :         }
     698             : }
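
As a rough illustration of the three stripeChangeType outcomes, the sketch below classifies the transition between two adjacent point keys. It is a deliberately simplified model, not the Pebble implementation: the entry type and the classify and stripeOf helpers are hypothetical, and range deletions, exclusive sentinels, and error handling are all ignored.

```go
package main

import (
	"fmt"
	"sort"
)

type stripeChange int

const (
	newStripeNewKey stripeChange = iota
	newStripeSameKey
	sameStripe
)

func (c stripeChange) String() string {
	switch c {
	case newStripeNewKey:
		return "newStripeNewKey"
	case newStripeSameKey:
		return "newStripeSameKey"
	default:
		return "sameStripe"
	}
}

// entry is a hypothetical, stripped-down internal key.
type entry struct {
	userKey string
	seqNum  uint64
}

// stripeOf returns the index of the first snapshot strictly greater than seq.
func stripeOf(seq uint64, snapshots []uint64) int {
	return sort.Search(len(snapshots), func(i int) bool { return snapshots[i] > seq })
}

// classify reports how the stripe changes when moving from prev to next.
func classify(prev, next entry, snapshots []uint64) stripeChange {
	if prev.userKey != next.userKey {
		return newStripeNewKey
	}
	if stripeOf(prev.seqNum, snapshots) == stripeOf(next.seqNum, snapshots) {
		return sameStripe
	}
	return newStripeSameKey
}

func main() {
	snapshots := []uint64{10}
	keys := []entry{{"a", 12}, {"a", 11}, {"a", 8}, {"b", 15}}
	for i := 1; i < len(keys); i++ {
		fmt.Printf("%v -> %v: %v\n", keys[i-1], keys[i], classify(keys[i-1], keys[i], snapshots))
	}
}
```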
     699             : 
     700           1 : func (i *compactionIter) setNext() {
     701           1 :         // Save the current key.
     702           1 :         i.saveKey()
     703           1 :         i.value = i.iterValue
     704           1 :         i.valid = true
     705           1 :         i.maybeZeroSeqnum(i.curSnapshotIdx)
     706           1 : 
     707           1 :         // If this key is already a SETWITHDEL we can early return and skip the remaining
     708           1 :         // records in the stripe:
     709           1 :         if i.iterKey.Kind() == InternalKeyKindSetWithDelete {
     710           1 :                 i.skip = true
     711           1 :                 return
     712           1 :         }
     713             : 
     714             :         // We are iterating forward. Save the current value.
     715           1 :         i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
     716           1 :         i.value = i.valueBuf
     717           1 : 
     718           1 :         // Else, we continue to loop through entries in the stripe looking for a
     719           1 :         // DEL. Note that we may stop *before* encountering a DEL, if one exists.
     720           1 :         //
     721           1 :         // NB: nextInStripe will skip sameStripe keys that are visibly covered by a
     722           1 :         // RANGEDEL. This can include DELs -- this is fine since such DELs don't
     723           1 :         // need to be combined with SET to make SETWITHDEL.
     724           1 :         for {
     725           1 :                 switch i.nextInStripe() {
     726           1 :                 case newStripeNewKey, newStripeSameKey:
     727           1 :                         i.pos = iterPosNext
     728           1 :                         return
     729           1 :                 case sameStripe:
     730           1 :                         // We're still in the same stripe. If this is a
     731           1 :                         // DEL/SINGLEDEL/DELSIZED, we stop looking and emit a SETWITHDEL.
     732           1 :                         // Subsequent keys are eligible for skipping.
     733           1 :                         switch i.iterKey.Kind() {
     734           1 :                         case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
     735           1 :                                 i.key.SetKind(InternalKeyKindSetWithDelete)
     736           1 :                                 i.skip = true
     737           1 :                                 return
     738           1 :                         case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSetWithDelete:
     739             :                                 // Do nothing
     740           0 :                         default:
     741           0 :                                 i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
     742           0 :                                 i.valid = false
     743             :                         }
     744           0 :                 default:
     745           0 :                         panic("pebble: unexpected stripeChangeType: " + strconv.Itoa(int(i.iterStripeChange)))
     746             :                 }
     747             :         }
     748             : }
     749             : 
     750           1 : func (i *compactionIter) mergeNext(valueMerger ValueMerger) {
     751           1 :         // Save the current key.
     752           1 :         i.saveKey()
     753           1 :         i.valid = true
     754           1 : 
     755           1 :         // Loop looking for older values in the current snapshot stripe and merge
     756           1 :         // them.
     757           1 :         for {
     758           1 :                 if i.nextInStripe() != sameStripe {
     759           1 :                         i.pos = iterPosNext
     760           1 :                         return
     761           1 :                 }
     762           1 :                 if i.err != nil {
     763           0 :                         panic(i.err)
     764             :                 }
     765             :                 // NB: MERGE#10+RANGEDEL#9 stays a MERGE, since nextInStripe skips
     766             :                 // sameStripe keys that are visibly covered by a RANGEDEL. There may be
     767             :                 // MERGE#7 that is invisibly covered and will be preserved, but there is
     768             :                 // no risk that MERGE#10 and MERGE#7 will get merged in the future as
     769             :                 // the RANGEDEL still exists and will be used in user-facing reads that
     770             :                 // see MERGE#10, and will also eventually cause MERGE#7 to be deleted in
     771             :                 // a compaction.
     772           1 :                 key := i.iterKey
     773           1 :                 switch key.Kind() {
     774           1 :                 case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
     775           1 :                         // We've hit a deletion tombstone. Return everything up to this point and
     776           1 :                         // then skip entries until the next snapshot stripe. We change the kind
     777           1 :                         // of the result key to a SetWithDelete so that it shadows keys in lower
     778           1 :                         // levels. That is, MERGE+DEL -> SETWITHDEL.
     779           1 :                         //
     780           1 :                         // We do the same for SingleDelete since SingleDelete is only
     781           1 :                         // permitted (with deterministic behavior) for keys that have been
     782           1 :                         // set once since the last SingleDelete/Delete, so everything
     783           1 :                         // older is acceptable to shadow. Note that this is slightly
     784           1 :                         // different from singleDeleteNext() which implements stricter
     785           1 :                         // semantics in terms of applying the SingleDelete to the single
     786           1 :                         // next Set. But those stricter semantics are not observable to
     787           1 :                         // the end-user since Iterator interprets SingleDelete as Delete.
     788           1 :                         // We could do something more complicated here and consume only a
     789           1 :                         // single Set, and then merge in any following Sets, but that is
     790           1 :                         // complicated wrt code and unnecessary given the narrow permitted
     791           1 :                         // use of SingleDelete.
     792           1 :                         i.key.SetKind(InternalKeyKindSetWithDelete)
     793           1 :                         i.skip = true
     794           1 :                         return
     795             : 
     796           1 :                 case InternalKeyKindSet, InternalKeyKindSetWithDelete:
     797           1 :                         // We've hit a Set or SetWithDel value. Merge with the existing
     798           1 :                         // value and return. We change the kind of the resulting key to a
     799           1 :                         // Set so that it shadows keys in lower levels. That is:
     800           1 :                         // MERGE + (SET*) -> SET.
     801           1 :                         i.err = valueMerger.MergeOlder(i.iterValue)
     802           1 :                         if i.err != nil {
     803           0 :                                 i.valid = false
     804           0 :                                 return
     805           0 :                         }
     806           1 :                         i.key.SetKind(InternalKeyKindSet)
     807           1 :                         i.skip = true
     808           1 :                         return
     809             : 
     810           1 :                 case InternalKeyKindMerge:
     811           1 :                         // We've hit another Merge value. Merge with the existing value and
     812           1 :                         // continue looping.
     813           1 :                         i.err = valueMerger.MergeOlder(i.iterValue)
     814           1 :                         if i.err != nil {
     815           0 :                                 i.valid = false
     816           0 :                                 return
     817           0 :                         }
     818             : 
     819           0 :                 default:
     820           0 :                         i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
     821           0 :                         i.valid = false
     822           0 :                         return
     823             :                 }
     824             :         }
     825             : }
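
The kind transitions described in the mergeNext comments can be summarized with a small standalone sketch. It assumes a hypothetical kind type and a mergeResultKind helper that only tracks the resulting key kind for a MERGE followed by older keys in the same stripe. Value merging, RANGEDEL coverage, and errors are left out.

```go
package main

import "fmt"

// kind is a hypothetical stand-in for pebble's InternalKeyKind.
type kind string

const (
	kindDelete        kind = "DEL"
	kindSingleDelete  kind = "SINGLEDEL"
	kindDeleteSized   kind = "DELSIZED"
	kindSet           kind = "SET"
	kindSetWithDelete kind = "SETWITHDEL"
	kindMerge         kind = "MERGE"
)

// mergeResultKind returns the kind emitted for a MERGE key, given the kinds
// of the older keys that follow it in the same snapshot stripe.
func mergeResultKind(older []kind) kind {
	for _, k := range older {
		switch k {
		case kindDelete, kindSingleDelete, kindDeleteSized:
			// MERGE+DEL -> SETWITHDEL; everything older is skipped.
			return kindSetWithDelete
		case kindSet, kindSetWithDelete:
			// MERGE+SET -> SET; the base value has been folded in.
			return kindSet
		case kindMerge:
			// Another operand: keep folding older values in.
		}
	}
	// Ran out of keys in the stripe without finding a base value.
	return kindMerge
}

func main() {
	fmt.Println(mergeResultKind([]kind{kindMerge, kindSet}))
	fmt.Println(mergeResultKind([]kind{kindMerge, kindDelete, kindSet}))
	fmt.Println(mergeResultKind(nil))
}
```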
     826             : 
     827             : // singleDeleteNext processes a SingleDelete point tombstone. A SingleDelete, or
     828             : // SINGLEDEL, is unique in that it deletes exactly 1 internal key. It's a
     829             : // performance optimization when the client knows a user key has not been
     830             : // overwritten, allowing the elision of the tombstone earlier, avoiding write
     831             : // amplification.
     832             : //
     833             : // singleDeleteNext returns a boolean indicating whether or not the caller
     834             : // should yield the SingleDelete key to the consumer of the compactionIter. If
     835             : // singleDeleteNext returns false, the caller may consume/elide the
     836             : // SingleDelete.
     837           1 : func (i *compactionIter) singleDeleteNext() bool {
     838           1 :         // Save the current key.
     839           1 :         i.saveKey()
     840           1 :         i.value = i.iterValue
     841           1 :         i.valid = true
     842           1 : 
     843           1 :         // Loop until we find a key to be passed to the next level.
     844           1 :         for {
     845           1 :                 // If we find a key that can't be skipped, return true so that the
     846           1 :                 // caller yields the SingleDelete to its own consumer.
     847           1 :                 if i.nextInStripe() != sameStripe {
     848           1 :                         // This defers additional error checking regarding single delete
     849           1 :                         // invariants to the compaction where the keys with the same user key as
     850           1 :                         // the single delete are in the same stripe.
     851           1 :                         i.pos = iterPosNext
     852           1 :                         return i.err == nil
     853           1 :                 }
     854           1 :                 if i.err != nil {
     855           0 :                         panic(i.err)
     856             :                 }
     857             :                 // INVARIANT: sameStripe.
     858           1 :                 key := i.iterKey
     859           1 :                 kind := key.Kind()
     860           1 :                 switch kind {
     861           1 :                 case InternalKeyKindDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
     862           1 :                         if (kind == InternalKeyKindDelete || kind == InternalKeyKindDeleteSized) &&
     863           1 :                                 i.ineffectualSingleDeleteCallback != nil {
     864           1 :                                 i.ineffectualSingleDeleteCallback(i.key.UserKey)
     865           1 :                         }
     866             :                         // We've hit a Delete, DeleteSized, or SetWithDelete; transform
     867             :                         // the SingleDelete into a full Delete.
     868           1 :                         i.key.SetKind(InternalKeyKindDelete)
     869           1 :                         i.skip = true
     870           1 :                         return true
     871             : 
     872           1 :                 case InternalKeyKindSet, InternalKeyKindMerge:
     873           1 :                         // This SingleDelete deletes the Set/Merge, and we can now elide the
     874           1 :                         // SingleDel as well. We advance past the Set/Merge and return false
     875           1 :                         // to indicate to the caller that it should NOT yield the current
     876           1 :                         // SingleDel key to the compaction loop.
     877           1 :                         //
     878           1 :                         // NB: singleDeleteNext was called with i.pos == iterPosCurForward, and
     879           1 :                         // after the call to nextInStripe, we are still at iterPosCurForward,
     880           1 :                         // since we are at the key after the Set/Merge that was single deleted.
     881           1 :                         change := i.nextInStripe()
     882           1 :                         switch change {
     883           1 :                         case sameStripe, newStripeSameKey:
     884           1 :                                 // On the same user key.
     885           1 :                                 nextKind := i.iterKey.Kind()
     886           1 :                                 switch nextKind {
     887           1 :                                 case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge:
     888           1 :                                         if i.singleDeleteInvariantViolationCallback != nil {
     889           1 :                                                 // sameStripe keys returned by nextInStripe() are already
     890           1 :                                                 // known to not be covered by a RANGEDEL, so it is an invariant
     891           1 :                                                 // violation. The rare case is newStripeSameKey, where it is a
     892           1 :                                                 // violation if not covered by a RANGEDEL.
     893           1 :                                                 if change == sameStripe ||
     894           1 :                                                         i.rangeDelFrag.Covers(*i.iterKey, i.curSnapshotSeqNum) == keyspan.NoCover {
     895           1 :                                                         i.singleDeleteInvariantViolationCallback(i.key.UserKey)
     896           1 :                                                 }
     897             :                                         }
     898           1 :                                 case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
     899           0 :                                 default:
     900           0 :                                         panic(errors.AssertionFailedf(
     901           0 :                                                 "unexpected internal key kind: %d", errors.Safe(i.iterKey.Kind())))
     902             :                                 }
     903           1 :                         case newStripeNewKey:
     904           0 :                         default:
     905           0 :                                 panic("unreachable")
     906             :                         }
     907           1 :                         i.valid = false
     908           1 :                         return false
     909             : 
     910           1 :                 case InternalKeyKindSingleDelete:
     911           1 :                         // Two single deletes met in a compaction. The first single delete is
     912           1 :                         // ineffectual.
     913           1 :                         if i.ineffectualSingleDeleteCallback != nil {
     914           1 :                                 i.ineffectualSingleDeleteCallback(i.key.UserKey)
     915           1 :                         }
     916             :                         // Continue to apply the second single delete.
     917           1 :                         continue
     918             : 
     919           0 :                 default:
     920           0 :                         i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
     921           0 :                         i.valid = false
     922           0 :                         return false
     923             :                 }
     924             :         }
     925             : }
     926             : 
     927             : // skipDueToSingleDeleteElision is called when the SingleDelete is being
     928             : // elided because it is in the final snapshot stripe and there are no keys
     929             : // with the same user key in lower levels in the LSM (below the files in this
     930             : // compaction).
     931             : //
     932             : // TODO(sumeer): the only difference between singleDeleteNext and
     933             : // skipDueToSingleDeleteElision is the fact that the caller knows it will be
     934             : // eliding the single delete in the latter case. There are some similar things
     935             : // happening in both implementations. My first attempt at combining them into
     936             : // a single method was hard to comprehend. Try again.
     937           1 : func (i *compactionIter) skipDueToSingleDeleteElision() {
     938           1 :         for {
     939           1 :                 stripeChange := i.nextInStripe()
     940           1 :                 if i.err != nil {
     941           0 :                         panic(i.err)
     942             :                 }
     943           1 :                 switch stripeChange {
     944           1 :                 case newStripeNewKey:
     945           1 :                         // The single delete is only now being elided, meaning it did not elide
     946           1 :                         // any keys earlier in its descent down the LSM. We stepped onto a new
     947           1 :                         // user key, meaning that even now at its moment of elision, it still
     948           1 :                         // hasn't elided any other keys. The single delete was ineffectual (a
     949           1 :                         // no-op).
     950           1 :                         if i.ineffectualSingleDeleteCallback != nil {
     951           1 :                                 i.ineffectualSingleDeleteCallback(i.key.UserKey)
     952           1 :                         }
     953           1 :                         i.skip = false
     954           1 :                         return
     955           0 :                 case newStripeSameKey:
     956           0 :                         // This should be impossible. If we're eliding a single delete, we
     957           0 :                         // determined that the tombstone is in the final snapshot stripe, but we
     958           0 :                         // stepped into a new stripe of the same key.
     959           0 :                         panic(errors.AssertionFailedf("eliding single delete followed by same key in new stripe"))
     960           1 :                 case sameStripe:
     961           1 :                         kind := i.iterKey.Kind()
     962           1 :                         switch kind {
     963           1 :                         case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
     964           1 :                                 if i.ineffectualSingleDeleteCallback != nil {
     965           1 :                                         i.ineffectualSingleDeleteCallback(i.key.UserKey)
     966           1 :                                 }
     967           1 :                                 switch kind {
     968           1 :                                 case InternalKeyKindDelete, InternalKeyKindDeleteSized:
     969           1 :                                         i.skipInStripe()
     970           1 :                                         return
     971           1 :                                 case InternalKeyKindSingleDelete:
     972           1 :                                         // Repeat the same with this SingleDelete. We don't want to simply
     973           1 :                                         // call skipInStripe(), since it increases the strength of the
     974           1 :                                         // SingleDel, which hides bugs in the use of single delete.
     975           1 :                                         continue
     976           0 :                                 default:
     977           0 :                                         panic(errors.AssertionFailedf(
     978           0 :                                                 "unexpected internal key kind: %d", errors.Safe(i.iterKey.Kind())))
     979             :                                 }
     980           1 :                         case InternalKeyKindSetWithDelete:
     981           1 :                                 // The SingleDelete should behave like a Delete.
     982           1 :                                 i.skipInStripe()
     983           1 :                                 return
     984           1 :                         case InternalKeyKindSet, InternalKeyKindMerge:
     985           1 :                                 // This SingleDelete deletes the Set/Merge, and we are eliding the
     986           1 :                                 // SingleDel as well. Step to the next key (this is not deleted by the
     987           1 :                                 // SingleDelete).
     988           1 :                                 //
     989           1 :                                 // NB: skipDueToSingleDeleteElision was called with i.pos ==
     990           1 :                                 // iterPosCurForward, and after the call to nextInStripe, we are still
     991           1 :                                 // at iterPosCurForward, since we are at the key after the Set/Merge
     992           1 :                                 // that was single deleted.
     993           1 :                                 change := i.nextInStripe()
     994           1 :                                 if i.err != nil {
     995           0 :                                         panic(i.err)
     996             :                                 }
     997           1 :                                 switch change {
     998           0 :                                 case newStripeSameKey:
     999           0 :                                         panic(errors.AssertionFailedf("eliding single delete followed by same key in new stripe"))
    1000           1 :                                 case newStripeNewKey:
    1001           0 :                                 case sameStripe:
    1002           0 :                                         // On the same key.
    1003           0 :                                         nextKind := i.iterKey.Kind()
    1004           0 :                                         switch nextKind {
    1005           0 :                                         case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge:
    1006           0 :                                                 if i.singleDeleteInvariantViolationCallback != nil {
    1007           0 :                                                         i.singleDeleteInvariantViolationCallback(i.key.UserKey)
    1008           0 :                                                 }
    1009           0 :                                         case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
    1010           0 :                                         default:
    1011           0 :                                                 panic(errors.AssertionFailedf(
    1012           0 :                                                         "unexpected internal key kind: %d", errors.Safe(i.iterKey.Kind())))
    1013             :                                         }
    1014           0 :                                 default:
    1015           0 :                                         panic("unreachable")
    1016             :                                 }
    1017             :                                 // Whether in same stripe or new stripe, this key is not consumed by
    1018             :                                 // the SingleDelete.
    1019           1 :                                 i.skip = false
    1020           1 :                                 return
    1021           0 :                         default:
    1022           0 :                                 panic(errors.AssertionFailedf(
    1023           0 :                                         "unexpected internal key kind: %d", errors.Safe(i.iterKey.Kind())))
    1024             :                         }
    1025           0 :                 default:
    1026           0 :                         panic("unreachable")
    1027             :                 }
    1028             :         }
    1029             : }
    1030             : 
    1031             : // deleteSizedNext processes a DELSIZED point tombstone. Unlike ordinary DELs,
    1032             : // these tombstones carry a value that's a varint indicating the size of the
    1033             : // entry (len(key)+len(value)) that the tombstone is expected to delete.
    1034             : //
    1035             : // When a DELSIZED is encountered, we skip ahead to see which keys, if
    1036             : // any, are elided as a result of the tombstone.
    1037           1 : func (i *compactionIter) deleteSizedNext() (*base.InternalKey, []byte) {
    1038           1 :         i.saveKey()
    1039           1 :         i.valid = true
    1040           1 :         i.skip = true
    1041           1 : 
    1042           1 :         // The DELSIZED tombstone may have no value at all. This happens when the
    1043           1 :         // tombstone has already deleted the key that the user originally predicted.
    1044           1 :         // In this case, we still peek forward in case there's another DELSIZED key
    1045           1 :         // with a lower sequence number, in which case we'll adopt its value.
    1046           1 :         if len(i.iterValue) == 0 {
    1047           1 :                 i.value = i.valueBuf[:0]
    1048           1 :         } else {
    1049           1 :                 i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
    1050           1 :                 i.value = i.valueBuf
    1051           1 :         }
    1052             : 
    1053             :         // Loop through all the keys within this stripe that are skippable.
    1054           1 :         i.pos = iterPosNext
    1055           1 :         for i.nextInStripe() == sameStripe {
    1056           1 :                 if i.err != nil {
    1057           0 :                         panic(i.err)
    1058             :                 }
    1059           1 :                 switch i.iterKey.Kind() {
    1060           1 :                 case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
    1061           1 :                         // We encountered a tombstone (DEL, SINGLEDEL, or DELSIZED) that's deleted by
    1062           1 :                         // the original DELSIZED tombstone. This can happen in two cases:
    1063           1 :                         //
    1064           1 :                         // (1) These tombstones were intended to delete two distinct values,
    1065           1 :                         //     and this DELSIZED has already dropped the relevant key. For
    1066           1 :                         //     example:
    1067           1 :                         //
    1068           1 :                         //     a.DELSIZED.9   a.SET.7   a.DELSIZED.5   a.SET.4
    1069           1 :                         //
    1070           1 :                         //     If a.DELSIZED.9 has already deleted a.SET.7, its size has
    1071           1 :                         //     already been zeroed out. In this case, we want to adopt the
    1072           1 :                         //     value of the DELSIZED with the lower sequence number, in
    1073           1 :                         //     case the a.SET.4 key has not yet been elided.
    1074           1 :                         //
    1075           1 :                         // (2) This DELSIZED was missized. The user thought they were
    1076           1 :                         //     deleting a key with this user key, but this user key had
    1077           1 :                         //     already been deleted.
    1078           1 :                         //
    1079           1 :                         // We can differentiate these two cases by examining the length of
    1080           1 :                         // the DELSIZED's value. A DELSIZED's value holds the size of both
    1081           1 :                         // the user key and value that it intends to delete. For any user
    1082           1 :                         // key with a length > 0, a DELSIZED that has not deleted a key must
    1083           1 :                         // have a value with a length > 0.
    1084           1 :                         //
    1085           1 :                         // We treat both cases the same functionally, adopting the identity
    1086           1 :                         // of the lower-sequence-numbered tombstone. However, in the second
    1087           1 :                         // case, we also increment the stat counting missized tombstones.
    1088           1 :                         if len(i.value) > 0 {
    1089           1 :                                 // The original DELSIZED key was missized. The key that the user
    1090           1 :                                 // thought they were deleting does not exist.
    1091           1 :                                 i.stats.countMissizedDels++
    1092           1 :                         }
    1093           1 :                         i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
    1094           1 :                         i.value = i.valueBuf
    1095           1 :                         if i.iterKey.Kind() != InternalKeyKindDeleteSized {
    1096           1 :                                 // Convert the DELSIZED to a DEL. The DEL/SINGLEDEL we're eliding
    1097           1 :                                 // may not have deleted the key(s) it was intended to yet. The
    1098           1 :                                 // ordinary DEL compaction heuristics are better suited to that,
    1099           1 :                                 // plus we don't want to count it as a missized DEL. We early
    1100           1 :                                 // exit in this case, after skipping the remainder of the
    1101           1 :                                 // snapshot stripe.
    1102           1 :                                 i.key.SetKind(InternalKeyKindDelete)
    1103           1 :                                 // NB: We skipInStripe now, rather than leaving i.skip=true
    1104           1 :                                 // and returning early, because Next() requires that
    1105           1 :                                 // i.skip=true only if i.pos = iterPosCurForward.
    1106           1 :                                 //
    1107           1 :                                 // Ignore any error caused by skipInStripe since it does not affect
    1108           1 :                                 // the key/value being returned here, and the next call to Next() will
    1109           1 :                                 // expose it.
    1110           1 :                                 i.skipInStripe()
    1111           1 :                                 return &i.key, i.value
    1112           1 :                         }
    1113             :                         // Continue, in case we uncover another DELSIZED or a key this
    1114             :                         // DELSIZED deletes.
    1115             : 
    1116           1 :                 case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSetWithDelete:
    1117           1 :                         // If the DELSIZED is value-less, it already deleted the key that it
    1118           1 :                         // was intended to delete. This is possible with a sequence like:
    1119           1 :                         //
    1120           1 :                         //      DELSIZED.8     SET.7     SET.3
    1121           1 :                         //
    1122           1 :                         // The DELSIZED only describes the size of the SET.7, which in this
    1123           1 :                         // case has already been elided. We don't count it as a missizing,
    1124           1 :                         // instead converting the DELSIZED to a DEL. Skip the remainder of
    1125           1 :                         // the snapshot stripe and return.
    1126           1 :                         if len(i.value) == 0 {
    1127           1 :                                 i.key.SetKind(InternalKeyKindDelete)
    1128           1 :                                 // NB: We skipInStripe now, rather than leaving i.skip=true
    1129           1 :                                 // and returning early, because Next() requires that
    1130           1 :                                 // i.skip=true only if i.pos = iterPosCurForward.
    1131           1 :                                 //
    1132           1 :                                 // Ignore any error caused by skipInStripe since it does not affect
    1133           1 :                                 // the key/value being returned here, and the next call to Next() will
    1134           1 :                                 // expose it.
    1135           1 :                                 i.skipInStripe()
    1136           1 :                                 return &i.key, i.value
    1137           1 :                         }
    1138             :                         // The deleted key is not a DEL or DELSIZED, and the DELSIZED in
    1139             :                         // i.key has a positive size.
    1140           1 :                         expectedSize, n := binary.Uvarint(i.value)
    1141           1 :                         if n != len(i.value) {
    1142           1 :                                 i.err = base.CorruptionErrorf("DELSIZED holds invalid value: %x", errors.Safe(i.value))
    1143           1 :                                 i.valid = false
    1144           1 :                                 return nil, nil
    1145           1 :                         }
    1146           1 :                         elidedSize := uint64(len(i.iterKey.UserKey)) + uint64(len(i.iterValue))
    1147           1 :                         if elidedSize != expectedSize {
    1148           1 :                                 // The original DELSIZED key was missized. It's unclear what to
    1149           1 :                                 // do. The user-provided size was wrong, so it's unlikely to be
    1150           1 :                                 // accurate or meaningful. We could:
    1151           1 :                                 //
    1152           1 :                                 //   1. return the DELSIZED with the original user-provided size unmodified
    1153           1 :                                 //   2. return the DELSIZED with a zeroed size to reflect that a key was
    1154           1 :                                 //      elided, even if it wasn't the anticipated size.
    1155           1 :                                 //   3. subtract the elided size from the estimate and re-encode.
    1156           1 :                                 //   4. convert the DELSIZED into a value-less DEL, so that
    1157           1 :                                 //      ordinary DEL heuristics apply.
    1158           1 :                                 //
    1159           1 :                                 // We opt for (4) under the rationale that we can't rely on the
    1160           1 :                                 // user-provided size for accuracy, so ordinary DEL heuristics
    1161           1 :                                 // are safer.
    1162           1 :                                 i.stats.countMissizedDels++
    1163           1 :                                 i.key.SetKind(InternalKeyKindDelete)
    1164           1 :                                 i.value = i.valueBuf[:0]
    1165           1 :                                 // NB: We skipInStripe now, rather than leaving i.skip=true
    1166           1 :                                 // and returning early, because Next() requires that
    1167           1 :                                 // i.skip=true only if i.pos = iterPosCurForward.
    1168           1 :                                 //
    1169           1 :                                 // Ignore any error caused by skipInStripe since it does not affect
    1170           1 :                                 // the key/value being returned here, and the next call to Next() will
    1171           1 :                                 // expose it.
    1172           1 :                                 i.skipInStripe()
    1173           1 :                                 return &i.key, i.value
    1174           1 :                         }
    1175             :                         // NB: We remove the value regardless of whether the key was sized
    1176             :                         // appropriately. The size encoded is 'consumed' the first time it
    1177             :                         // meets a key that it deletes.
    1178           1 :                         i.value = i.valueBuf[:0]
    1179             : 
    1180           0 :                 default:
    1181           0 :                         i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
    1182           0 :                         i.valid = false
    1183           0 :                         return nil, nil
    1184             :                 }
    1185             :         }
    1186             : 
    1187           1 :         if i.iterStripeChange == sameStripe {
    1188           0 :                 panic(errors.AssertionFailedf("unexpectedly found iter stripe change = %d", i.iterStripeChange))
    1189             :         }
    1190             :         // We landed outside the original stripe. Reset skip.
    1191           1 :         i.skip = false
    1192           1 :         if i.err != nil {
    1193           0 :                 return nil, nil
    1194           0 :         }
    1195           1 :         return &i.key, i.value
    1196             : }
    1197             : 
    1198           1 : func (i *compactionIter) saveKey() {
    1199           1 :         i.keyBuf = append(i.keyBuf[:0], i.iterKey.UserKey...)
    1200           1 :         i.key.UserKey = i.keyBuf
    1201           1 :         i.key.Trailer = i.iterKey.Trailer
    1202           1 :         i.keyTrailer = i.iterKey.Trailer
    1203           1 :         i.frontiers.Advance(i.key.UserKey)
    1204           1 : }
    1205             : 
    1206           1 : func (i *compactionIter) cloneKey(key []byte) []byte {
    1207           1 :         i.alloc, key = i.alloc.Copy(key)
    1208           1 :         return key
    1209           1 : }
    1210             : 
    1211           1 : func (i *compactionIter) Key() InternalKey {
    1212           1 :         return i.key
    1213           1 : }
    1214             : 
    1215           1 : func (i *compactionIter) Value() []byte {
    1216           1 :         return i.value
    1217           1 : }
    1218             : 
    1219           1 : func (i *compactionIter) Valid() bool {
    1220           1 :         return i.valid
    1221           1 : }
    1222             : 
    1223           1 : func (i *compactionIter) Error() error {
    1224           1 :         return i.err
    1225           1 : }
    1226             : 
    1227           1 : func (i *compactionIter) Close() error {
    1228           1 :         err := i.iter.Close()
    1229           1 :         if i.err == nil {
    1230           1 :                 i.err = err
    1231           1 :         }
    1232             : 
    1233             :         // Close the closer for the current value if one was open.
    1234           1 :         if i.valueCloser != nil {
    1235           0 :                 i.err = firstError(i.err, i.valueCloser.Close())
    1236           0 :                 i.valueCloser = nil
    1237           0 :         }
    1238             : 
    1239           1 :         return i.err
    1240             : }
    1241             : 
    1242             : // Tombstones returns a list of pending range tombstones in the fragmenter
    1243             : // up to the specified key, or all pending range tombstones if key = nil.
    1244           1 : func (i *compactionIter) Tombstones(key []byte) []keyspan.Span {
    1245           1 :         if key == nil {
    1246           1 :                 i.rangeDelFrag.Finish()
    1247           1 :         } else {
    1248           1 :                 // The specified end key is exclusive; no versions of the specified
    1249           1 :                 // user key (including range tombstones covering that key) should
    1250           1 :                 // be flushed yet.
    1251           1 :                 i.rangeDelFrag.TruncateAndFlushTo(key)
    1252           1 :         }
    1253           1 :         tombstones := i.tombstones
    1254           1 :         i.tombstones = nil
    1255           1 :         return tombstones
    1256             : }
    1257             : 
    1258             : // RangeKeys returns a list of pending fragmented range keys up to the specified
    1259             : // key, or all pending range keys if key = nil.
    1260           1 : func (i *compactionIter) RangeKeys(key []byte) []keyspan.Span {
    1261           1 :         if key == nil {
    1262           1 :                 i.rangeKeyFrag.Finish()
    1263           1 :         } else {
    1264           1 :                 // The specified end key is exclusive; no versions of the specified
    1265           1 :                 // user key (including range keys covering that key) should
    1266           1 :                 // be flushed yet.
    1267           1 :                 i.rangeKeyFrag.TruncateAndFlushTo(key)
    1268           1 :         }
    1269           1 :         rangeKeys := i.rangeKeys
    1270           1 :         i.rangeKeys = nil
    1271           1 :         return rangeKeys
    1272             : }
    1273             : 
    1274           1 : func (i *compactionIter) emitRangeDelChunk(fragmented keyspan.Span) {
    1275           1 :         // Apply the snapshot stripe rules, keeping only the latest tombstone for
    1276           1 :         // each snapshot stripe.
    1277           1 :         currentIdx := -1
    1278           1 :         keys := fragmented.Keys[:0]
    1279           1 :         for _, k := range fragmented.Keys {
    1280           1 :                 idx, _ := snapshotIndex(k.SeqNum(), i.snapshots)
    1281           1 :                 if currentIdx == idx {
    1282           1 :                         continue
    1283             :                 }
    1284           1 :                 if idx == 0 && i.elideRangeTombstone(fragmented.Start, fragmented.End) {
    1285           1 :                         // This is the last snapshot stripe and the range tombstone
    1286           1 :                         // can be elided.
    1287           1 :                         break
    1288             :                 }
    1289             : 
    1290           1 :                 keys = append(keys, k)
    1291           1 :                 if idx == 0 {
    1292           1 :                         // This is the last snapshot stripe.
    1293           1 :                         break
    1294             :                 }
    1295           1 :                 currentIdx = idx
    1296             :         }
    1297           1 :         if len(keys) > 0 {
    1298           1 :                 i.tombstones = append(i.tombstones, keyspan.Span{
    1299           1 :                         Start: fragmented.Start,
    1300           1 :                         End:   fragmented.End,
    1301           1 :                         Keys:  keys,
    1302           1 :                 })
    1303           1 :         }
    1304             : }
    1305             : 
    1306           1 : func (i *compactionIter) emitRangeKeyChunk(fragmented keyspan.Span) {
    1307           1 :         // Elision of snapshot stripes happens in rangeKeyCompactionTransform, so no need to
    1308           1 :         // do that here.
    1309           1 :         if len(fragmented.Keys) > 0 {
    1310           1 :                 i.rangeKeys = append(i.rangeKeys, fragmented)
    1311           1 :         }
    1312             : }
    1313             : 
    1314             : // maybeZeroSeqnum attempts to set the seqnum for the current key to 0. Doing
    1315             : // so improves compression and enables an optimization during forward iteration
    1316             : // to skip some key comparisons. The seqnum for an entry can be zeroed if the
    1317             : // entry is on the bottom snapshot stripe and on the bottom level of the LSM.
    1318           1 : func (i *compactionIter) maybeZeroSeqnum(snapshotIdx int) {
    1319           1 :         if !i.allowZeroSeqNum {
    1320           1 :                 // TODO(peter): allowZeroSeqNum applies to the entire compaction. We could
    1321           1 :                 // make the determination on a key by key basis, similar to what is done
    1322           1 :                 // for elideTombstone. Need to add a benchmark for compactionIter to verify
    1323           1 :                 // that this isn't too expensive.
    1324           1 :                 return
    1325           1 :         }
    1326           1 :         if snapshotIdx > 0 {
    1327           1 :                 // This is not the last snapshot stripe.
    1328           1 :                 return
    1329           1 :         }
    1330           1 :         i.key.SetSeqNum(base.SeqNumZero)
    1331             : }

Generated by: LCOV version 1.14