LCOV - code coverage report
Current view: top level - pebble - merging_iter.go (source / functions) Coverage Total Hit
Test: 2025-02-12 08:16Z 419f2391 - meta test only.lcov Lines: 82.4 % 790 651
Test Date: 2025-02-12 08:17:57 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "context"
      10              :         "fmt"
      11              :         "runtime/debug"
      12              :         "unsafe"
      13              : 
      14              :         "github.com/cockroachdb/errors"
      15              :         "github.com/cockroachdb/pebble/internal/base"
      16              :         "github.com/cockroachdb/pebble/internal/invariants"
      17              :         "github.com/cockroachdb/pebble/internal/keyspan"
      18              :         "github.com/cockroachdb/pebble/internal/treeprinter"
      19              : )
      20              : 
// mergingIterLevel is the per-level state of a mergingIter. mergingIter.init
// numbers each level by its position in mergingIter.levels, and the merging
// heap (mergingIter.heap) holds pointers to these values for the levels that
// currently have an entry.
type mergingIterLevel struct {
	// index is this level's position in mergingIter.levels (assigned by
	// mergingIter.init); iter is the level's point iterator.
	index int
	iter  internalIterator
	// rangeDelIter is set to the range-deletion iterator for the level. When
	// configured with a levelIter, this pointer changes as sstable boundaries
	// are crossed. See levelIter.initRangeDel and the Range Deletions comment
	// below.
	rangeDelIter keyspan.FragmentIterator
	// rangeDelIterGeneration is incremented whenever rangeDelIter changes
	// (see setRangeDelIter).
	rangeDelIterGeneration int
	// iterKV caches the current key-value pair iter points to. It is nil when
	// iter is exhausted or has failed (callers then consult iter.Error()), and
	// initHeap leaves such levels out of the heap.
	iterKV *base.InternalKV
	// levelIter is non-nil if this level's iter is ultimately backed by a
	// *levelIter. The handle in iter may have wrapped the levelIter with
	// intermediary internalIterator implementations.
	levelIter *levelIter

	// tombstone caches the tombstone rangeDelIter is currently pointed at. If
	// tombstone is nil, there are no further tombstones within the
	// current sstable in the current iterator direction. The cached tombstone is
	// only valid for the levels in the range [0,heap[0].index]. This avoids
	// positioning tombstones at lower levels which cannot possibly shadow the
	// current key.
	tombstone *keyspan.Span
}
      46              : 
      47              : // Assert that *mergingIterLevel implements rangeDelIterSetter.
      48              : var _ rangeDelIterSetter = (*mergingIterLevel)(nil)
      49              : 
      50            1 : func (ml *mergingIterLevel) setRangeDelIter(iter keyspan.FragmentIterator) {
      51            1 :         ml.tombstone = nil
      52            1 :         if ml.rangeDelIter != nil {
      53            1 :                 ml.rangeDelIter.Close()
      54            1 :         }
      55            1 :         ml.rangeDelIter = iter
      56            1 :         ml.rangeDelIterGeneration++
      57              : }
      58              : 
      59              : // mergingIter provides a merged view of multiple iterators from different
      60              : // levels of the LSM.
      61              : //
      62              : // The core of a mergingIter is a heap of internalIterators (see
      63              : // mergingIterHeap). The heap can operate as either a min-heap, used during
      64              : // forward iteration (First, SeekGE, Next) or a max-heap, used during reverse
      65              : // iteration (Last, SeekLT, Prev). The heap is initialized in calls to First,
      66              : // Last, SeekGE, and SeekLT. A call to Next or Prev takes the current top
      67              : // element on the heap, advances its iterator, and then "fixes" the heap
      68              : // property. When one of the child iterators is exhausted during Next/Prev
      69              : // iteration, it is removed from the heap.
      70              : //
      71              : // # Range Deletions
      72              : //
      73              : // A mergingIter can optionally be configured with a slice of range deletion
      74              : // iterators. The range deletion iterator slice must exactly parallel the point
      75              : // iterators and the range deletion iterator must correspond to the same level
      76              : // in the LSM as the point iterator. Note that each memtable and each table in
      77              : // L0 is a different "level" from the mergingIter perspective. So level 0 below
      78              : // does not correspond to L0 in the LSM.
      79              : //
      80              : // A range deletion iterator iterates over fragmented range tombstones. Range
      81              : // tombstones are fragmented by splitting them at any overlapping points. This
      82              : // fragmentation guarantees that within an sstable tombstones will either be
      83              : // distinct or will have identical start and end user keys. While range
      84              : // tombstones are fragmented within an sstable, the start and end keys are not truncated
      85              : // to sstable boundaries. This is necessary because the tombstone end key is
      86              : // exclusive and does not have a sequence number. Consider an sstable
      87              : // containing the range tombstone [a,c)#9 and the key "b#8". The tombstone must
      88              : // delete "b#8", yet older versions of "b" might spill over to the next
      89              : // sstable. So the boundary key for this sstable must be "b#8". Adjusting the
      90              : // end key of tombstones to be optionally inclusive or contain a sequence
      91              : // number would be possible solutions (such solutions have potentially serious
      92              : // issues: tombstones have exclusive end keys since an inclusive deletion end can
      93              : // be converted to an exclusive one while the reverse transformation is not possible;
      94              : // the semantics of a sequence number for the end key of a range tombstone are murky).
      95              : //
// The approach taken here instead performs an implicit truncation of the
// tombstone to the sstable boundaries, as described below.
      98              : //
      99              : // During initialization of a mergingIter, the range deletion iterators for
// batches, memtables, and L0 tables are populated up front. Note that batches
// and memtables index unfragmented tombstones. Batch.newRangeDelIter() and
     102              : // memTable.newRangeDelIter() fragment and cache the tombstones on demand. The
     103              : // L1-L6 range deletion iterators are populated by levelIter. When configured
     104              : // to load range deletion iterators, whenever a levelIter loads a table it
     105              : // loads both the point iterator and the range deletion
     106              : // iterator. levelIter.rangeDelIter is configured to point to the right entry
     107              : // in mergingIter.levels. The effect of this setup is that
     108              : // mergingIter.levels[i].rangeDelIter always contains the fragmented range
     109              : // tombstone for the current table in level i that the levelIter has open.
     110              : //
     111              : // Another crucial mechanism of levelIter is that it materializes fake point
     112              : // entries for the table boundaries if the boundary is range deletion
     113              : // key. Consider a table that contains only a range tombstone [a-e)#10. The
     114              : // sstable boundaries for this table will be a#10,15 and
     115              : // e#72057594037927935,15. During forward iteration levelIter will return
     116              : // e#72057594037927935,15 as a key. During reverse iteration levelIter will
     117              : // return a#10,15 as a key. These sentinel keys act as bookends to point
     118              : // iteration and allow mergingIter to keep a table and its associated range
     119              : // tombstones loaded as long as there are keys at lower levels that are within
     120              : // the bounds of the table.
     121              : //
     122              : // The final piece to the range deletion puzzle is the LSM invariant that for a
     123              : // given key K newer versions of K can only exist earlier in the level, or at
// higher levels of the tree. For example, if K#4 exists in L3, K#5 can only
// exist earlier in L3 or in L0, L1, L2, or a memtable. Get very explicitly
     126              : // uses this invariant to find the value for a key by walking the LSM level by
     127              : // level. For range deletions, this invariant means that a range deletion at
     128              : // level N will necessarily shadow any keys within its bounds in level Y where
     129              : // Y > N. One wrinkle to this statement is that it only applies to keys that
     130              : // lie within the sstable bounds as well, but we get that guarantee due to the
     131              : // way the range deletion iterator and point iterator are bound together by a
     132              : // levelIter.
     133              : //
     134              : // Tying the above all together, we get a picture where each level (index in
     135              : // mergingIter.levels) is composed of both point operations (pX) and range
     136              : // deletions (rX). The range deletions for level X shadow both the point
     137              : // operations and range deletions for level Y where Y > X allowing mergingIter
     138              : // to skip processing entries in that shadow. For example, consider the
     139              : // scenario:
     140              : //
     141              : //      r0: a---e
     142              : //      r1:    d---h
     143              : //      r2:       g---k
     144              : //      r3:          j---n
     145              : //      r4:             m---q
     146              : //
     147              : // This is showing 5 levels of range deletions. Consider what happens upon
     148              : // SeekGE("b"). We first seek the point iterator for level 0 (the point values
     149              : // are not shown above) and we then seek the range deletion iterator. That
     150              : // returns the tombstone [a,e). This tombstone tells us that all keys in the
     151              : // range [a,e) in lower levels are deleted so we can skip them. So we can
     152              : // adjust the seek key to "e", the tombstone end key. For level 1 we seek to
     153              : // "e" and find the range tombstone [d,h) and similar logic holds. By the time
     154              : // we get to level 4 we're seeking to "n".
     155              : //
     156              : // One consequence of not truncating tombstone end keys to sstable boundaries
     157              : // is the seeking process described above cannot always seek to the tombstone
     158              : // end key in the older level. For example, imagine in the above example r3 is
     159              : // a partitioned level (i.e., L1+ in our LSM), and the sstable containing [j,
     160              : // n) has "k" as its upper boundary. In this situation, compactions involving
     161              : // keys at or after "k" can output those keys to r4+, even if they're newer
     162              : // than our tombstone [j, n). So instead of seeking to "n" in r4 we can only
// seek to "k". That is, a seek key derived from a tombstone in a partitioned
// level must be limited to the upper bound of that level's current sstable; in
// this example, the seek triggered by the tombstone in r3 is limited to "k".
     167              : //
     168              : // During actual iteration levels can contain both point operations and range
     169              : // deletions. Within a level, when a range deletion contains a point operation
     170              : // the sequence numbers must be checked to determine if the point operation is
     171              : // newer or older than the range deletion tombstone. The mergingIter maintains
// the invariant that the range deletion iterators for all levels newer than
// the current iteration key's level (L < m.heap.items[0].index) are positioned at the
     174              : // next (or previous during reverse iteration) range deletion tombstone. We
     175              : // know those levels don't contain a range deletion tombstone that covers the
     176              : // current key because if they did the current key would be deleted. The range
     177              : // deletion iterator for the current key's level is positioned at a range
// tombstone covering or past the current key. The position of all other
     179              : // range deletion iterators is unspecified. Whenever a key from those levels
     180              : // becomes the current key, their range deletion iterators need to be
     181              : // positioned. This lazy positioning avoids seeking the range deletion
     182              : // iterators for keys that are never considered. (A similar bit of lazy
     183              : // evaluation can be done for the point iterators, but is still TBD).
     184              : //
     185              : // For a full example, consider the following setup:
     186              : //
     187              : //      p0:               o
     188              : //      r0:             m---q
     189              : //
     190              : //      p1:              n p
     191              : //      r1:       g---k
     192              : //
     193              : //      p2:  b d    i
     194              : //      r2: a---e           q----v
     195              : //
     196              : //      p3:     e
     197              : //      r3:
     198              : //
     199              : // If we start iterating from the beginning, the first key we encounter is "b"
     200              : // in p2. When the mergingIter is pointing at a valid entry, the range deletion
     201              : // iterators for all of the levels < m.heap.items[0].index are positioned at
     202              : // the next range tombstone past the current key. So r0 will point at [m,q) and
     203              : // r1 at [g,k). When the key "b" is encountered, we check to see if the current
     204              : // tombstone for r0 or r1 contains it, and whether the tombstone for r2, [a,e),
     205              : // contains and is newer than "b".
     206              : //
     207              : // Advancing the iterator finds the next key at "d". This is in the same level
     208              : // as the previous key "b" so we don't have to reposition any of the range
     209              : // deletion iterators, but merely check whether "d" is now contained by any of
     210              : // the range tombstones at higher levels or has stepped past the range
     211              : // tombstone in its own level or higher levels. In this case, there is nothing to be done.
     212              : //
     213              : // Advancing the iterator again finds "e". Since "e" comes from p3, we have to
     214              : // position the r3 range deletion iterator, which is empty. "e" is past the r2
     215              : // tombstone of [a,e) so we need to advance the r2 range deletion iterator to
     216              : // [q,v).
     217              : //
     218              : // The next key is "i". Because this key is in p2, a level above "e", we don't
     219              : // have to reposition any range deletion iterators and instead see that "i" is
     220              : // covered by the range tombstone [g,k). The iterator is immediately advanced
     221              : // to "n" which is covered by the range tombstone [m,q) causing the iterator to
     222              : // advance to "o" which is visible.
     223              : //
     224              : // # Error handling
     225              : //
     226              : // Any iterator operation may fail. The InternalIterator contract dictates that
     227              : // an iterator must return a nil internal key when an error occurs, and a
     228              : // subsequent call to Error() should return the error value. The exported
     229              : // merging iterator positioning methods must adhere to this contract by setting
     230              : // m.err to hold any error encountered by the individual level iterators and
// returning a nil internal key. Some internal helpers (e.g.,
// find[Next|Prev]Entry) also adhere to this contract, setting m.err directly.
     233              : // Other internal functions return an explicit error return value and DO NOT set
     234              : // m.err, relying on the caller to set m.err appropriately.
     235              : //
     236              : // TODO(jackson): Update the InternalIterator interface to return explicit error
     237              : // return values (and an *InternalKV pointer).
     238              : //
     239              : // TODO(peter,rangedel): For testing, advance the iterator through various
     240              : // scenarios and have each step display the current state (i.e. the current
     241              : // heap and range-del iterator positioning).
type mergingIter struct {
	// Field notes (for fields without an individual comment below):
	//   - dir is the iteration direction: initMinHeap sets it to 1 (forward)
	//     and initMaxHeap sets it to -1 (reverse).
	//   - snapshot and batchSnapshot are reset to base.SeqNumMax by init.
	//     NOTE(review): presumably they bound the sequence numbers visible to
	//     the iterator; confirm at their use sites.
	//   - levels holds one entry per merged level; heap holds pointers into
	//     levels for those levels that currently have an entry (see initHeap).
	//   - err caches an error from a child iterator, per the error-handling
	//     contract in the type comment; init clears it.
	//   - lower and upper are copied from IterOptions.LowerBound/UpperBound by
	//     init; when set, switchToMinHeap/switchToMaxHeap re-seek to them if
	//     the heap is empty.
	//   - prefix and seekKeyBuf: NOTE(review): their uses are not visible in
	//     this part of the file; document at the use sites.
	logger        Logger
	split         Split
	dir           int
	snapshot      base.SeqNum
	batchSnapshot base.SeqNum
	levels        []mergingIterLevel
	heap          mergingIterHeap
	err           error
	prefix        []byte
	lower         []byte
	upper         []byte
	stats         *InternalIteratorStats
	seekKeyBuf    []byte

	// levelsPositioned, if non-nil, is a slice of the same length as levels.
	// It's used by NextPrefix to record which levels have already been
	// repositioned. It's created lazily by the first call to NextPrefix.
	levelsPositioned []bool

	combinedIterState *combinedIterState

	// Used in some tests to disable the random disabling of seek optimizations.
	forceEnableSeekOpt bool
}
     267              : 
     268              : // mergingIter implements the base.InternalIterator interface.
     269              : var _ base.InternalIterator = (*mergingIter)(nil)
     270              : 
     271              : // newMergingIter returns an iterator that merges its input. Walking the
     272              : // resultant iterator will return all key/value pairs of all input iterators
     273              : // in strictly increasing key order, as defined by cmp. It is permissible to
     274              : // pass a nil split parameter if the caller is never going to call
     275              : // SeekPrefixGE.
     276              : //
     277              : // The input's key ranges may overlap, but there are assumed to be no duplicate
     278              : // keys: if iters[i] contains a key k then iters[j] will not contain that key k.
     279              : //
     280              : // None of the iters may be nil.
     281              : func newMergingIter(
     282              :         logger Logger,
     283              :         stats *base.InternalIteratorStats,
     284              :         cmp Compare,
     285              :         split Split,
     286              :         iters ...internalIterator,
     287            1 : ) *mergingIter {
     288            1 :         m := &mergingIter{}
     289            1 :         levels := make([]mergingIterLevel, len(iters))
     290            1 :         for i := range levels {
     291            1 :                 levels[i].iter = iters[i]
     292            1 :         }
     293            1 :         m.init(&IterOptions{logger: logger}, stats, cmp, split, levels...)
     294            1 :         return m
     295              : }
     296              : 
     297              : func (m *mergingIter) init(
     298              :         opts *IterOptions,
     299              :         stats *base.InternalIteratorStats,
     300              :         cmp Compare,
     301              :         split Split,
     302              :         levels ...mergingIterLevel,
     303            1 : ) {
     304            1 :         m.err = nil // clear cached iteration error
     305            1 :         m.logger = opts.getLogger()
     306            1 :         if opts != nil {
     307            1 :                 m.lower = opts.LowerBound
     308            1 :                 m.upper = opts.UpperBound
     309            1 :         }
     310            1 :         m.snapshot = base.SeqNumMax
     311            1 :         m.batchSnapshot = base.SeqNumMax
     312            1 :         m.levels = levels
     313            1 :         m.heap.cmp = cmp
     314            1 :         m.split = split
     315            1 :         m.stats = stats
     316            1 :         if cap(m.heap.items) < len(levels) {
     317            1 :                 m.heap.items = make([]*mergingIterLevel, 0, len(levels))
     318            1 :         } else {
     319            1 :                 m.heap.items = m.heap.items[:0]
     320            1 :         }
     321            1 :         for l := range m.levels {
     322            1 :                 m.levels[l].index = l
     323            1 :         }
     324              : }
     325              : 
     326            1 : func (m *mergingIter) initHeap() {
     327            1 :         m.heap.items = m.heap.items[:0]
     328            1 :         for i := range m.levels {
     329            1 :                 if l := &m.levels[i]; l.iterKV != nil {
     330            1 :                         m.heap.items = append(m.heap.items, l)
     331            1 :                 }
     332              :         }
     333            1 :         m.heap.init()
     334              : }
     335              : 
     336            1 : func (m *mergingIter) initMinHeap() error {
     337            1 :         m.dir = 1
     338            1 :         m.heap.reverse = false
     339            1 :         m.initHeap()
     340            1 :         return m.initMinRangeDelIters(-1)
     341            1 : }
     342              : 
     343              : // The level of the previous top element was oldTopLevel. Note that all range delete
     344              : // iterators < oldTopLevel are positioned past the key of the previous top element and
     345              : // the range delete iterator == oldTopLevel is positioned at or past the key of the
     346              : // previous top element. We need to position the range delete iterators from oldTopLevel + 1
     347              : // to the level of the current top element.
     348            1 : func (m *mergingIter) initMinRangeDelIters(oldTopLevel int) error {
     349            1 :         if m.heap.len() == 0 {
     350            1 :                 return nil
     351            1 :         }
     352              : 
     353              :         // Position the range-del iterators at levels <= m.heap.items[0].index.
     354            1 :         item := m.heap.items[0]
     355            1 :         for level := oldTopLevel + 1; level <= item.index; level++ {
     356            1 :                 l := &m.levels[level]
     357            1 :                 if l.rangeDelIter == nil {
     358            1 :                         continue
     359              :                 }
     360            1 :                 var err error
     361            1 :                 l.tombstone, err = l.rangeDelIter.SeekGE(item.iterKV.K.UserKey)
     362            1 :                 if err != nil {
     363            0 :                         return err
     364            0 :                 }
     365              :         }
     366            1 :         return nil
     367              : }
     368              : 
     369            1 : func (m *mergingIter) initMaxHeap() error {
     370            1 :         m.dir = -1
     371            1 :         m.heap.reverse = true
     372            1 :         m.initHeap()
     373            1 :         return m.initMaxRangeDelIters(-1)
     374            1 : }
     375              : 
     376              : // The level of the previous top element was oldTopLevel. Note that all range delete
     377              : // iterators < oldTopLevel are positioned before the key of the previous top element and
     378              : // the range delete iterator == oldTopLevel is positioned at or before the key of the
     379              : // previous top element. We need to position the range delete iterators from oldTopLevel + 1
     380              : // to the level of the current top element.
     381            1 : func (m *mergingIter) initMaxRangeDelIters(oldTopLevel int) error {
     382            1 :         if m.heap.len() == 0 {
     383            1 :                 return nil
     384            1 :         }
     385              :         // Position the range-del iterators at levels <= m.heap.items[0].index.
     386            1 :         item := m.heap.items[0]
     387            1 :         for level := oldTopLevel + 1; level <= item.index; level++ {
     388            1 :                 l := &m.levels[level]
     389            1 :                 if l.rangeDelIter == nil {
     390            1 :                         continue
     391              :                 }
     392            1 :                 tomb, err := keyspan.SeekLE(m.heap.cmp, l.rangeDelIter, item.iterKV.K.UserKey)
     393            1 :                 if err != nil {
     394            0 :                         return err
     395            0 :                 }
     396            1 :                 l.tombstone = tomb
     397              :         }
     398            1 :         return nil
     399              : }
     400              : 
     401            1 : func (m *mergingIter) switchToMinHeap() error {
     402            1 :         if m.heap.len() == 0 {
     403            1 :                 if m.lower != nil {
     404            0 :                         m.SeekGE(m.lower, base.SeekGEFlagsNone)
     405            1 :                 } else {
     406            1 :                         m.First()
     407            1 :                 }
     408            1 :                 return m.err
     409              :         }
     410              : 
     411              :         // We're switching from using a max heap to a min heap. We need to advance
     412              :         // any iterator that is less than or equal to the current key. Consider the
     413              :         // scenario where we have 2 iterators being merged (user-key:seq-num):
     414              :         //
     415              :         // i1:     *a:2     b:2
     416              :         // i2: a:1      b:1
     417              :         //
     418              :         // The current key is a:2 and i2 is pointed at a:1. When we switch to forward
     419              :         // iteration, we want to return a key that is greater than a:2.
     420              : 
     421            1 :         key := m.heap.items[0].iterKV.K
     422            1 :         cur := m.heap.items[0]
     423            1 : 
     424            1 :         for i := range m.levels {
     425            1 :                 l := &m.levels[i]
     426            1 :                 if l == cur {
     427            1 :                         continue
     428              :                 }
     429            1 :                 for l.iterKV = l.iter.Next(); l.iterKV != nil; l.iterKV = l.iter.Next() {
     430            1 :                         if base.InternalCompare(m.heap.cmp, key, l.iterKV.K) < 0 {
     431            1 :                                 // key < iter-key
     432            1 :                                 break
     433              :                         }
     434              :                         // key >= iter-key
     435              :                 }
     436            1 :                 if l.iterKV == nil {
     437            1 :                         if err := l.iter.Error(); err != nil {
     438            0 :                                 return err
     439            0 :                         }
     440              :                 }
     441              :         }
     442              : 
     443              :         // Special handling for the current iterator because we were using its key
     444              :         // above.
     445            1 :         cur.iterKV = cur.iter.Next()
     446            1 :         if cur.iterKV == nil {
     447            1 :                 if err := cur.iter.Error(); err != nil {
     448            0 :                         return err
     449            0 :                 }
     450              :         }
     451            1 :         return m.initMinHeap()
     452              : }
     453              : 
     454            1 : func (m *mergingIter) switchToMaxHeap() error {
     455            1 :         if m.heap.len() == 0 {
     456            1 :                 if m.upper != nil {
     457            0 :                         m.SeekLT(m.upper, base.SeekLTFlagsNone)
     458            1 :                 } else {
     459            1 :                         m.Last()
     460            1 :                 }
     461            1 :                 return m.err
     462              :         }
     463              : 
     464              :         // We're switching from using a min heap to a max heap. We need to backup any
     465              :         // iterator that is greater than or equal to the current key. Consider the
     466              :         // scenario where we have 2 iterators being merged (user-key:seq-num):
     467              :         //
     468              :         // i1: a:2     *b:2
     469              :         // i2:     a:1      b:1
     470              :         //
     471              :         // The current key is b:2 and i2 is pointing at b:1. When we switch to
     472              :         // reverse iteration, we want to return a key that is less than b:2.
     473            1 :         key := m.heap.items[0].iterKV.K
     474            1 :         cur := m.heap.items[0]
     475            1 : 
     476            1 :         for i := range m.levels {
     477            1 :                 l := &m.levels[i]
     478            1 :                 if l == cur {
     479            1 :                         continue
     480              :                 }
     481              : 
     482            1 :                 for l.iterKV = l.iter.Prev(); l.iterKV != nil; l.iterKV = l.iter.Prev() {
     483            1 :                         if base.InternalCompare(m.heap.cmp, key, l.iterKV.K) > 0 {
     484            1 :                                 // key > iter-key
     485            1 :                                 break
     486              :                         }
     487              :                         // key <= iter-key
     488              :                 }
     489            1 :                 if l.iterKV == nil {
     490            1 :                         if err := l.iter.Error(); err != nil {
     491            0 :                                 return err
     492            0 :                         }
     493              :                 }
     494              :         }
     495              : 
     496              :         // Special handling for the current iterator because we were using its key
     497              :         // above.
     498            1 :         cur.iterKV = cur.iter.Prev()
     499            1 :         if cur.iterKV == nil {
     500            1 :                 if err := cur.iter.Error(); err != nil {
     501            0 :                         return err
     502            0 :                 }
     503              :         }
     504            1 :         return m.initMaxHeap()
     505              : }
     506              : 
     507              : // nextEntry unconditionally steps to the next entry. item is the current top
     508              : // item in the heap.
     509            1 : func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
     510            1 :         // INVARIANT: If in prefix iteration mode, item.iterKey must have a prefix equal
     511            1 :         // to m.prefix. This invariant is important for ensuring TrySeekUsingNext
     512            1 :         // optimizations behave correctly.
     513            1 :         //
     514            1 :         // During prefix iteration, the iterator does not have a full view of the
     515            1 :         // LSM. Some level iterators may omit keys that are known to fall outside
     516            1 :         // the seek prefix (eg, due to sstable bloom filter exclusion). It's
     517            1 :         // important that in such cases we don't position any iterators beyond
     518            1 :         // m.prefix, because doing so may interfere with future seeks.
     519            1 :         //
     520            1 :         // Let prefixes P1 < P2 < P3. Imagine a SeekPrefixGE to prefix P1, followed
     521            1 :         // by a SeekPrefixGE to prefix P2. Imagine there exist live keys at prefix
     522            1 :         // P2, but they're not visible to the SeekPrefixGE(P1) (because of
     523            1 :         // bloom-filter exclusion or a range tombstone that deletes prefix P1 but
     524            1 :         // not P2). If the SeekPrefixGE(P1) is allowed to move any level iterators
     525            1 :         // to P3, the SeekPrefixGE(P2, TrySeekUsingNext=true) may mistakenly think
     526            1 :         // the level contains no point keys or range tombstones within the prefix
     527            1 :         // P2. Care is taken to avoid ever advancing the iterator beyond the current
     528            1 :         // prefix. If nextEntry is ever invoked while we're already beyond the
     529            1 :         // current prefix, we're violating the invariant.
     530            1 :         if invariants.Enabled && m.prefix != nil {
     531            1 :                 if p := m.split.Prefix(l.iterKV.K.UserKey); !bytes.Equal(m.prefix, p) {
     532            0 :                         m.logger.Fatalf("mergingIter: prefix violation: nexting beyond prefix %q; existing heap root %q\n%s",
     533            0 :                                 m.prefix, l.iterKV, debug.Stack())
     534            0 :                 }
     535              :         }
     536              : 
     537            1 :         oldTopLevel := l.index
     538            1 :         oldRangeDelIterGeneration := l.rangeDelIterGeneration
     539            1 : 
     540            1 :         if succKey == nil {
     541            1 :                 l.iterKV = l.iter.Next()
     542            1 :         } else {
     543            1 :                 l.iterKV = l.iter.NextPrefix(succKey)
     544            1 :         }
     545              : 
     546            1 :         if l.iterKV == nil {
     547            1 :                 if err := l.iter.Error(); err != nil {
     548            0 :                         return err
     549            0 :                 }
     550            1 :                 m.heap.pop()
     551            1 :         } else {
     552            1 :                 if m.prefix != nil && !bytes.Equal(m.prefix, m.split.Prefix(l.iterKV.K.UserKey)) {
     553            1 :                         // Set keys without a matching prefix to their zero values when in prefix
     554            1 :                         // iteration mode and remove iterated level from heap.
     555            1 :                         l.iterKV = nil
     556            1 :                         m.heap.pop()
     557            1 :                 } else if m.heap.len() > 1 {
     558            1 :                         m.heap.fix(0)
     559            1 :                 }
     560            1 :                 if l.rangeDelIterGeneration != oldRangeDelIterGeneration {
     561            1 :                         // The rangeDelIter changed which indicates that the l.iter moved to the
     562            1 :                         // next sstable. We have to update the tombstone for oldTopLevel as well.
     563            1 :                         oldTopLevel--
     564            1 :                 }
     565              :         }
     566              : 
     567              :         // The cached tombstones are only valid for the levels
     568              :         // [0,oldTopLevel]. Updated the cached tombstones for any levels in the range
     569              :         // [oldTopLevel+1,heap[0].index].
     570            1 :         return m.initMinRangeDelIters(oldTopLevel)
     571              : }
     572              : 
// isNextEntryDeleted starts from the current entry (as the next entry) and
// checks whether it is deleted by a visible range tombstone.
//
// If it is deleted, the iterators are moved forward past the deleted span and
// the function returns true. It moves them by seeking the levels at and below
// item.index to the tombstone's end key, or by stepping item when the tombstone
// is at item's own level. Otherwise it returns false. item is the top item in
// the heap.
//
// If any of the required iterator operations error, the error is returned
// without updating m.err.
//
// In prefix iteration mode, a covering tombstone at a higher level whose end
// key has a different prefix than m.prefix is handled without re-seeking.
// item's level and every level after it (indexes >= item.index) are removed
// from the heap, the heap is rebuilt with initMinHeap, and true is returned.
func (m *mergingIter) isNextEntryDeleted(item *mergingIterLevel) (bool, error) {
	// Look for a range deletion tombstone containing item.iterKV at higher
	// levels (level < item.index). If we find such a range tombstone we know
	// it deletes the key in the current level. Also look for a range
	// deletion at the current level (level == item.index). If we find such a
	// range deletion we need to check whether it is newer than the current
	// entry.
	for level := 0; level <= item.index; level++ {
		l := &m.levels[level]
		if l.rangeDelIter == nil || l.tombstone == nil {
			// If l.tombstone is nil, there are no further tombstones
			// in the current sstable in the current (forward) iteration
			// direction.
			continue
		}
		if m.heap.cmp(l.tombstone.End, item.iterKV.K.UserKey) <= 0 {
			// The current key is at or past the tombstone end key.
			//
			// NB: for the case that this l.rangeDelIter is provided by a levelIter we know that
			// the levelIter must be positioned at a key >= item.iterKV. So it is sufficient to seek the
			// current l.rangeDelIter (since any range del iterators that will be provided by the
			// levelIter in the future cannot contain item.iterKV). Also, it is possible that we
			// will encounter parts of the range delete that should be ignored -- we handle that
			// below.
			var err error
			l.tombstone, err = l.rangeDelIter.SeekGE(item.iterKV.K.UserKey)
			if err != nil {
				return false, err
			}
		}
		if l.tombstone == nil {
			// The re-seek above found no tombstone at or after the current key
			// in this level's rangeDelIter.
			continue
		}

		// The tombstone applies only if it is visible at the iterator's
		// snapshot and starts at or before the current user key.
		if l.tombstone.VisibleAt(m.snapshot) && m.heap.cmp(l.tombstone.Start, item.iterKV.K.UserKey) <= 0 {
			if level < item.index {
				// We could also do m.seekGE(..., level + 1). The levels from
				// [level + 1, item.index) are already after item.iterKV so seeking them may be
				// wasteful.

				// We can seek up to tombstone.End.
				//
				// Progress argument: Since this file is at a higher level than item.iterKV we know
				// that the iterator in this file must be positioned within its bounds and at a key
				// X > item.iterKV (otherwise it would be the min of the heap). It is not
				// possible for X.UserKey == item.iterKV.UserKey, since it is incompatible with
				// X > item.iterKV (a lower version cannot be in a higher sstable), so it must be that
				// X.UserKey > item.iterKV.UserKey. Which means l.largestUserKey > item.key.UserKey.
				// We also know that l.tombstone.End > item.iterKV.UserKey. So the min of these,
				// seekKey, computed below, is > item.iterKV.UserKey, so the call to seekGE() will
				// make forward progress.
				//
				// The end key is copied into m.seekKeyBuf rather than used directly.
				m.seekKeyBuf = append(m.seekKeyBuf[:0], l.tombstone.End...)
				seekKey := m.seekKeyBuf
				// This seek is not directly due to a SeekGE call, so we don't know
				// enough about the underlying iterator positions. We therefore keep
				// the try-seek-using-next optimization disabled.
				//
				// In prefix-seek mode, a re-seek to seekKey would move us past the
				// original prefix if seekKey has a different prefix. In that case we
				// instead remove item's level and every level after it (indexes >=
				// item.index) from the heap and return immediately. This is correct
				// because those levels cannot provide a key that both matches the
				// prefix and is visible: the tombstone covers the rest of the prefix.
				// It also keeps subsequent `TrySeekUsingNext` calls correct. A
				// re-seek onto a different prefix could skip visible keys at prefixes
				// between m.prefix and seekKey that are currently absent from the
				// heap due to a bloom filter mismatch.
				//
				// Additionally, we set the relative-seek flag. This is
				// important when iterating with lazy combined iteration. If
				// there's a range key between this level's current file and the
				// file the seek will land on, we need to detect it in order to
				// trigger construction of the combined iterator.
				if m.prefix != nil {
					if !bytes.Equal(m.prefix, m.split.Prefix(seekKey)) {
						for i := item.index; i < len(m.levels); i++ {
							// Remove this level from the heap. Setting iterKV
							// to nil should be sufficient for initMinHeap to
							// not re-initialize the heap with them in it. Other
							// fields in mergingIterLevel can remain as-is; the
							// iter/rangeDelIter needs to stay intact for future
							// trySeekUsingNexts to work, the level iter
							// boundary context is owned by the levelIter which
							// is not being repositioned, and any tombstones in
							// these levels will be irrelevant for us anyway.
							m.levels[i].iterKV = nil
						}
						// TODO(bilal): Consider a more efficient way of removing levels from
						// the heap without reinitializing all of it. This would likely
						// necessitate tracking the heap positions of each mergingIterHeap
						// item in the mergingIterLevel, and then swapping that item in the
						// heap with the last-positioned heap item, and shrinking the heap by
						// one.
						if err := m.initMinHeap(); err != nil {
							return false, err
						}
						return true, nil
					}
				}
				if err := m.seekGE(seekKey, item.index, base.SeekGEFlagsNone.EnableRelativeSeek()); err != nil {
					return false, err
				}
				return true, nil
			}
			// The tombstone is at item's own level: it deletes the entry only
			// if it covers the entry's sequence number at this snapshot.
			if l.tombstone.CoversAt(m.snapshot, item.iterKV.SeqNum()) {
				if err := m.nextEntry(item, nil /* succKey */); err != nil {
					return false, err
				}
				return true, nil
			}
		}
	}
	return false, nil
}
     693              : 
     694              : // Starting from the current entry, finds the first (next) entry that can be returned.
     695              : //
     696              : // If an error occurs, m.err is updated to hold the error and findNextentry
     697              : // returns a nil internal key.
     698            1 : func (m *mergingIter) findNextEntry() *base.InternalKV {
     699            1 :         for m.heap.len() > 0 && m.err == nil {
     700            1 :                 item := m.heap.items[0]
     701            1 : 
     702            1 :                 // The levelIter internal iterator will interleave exclusive sentinel
     703            1 :                 // keys to keep files open until their range deletions are no longer
     704            1 :                 // necessary. Sometimes these are interleaved with the user key of a
     705            1 :                 // file's largest key, in which case they may simply be stepped over to
     706            1 :                 // move to the next file in the forward direction. Other times they're
     707            1 :                 // interleaved at the user key of the user-iteration boundary, if that
     708            1 :                 // falls within the bounds of a file. In the latter case, there are no
     709            1 :                 // more keys < m.upper, and we can stop iterating.
     710            1 :                 //
     711            1 :                 // We perform a key comparison to differentiate between these two cases.
     712            1 :                 // This key comparison is considered okay because it only happens for
     713            1 :                 // sentinel keys. It may be eliminated after #2863.
     714            1 :                 if m.levels[item.index].iterKV.K.IsExclusiveSentinel() {
     715            1 :                         if m.upper != nil && m.heap.cmp(m.levels[item.index].iterKV.K.UserKey, m.upper) >= 0 {
     716            1 :                                 break
     717              :                         }
     718              :                         // This key is the largest boundary of a file and can be skipped now
     719              :                         // that the file's range deletions are no longer relevant.
     720            1 :                         m.err = m.nextEntry(item, nil /* succKey */)
     721            1 :                         if m.err != nil {
     722            0 :                                 return nil
     723            0 :                         }
     724            1 :                         continue
     725              :                 }
     726              : 
     727            1 :                 m.addItemStats(item)
     728            1 : 
     729            1 :                 // Check if the heap root key is deleted by a range tombstone in a
     730            1 :                 // higher level. If it is, isNextEntryDeleted will advance the iterator
     731            1 :                 // to a later key (through seeking or nexting).
     732            1 :                 isDeleted, err := m.isNextEntryDeleted(item)
     733            1 :                 if err != nil {
     734            0 :                         m.err = err
     735            0 :                         return nil
     736            1 :                 } else if isDeleted {
     737            1 :                         m.stats.PointsCoveredByRangeTombstones++
     738            1 :                         continue
     739              :                 }
     740              : 
     741              :                 // Check if the key is visible at the iterator sequence numbers.
     742            1 :                 if !item.iterKV.Visible(m.snapshot, m.batchSnapshot) {
     743            1 :                         m.err = m.nextEntry(item, nil /* succKey */)
     744            1 :                         if m.err != nil {
     745            0 :                                 return nil
     746            0 :                         }
     747            1 :                         continue
     748              :                 }
     749              : 
     750              :                 // The heap root is visible and not deleted by any range tombstones.
     751              :                 // Return it.
     752            1 :                 return item.iterKV
     753              :         }
     754            1 :         return nil
     755              : }
     756              : 
     757              : // Steps to the prev entry. item is the current top item in the heap.
     758            1 : func (m *mergingIter) prevEntry(l *mergingIterLevel) error {
     759            1 :         oldTopLevel := l.index
     760            1 :         oldRangeDelIterGeneration := l.rangeDelIterGeneration
     761            1 :         if l.iterKV = l.iter.Prev(); l.iterKV != nil {
     762            1 :                 if m.heap.len() > 1 {
     763            1 :                         m.heap.fix(0)
     764            1 :                 }
     765            1 :                 if l.rangeDelIterGeneration != oldRangeDelIterGeneration && l.rangeDelIter != nil {
     766            1 :                         // The rangeDelIter changed which indicates that the l.iter moved to the
     767            1 :                         // previous sstable. We have to update the tombstone for oldTopLevel as
     768            1 :                         // well.
     769            1 :                         oldTopLevel--
     770            1 :                 }
     771            1 :         } else {
     772            1 :                 if err := l.iter.Error(); err != nil {
     773            0 :                         return err
     774            0 :                 }
     775            1 :                 m.heap.pop()
     776              :         }
     777              : 
     778              :         // The cached tombstones are only valid for the levels
     779              :         // [0,oldTopLevel]. Updated the cached tombstones for any levels in the range
     780              :         // [oldTopLevel+1,heap[0].index].
     781            1 :         return m.initMaxRangeDelIters(oldTopLevel)
     782              : }
     783              : 
// isPrevEntryDeleted starts from the current entry (as the prev entry) and
// checks whether it is deleted by a visible range tombstone.
//
// If it is deleted, the iterators are moved backward and the function returns
// true. It moves them by seekLT-ing the levels at and below item.index to the
// tombstone's start key, or by stepping item back when the tombstone is at
// item's own level. Otherwise it returns false. item is the top item in the
// heap.
//
// Any iterator error is returned as-is; m.err is not modified here.
func (m *mergingIter) isPrevEntryDeleted(item *mergingIterLevel) (bool, error) {
	// Look for a range deletion tombstone containing item.iterKV at higher
	// levels (level < item.index). If we find such a range tombstone we know
	// it deletes the key in the current level. Also look for a range
	// deletion at the current level (level == item.index). If we find such a
	// range deletion we need to check whether it is newer than the current
	// entry.
	for level := 0; level <= item.index; level++ {
		l := &m.levels[level]
		if l.rangeDelIter == nil || l.tombstone == nil {
			// If l.tombstone is nil, there are no further tombstones
			// in the current sstable in the current (reverse) iteration
			// direction.
			continue
		}
		if m.heap.cmp(item.iterKV.K.UserKey, l.tombstone.Start) < 0 {
			// The current key is before the tombstone start key.
			//
			// NB: for the case that this l.rangeDelIter is provided by a levelIter we know that
			// the levelIter must be positioned at a key < item.iterKV. So it is sufficient to seek the
			// current l.rangeDelIter (since any range del iterators that will be provided by the
			// levelIter in the future cannot contain item.iterKV). Also, it is possible that we
			// will encounter parts of the range delete that should be ignored -- we handle that
			// below.

			tomb, err := keyspan.SeekLE(m.heap.cmp, l.rangeDelIter, item.iterKV.K.UserKey)
			if err != nil {
				return false, err
			}
			l.tombstone = tomb
		}
		if l.tombstone == nil {
			// The SeekLE above found no tombstone at or before the current key.
			continue
		}
		// The tombstone applies only if it is visible at the iterator's
		// snapshot and ends after the current user key.
		if l.tombstone.VisibleAt(m.snapshot) && m.heap.cmp(l.tombstone.End, item.iterKV.K.UserKey) > 0 {
			if level < item.index {
				// We could also do m.seekLT(..., level + 1). The levels from
				// [level + 1, item.index) are already before item.iterKV so seeking them may be
				// wasteful.

				// We can seek up to tombstone.Start.
				//
				// Progress argument: We know that the iterator in this file is positioned within
				// its bounds and at a key X < item.iterKV (otherwise it would be the max of the heap).
				// So smallestUserKey <= item.iterKV.UserKey and we already know that
				// l.tombstone.Start <= item.iterKV.UserKey. So the seekKey computed below
				// is <= item.iterKV.UserKey, and since we do a seekLT() we will make backwards
				// progress.
				m.seekKeyBuf = append(m.seekKeyBuf[:0], l.tombstone.Start...)
				seekKey := m.seekKeyBuf
				// We set the relative-seek flag. This is important when
				// iterating with lazy combined iteration. If there's a range
				// key between this level's current file and the file the seek
				// will land on, we need to detect it in order to trigger
				// construction of the combined iterator.
				if err := m.seekLT(seekKey, item.index, base.SeekLTFlagsNone.EnableRelativeSeek()); err != nil {
					return false, err
				}
				return true, nil
			}
			// The tombstone is at item's own level: it deletes the entry only
			// if it covers the entry's sequence number at this snapshot.
			if l.tombstone.CoversAt(m.snapshot, item.iterKV.SeqNum()) {
				if err := m.prevEntry(item); err != nil {
					return false, err
				}
				return true, nil
			}
		}
	}
	return false, nil
}
     857              : 
     858              : // Starting from the current entry, finds the first (prev) entry that can be returned.
     859              : //
     860              : // If an error occurs, m.err is updated to hold the error and findNextentry
     861              : // returns a nil internal key.
     862            1 : func (m *mergingIter) findPrevEntry() *base.InternalKV {
     863            1 :         for m.heap.len() > 0 && m.err == nil {
     864            1 :                 item := m.heap.items[0]
     865            1 : 
     866            1 :                 // The levelIter internal iterator will interleave exclusive sentinel
     867            1 :                 // keys to keep files open until their range deletions are no longer
     868            1 :                 // necessary. Sometimes these are interleaved with the user key of a
     869            1 :                 // file's smallest key, in which case they may simply be stepped over to
     870            1 :                 // move to the next file in the backward direction. Other times they're
     871            1 :                 // interleaved at the user key of the user-iteration boundary, if that
     872            1 :                 // falls within the bounds of a file. In the latter case, there are no
     873            1 :                 // more keys ≥ m.lower, and we can stop iterating.
     874            1 :                 //
     875            1 :                 // We perform a key comparison to differentiate between these two cases.
     876            1 :                 // This key comparison is considered okay because it only happens for
     877            1 :                 // sentinel keys. It may be eliminated after #2863.
     878            1 :                 if m.levels[item.index].iterKV.K.IsExclusiveSentinel() {
     879            1 :                         if m.lower != nil && m.heap.cmp(m.levels[item.index].iterKV.K.UserKey, m.lower) <= 0 {
     880            1 :                                 break
     881              :                         }
     882              :                         // This key is the smallest boundary of a file and can be skipped
     883              :                         // now that the file's range deletions are no longer relevant.
     884            1 :                         m.err = m.prevEntry(item)
     885            1 :                         if m.err != nil {
     886            0 :                                 return nil
     887            0 :                         }
     888            1 :                         continue
     889              :                 }
     890              : 
     891            1 :                 m.addItemStats(item)
     892            1 :                 if isDeleted, err := m.isPrevEntryDeleted(item); err != nil {
     893            0 :                         m.err = err
     894            0 :                         return nil
     895            1 :                 } else if isDeleted {
     896            1 :                         m.stats.PointsCoveredByRangeTombstones++
     897            1 :                         continue
     898              :                 }
     899            1 :                 if item.iterKV.Visible(m.snapshot, m.batchSnapshot) {
     900            1 :                         return item.iterKV
     901            1 :                 }
     902            1 :                 m.err = m.prevEntry(item)
     903              :         }
     904            1 :         return nil
     905              : }
     906              : 
// seekGE positions every level whose index is >= level at the first key >=
// key, then rebuilds the min-heap with initMinHeap.
//
// When m.prefix is non-nil (SeekPrefixGEStrict sets it; SeekGE clears it), the
// levels are positioned with SeekPrefixGE instead. A level whose resulting key
// does not have m.prefix as its prefix is treated as exhausted (iterKV = nil)
// so that it never enters the heap. Range tombstones are additionally used to
// extend the seeks of lower levels; see the comment below.
//
// Parameters:
//   - key: the seek target for the first level visited. Before seeking
//     subsequent levels it may be advanced to a visible tombstone's end key.
//   - level: the index of the first level to seek. Levels before it are not
//     repositioned by this call.
//   - flags: passed through to each level's seek. TrySeekUsingNext may be
//     disabled for all levels at once (see below).
//
// If an error occurs, seekGE returns the error without setting m.err. It
// returns before calling initMinHeap, so the heap is not rebuilt in that case.
func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) error {
	// When seeking, we can use tombstones to adjust the key we seek to on each
	// level. Consider the series of range tombstones:
	//
	//   1: a---e
	//   2:    d---h
	//   3:       g---k
	//   4:          j---n
	//   5:             m---q
	//
	// If we SeekGE("b") we also find the tombstone "b" resides within in the
	// first level which is [a,e). Regardless of whether this tombstone deletes
	// "b" in that level, we know it deletes "b" in all lower levels, so we
	// adjust the search key in the next level to the tombstone end key "e". We
	// then SeekGE("e") in the second level and find the corresponding tombstone
	// [d,h). This process continues and we end up seeking for "h" in the 3rd
	// level, "k" in the 4th level and "n" in the last level.
	//
	// TODO(peter,rangedel): In addition to the above we can delay seeking a
	// level (and any lower levels) when the current iterator position is
	// contained within a range tombstone at a higher level.

	// Deterministically disable the TrySeekUsingNext optimizations sometimes in
	// invariant builds to encourage the metamorphic tests to surface bugs. Note
	// that we cannot disable the optimization within individual levels. It must
	// be disabled for all levels or none. If one lower-level iterator performs
	// a fresh seek whereas another takes advantage of its current iterator
	// position, the heap can become inconsistent. Consider the following
	// example:
	//
	//     L5:  [ [b-c) ]  [ d ]*
	//     L6:  [  b ]           [e]*
	//
	// Imagine a SeekGE(a). The [b-c) range tombstone deletes the L6 point key
	// 'b', resulting in the iterator positioned at d with the heap:
	//
	//     {L5: d, L6: e}
	//
	// A subsequent SeekGE(b) is seeking to a larger key, so the caller may set
	// TrySeekUsingNext()=true. If the L5 iterator used the TrySeekUsingNext
	// optimization but the L6 iterator did not, the iterator would have the
	// heap:
	//
	//     {L6: b, L5: d}
	//
	// Because the L5 iterator has already advanced to the next sstable, the
	// merging iterator cannot observe the [b-c) range tombstone and will
	// mistakenly return L6's deleted point key 'b'.
	if testingDisableSeekOpt(key, uintptr(unsafe.Pointer(m))) && !m.forceEnableSeekOpt {
		flags = flags.DisableTrySeekUsingNext()
	}

	for ; level < len(m.levels); level++ {
		// The seek key must never fall below the lower bound. Invariant builds
		// check this for every level, including after tombstone adjustments.
		if invariants.Enabled && m.lower != nil && m.heap.cmp(key, m.lower) < 0 {
			m.logger.Fatalf("mergingIter: lower bound violation: %s < %s\n%s", key, m.lower, debug.Stack())
		}

		l := &m.levels[level]
		if m.prefix != nil {
			l.iterKV = l.iter.SeekPrefixGE(m.prefix, key, flags)
			if l.iterKV != nil {
				if !bytes.Equal(m.prefix, m.split.Prefix(l.iterKV.K.UserKey)) {
					// Prevent keys without a matching prefix from being added to
					// the heap by clearing l.iterKV before calling initMinHeap.
					l.iterKV = nil
				}
			}
		} else {
			l.iterKV = l.iter.SeekGE(key, flags)
		}
		// A nil iterKV aborts the seek only if the level iterator also reports
		// an error. Otherwise the level simply contributes nothing to the heap.
		if l.iterKV == nil {
			if err := l.iter.Error(); err != nil {
				return err
			}
		}

		// If this level contains overlapping range tombstones, alter the seek
		// key accordingly. Caveat: If we're performing lazy-combined iteration,
		// we cannot alter the seek key: Range tombstones don't delete range
		// keys, and there might exist live range keys within the range
		// tombstone's span that need to be observed to trigger a switch to
		// combined iteration.
		if rangeDelIter := l.rangeDelIter; rangeDelIter != nil &&
			(m.combinedIterState == nil || m.combinedIterState.initialized) {
			// The level has a range-del iterator. Find the tombstone containing
			// the search key. The Start <= key check below confirms that the
			// returned tombstone actually covers key.
			var err error
			l.tombstone, err = rangeDelIter.SeekGE(key)
			if err != nil {
				return err
			}
			if l.tombstone != nil && l.tombstone.VisibleAt(m.snapshot) && m.heap.cmp(l.tombstone.Start, key) <= 0 {
				// Based on the containment condition tombstone.End > key, so
				// the assignment to key results in a monotonically
				// non-decreasing key across iterations of this loop.
				//
				// The adjustment of key here can only move it to a larger key.
				// Since the caller of seekGE guaranteed that the original key
				// was greater than or equal to m.lower, the new key will
				// continue to be greater than or equal to m.lower.
				key = l.tombstone.End
			}
		}
	}
	return m.initMinHeap()
}
    1016              : 
    1017            0 : func (m *mergingIter) String() string {
    1018            0 :         return "merging"
    1019            0 : }
    1020              : 
    1021              : // SeekGE implements base.InternalIterator.SeekGE. Note that SeekGE only checks
    1022              : // the upper bound. It is up to the caller to ensure that key is greater than
    1023              : // or equal to the lower bound.
    1024            1 : func (m *mergingIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
    1025            1 :         m.prefix = nil
    1026            1 :         m.err = m.seekGE(key, 0 /* start level */, flags)
    1027            1 :         if m.err != nil {
    1028            0 :                 return nil
    1029            0 :         }
    1030            1 :         return m.findNextEntry()
    1031              : }
    1032              : 
    1033              : // SeekPrefixGE implements base.InternalIterator.SeekPrefixGE.
    1034            1 : func (m *mergingIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
    1035            1 :         return m.SeekPrefixGEStrict(prefix, key, flags)
    1036            1 : }
    1037              : 
    1038              : // SeekPrefixGEStrict implements topLevelIterator.SeekPrefixGEStrict. Note that
    1039              : // SeekPrefixGEStrict explicitly checks that the key has a matching prefix.
    1040              : func (m *mergingIter) SeekPrefixGEStrict(
    1041              :         prefix, key []byte, flags base.SeekGEFlags,
    1042            1 : ) *base.InternalKV {
    1043            1 :         m.prefix = prefix
    1044            1 :         m.err = m.seekGE(key, 0 /* start level */, flags)
    1045            1 :         if m.err != nil {
    1046            0 :                 return nil
    1047            0 :         }
    1048              : 
    1049            1 :         iterKV := m.findNextEntry()
    1050            1 :         if invariants.Enabled && iterKV != nil {
    1051            1 :                 if !bytes.Equal(m.prefix, m.split.Prefix(iterKV.K.UserKey)) {
    1052            0 :                         m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKV, m.prefix)
    1053            0 :                 }
    1054              :         }
    1055            1 :         return iterKV
    1056              : }
    1057              : 
// seekLT positions every level whose index is >= level at the last key < key,
// then rebuilds the max-heap with initMaxHeap.
//
// It always clears m.prefix, since reverse prefix iteration is unsupported
// (see Prev). Range tombstones are additionally used to extend the seeks of
// lower levels. This mirrors the forward-direction adjustment described in
// seekGE.
//
// Parameters:
//   - key: the exclusive seek target for the first level visited. Before
//     seeking subsequent levels it may be moved back to a visible tombstone's
//     start key.
//   - level: the index of the first level to seek. Levels before it are not
//     repositioned by this call.
//   - flags: passed through unchanged to each level's SeekLT.
//
// If an error occurs, seekLT returns the error without setting m.err. It
// returns before calling initMaxHeap, so the heap is not rebuilt in that case.
func (m *mergingIter) seekLT(key []byte, level int, flags base.SeekLTFlags) error {
	// See the comment in seekGE regarding using tombstones to adjust the seek
	// target per level.
	m.prefix = nil
	for ; level < len(m.levels); level++ {
		// The seek key must never exceed the upper bound. Invariant builds
		// check this for every level, including after tombstone adjustments.
		if invariants.Enabled && m.upper != nil && m.heap.cmp(key, m.upper) > 0 {
			m.logger.Fatalf("mergingIter: upper bound violation: %s > %s\n%s", key, m.upper, debug.Stack())
		}

		l := &m.levels[level]
		l.iterKV = l.iter.SeekLT(key, flags)
		// A nil iterKV aborts the seek only if the level iterator also reports
		// an error. Otherwise the level simply contributes nothing to the heap.
		if l.iterKV == nil {
			if err := l.iter.Error(); err != nil {
				return err
			}
		}

		// If this level contains overlapping range tombstones, alter the seek
		// key accordingly. Caveat: If we're performing lazy-combined iteration,
		// we cannot alter the seek key: Range tombstones don't delete range
		// keys, and there might exist live range keys within the range
		// tombstone's span that need to be observed to trigger a switch to
		// combined iteration.
		if rangeDelIter := l.rangeDelIter; rangeDelIter != nil &&
			(m.combinedIterState == nil || m.combinedIterState.initialized) {
			// The level has a range-del iterator. keyspan.SeekLE yields a
			// tombstone whose start key is <= the search key. The End check
			// below determines whether it also reaches key.
			tomb, err := keyspan.SeekLE(m.heap.cmp, rangeDelIter, key)
			if err != nil {
				return err
			}
			l.tombstone = tomb
			// Since SeekLT is exclusive on `key` and a tombstone's end key is
			// also exclusive, a seek key equal to a tombstone's end key still
			// enables the seek optimization (Note this is different than the
			// check performed by (*keyspan.Span).Contains).
			if l.tombstone != nil && l.tombstone.VisibleAt(m.snapshot) &&
				m.heap.cmp(key, l.tombstone.End) <= 0 {
				// NB: Based on the containment condition
				// tombstone.Start <= key, so the assignment to key
				// results in a monotonically non-increasing key across
				// iterations of this loop.
				//
				// The adjustment of key here can only move it to a smaller key.
				// Since the caller of seekLT guaranteed that the original key
				// was less than or equal to m.upper, the new key will continue
				// to be less than or equal to m.upper.
				key = l.tombstone.Start
			}
		}
	}

	return m.initMaxHeap()
}
    1113              : 
    1114              : // SeekLT implements base.InternalIterator.SeekLT. Note that SeekLT only checks
    1115              : // the lower bound. It is up to the caller to ensure that key is less than the
    1116              : // upper bound.
    1117            1 : func (m *mergingIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
    1118            1 :         m.prefix = nil
    1119            1 :         m.err = m.seekLT(key, 0 /* start level */, flags)
    1120            1 :         if m.err != nil {
    1121            0 :                 return nil
    1122            0 :         }
    1123            1 :         return m.findPrevEntry()
    1124              : }
    1125              : 
    1126              : // First implements base.InternalIterator.First. Note that First only checks
    1127              : // the upper bound. It is up to the caller to ensure that key is greater than
    1128              : // or equal to the lower bound (e.g. via a call to SeekGE(lower)).
    1129            1 : func (m *mergingIter) First() *base.InternalKV {
    1130            1 :         m.err = nil // clear cached iteration error
    1131            1 :         m.prefix = nil
    1132            1 :         m.heap.items = m.heap.items[:0]
    1133            1 :         for i := range m.levels {
    1134            1 :                 l := &m.levels[i]
    1135            1 :                 l.iterKV = l.iter.First()
    1136            1 :                 if l.iterKV == nil {
    1137            1 :                         if m.err = l.iter.Error(); m.err != nil {
    1138            0 :                                 return nil
    1139            0 :                         }
    1140              :                 }
    1141              :         }
    1142            1 :         if m.err = m.initMinHeap(); m.err != nil {
    1143            0 :                 return nil
    1144            0 :         }
    1145            1 :         return m.findNextEntry()
    1146              : }
    1147              : 
    1148              : // Last implements base.InternalIterator.Last. Note that Last only checks the
    1149              : // lower bound. It is up to the caller to ensure that key is less than the
    1150              : // upper bound (e.g. via a call to SeekLT(upper))
    1151            1 : func (m *mergingIter) Last() *base.InternalKV {
    1152            1 :         m.err = nil // clear cached iteration error
    1153            1 :         m.prefix = nil
    1154            1 :         for i := range m.levels {
    1155            1 :                 l := &m.levels[i]
    1156            1 :                 l.iterKV = l.iter.Last()
    1157            1 :                 if l.iterKV == nil {
    1158            1 :                         if m.err = l.iter.Error(); m.err != nil {
    1159            0 :                                 return nil
    1160            0 :                         }
    1161              :                 }
    1162              :         }
    1163            1 :         if m.err = m.initMaxHeap(); m.err != nil {
    1164            0 :                 return nil
    1165            0 :         }
    1166            1 :         return m.findPrevEntry()
    1167              : }
    1168              : 
    1169            1 : func (m *mergingIter) Next() *base.InternalKV {
    1170            1 :         if m.err != nil {
    1171            0 :                 return nil
    1172            0 :         }
    1173              : 
    1174            1 :         if m.dir != 1 {
    1175            1 :                 if m.err = m.switchToMinHeap(); m.err != nil {
    1176            0 :                         return nil
    1177            0 :                 }
    1178            1 :                 return m.findNextEntry()
    1179              :         }
    1180              : 
    1181            1 :         if m.heap.len() == 0 {
    1182            0 :                 return nil
    1183            0 :         }
    1184              : 
    1185              :         // NB: It's okay to call nextEntry directly even during prefix iteration
    1186              :         // mode. During prefix iteration mode, we rely on the caller to not call
    1187              :         // Next if the iterator has already advanced beyond the iteration prefix.
    1188              :         // See the comment above the base.InternalIterator interface.
    1189            1 :         if m.err = m.nextEntry(m.heap.items[0], nil /* succKey */); m.err != nil {
    1190            0 :                 return nil
    1191            0 :         }
    1192              : 
    1193            1 :         iterKV := m.findNextEntry()
    1194            1 :         if invariants.Enabled && m.prefix != nil && iterKV != nil {
    1195            1 :                 if !bytes.Equal(m.prefix, m.split.Prefix(iterKV.K.UserKey)) {
    1196            0 :                         m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKV, m.prefix)
    1197            0 :                 }
    1198              :         }
    1199            1 :         return iterKV
    1200              : }
    1201              : 
    1202            1 : func (m *mergingIter) NextPrefix(succKey []byte) *base.InternalKV {
    1203            1 :         if m.dir != 1 {
    1204            0 :                 panic("pebble: cannot switch directions with NextPrefix")
    1205              :         }
    1206            1 :         if m.err != nil || m.heap.len() == 0 {
    1207            0 :                 return nil
    1208            0 :         }
    1209            1 :         if m.levelsPositioned == nil {
    1210            1 :                 m.levelsPositioned = make([]bool, len(m.levels))
    1211            1 :         } else {
    1212            1 :                 for i := range m.levelsPositioned {
    1213            1 :                         m.levelsPositioned[i] = false
    1214            1 :                 }
    1215              :         }
    1216              : 
    1217              :         // The heap root necessarily must be positioned at a key < succKey, because
    1218              :         // NextPrefix was invoked.
    1219            1 :         root := m.heap.items[0]
    1220            1 :         if invariants.Enabled && m.heap.cmp((*root).iterKV.K.UserKey, succKey) >= 0 {
    1221            0 :                 m.logger.Fatalf("pebble: invariant violation: NextPrefix(%q) called on merging iterator already positioned at %q",
    1222            0 :                         succKey, (*root).iterKV)
    1223            0 :         }
    1224              :         // NB: root is the heap root before we call nextEntry; nextEntry may change
    1225              :         // the heap root, so we must not `root` to still be the root of the heap, or
    1226              :         // even to be in the heap if the level's iterator becomes exhausted.
    1227            1 :         if m.err = m.nextEntry(root, succKey); m.err != nil {
    1228            0 :                 return nil
    1229            0 :         }
    1230              :         // We only consider the level to be conclusively positioned at the next
    1231              :         // prefix if our call to nextEntry did not advance the level onto a range
    1232              :         // deletion's boundary. Range deletions may have bounds within the prefix
    1233              :         // that are still surfaced by NextPrefix.
    1234            1 :         m.levelsPositioned[root.index] = root.iterKV == nil || !root.iterKV.K.IsExclusiveSentinel()
    1235            1 : 
    1236            1 :         for m.heap.len() > 0 {
    1237            1 :                 root := m.heap.items[0]
    1238            1 :                 if m.levelsPositioned[root.index] {
    1239            1 :                         // A level we've previously positioned is at the top of the heap, so
    1240            1 :                         // there are no other levels positioned at keys < succKey. We've
    1241            1 :                         // advanced as far as we need to.
    1242            1 :                         break
    1243              :                 }
    1244              :                 // If the current heap root is a sentinel key, we need to skip it.
    1245              :                 // Calling NextPrefix while positioned at a sentinel key is not
    1246              :                 // supported.
    1247            1 :                 if root.iterKV.K.IsExclusiveSentinel() {
    1248            1 :                         if m.err = m.nextEntry(root, nil); m.err != nil {
    1249            0 :                                 return nil
    1250            0 :                         }
    1251            1 :                         continue
    1252              :                 }
    1253              : 
    1254              :                 // Since this level was not the original heap root when NextPrefix was
    1255              :                 // called, we don't know whether this level's current key has the
    1256              :                 // previous prefix or a new one.
    1257            1 :                 if m.heap.cmp(root.iterKV.K.UserKey, succKey) >= 0 {
    1258            1 :                         break
    1259              :                 }
    1260            1 :                 if m.err = m.nextEntry(root, succKey); m.err != nil {
    1261            0 :                         return nil
    1262            0 :                 }
    1263              :                 // We only consider the level to be conclusively positioned at the next
    1264              :                 // prefix if our call to nextEntry did not land onto a range deletion's
    1265              :                 // boundary. Range deletions may have bounds within the prefix that are
    1266              :                 // still surfaced by NextPrefix.
    1267            1 :                 m.levelsPositioned[root.index] = root.iterKV == nil || !root.iterKV.K.IsExclusiveSentinel()
    1268              :         }
    1269            1 :         return m.findNextEntry()
    1270              : }
    1271              : 
    1272            1 : func (m *mergingIter) Prev() *base.InternalKV {
    1273            1 :         if m.err != nil {
    1274            0 :                 return nil
    1275            0 :         }
    1276              : 
    1277            1 :         if m.dir != -1 {
    1278            1 :                 if m.prefix != nil {
    1279            0 :                         m.err = errors.New("pebble: unsupported reverse prefix iteration")
    1280            0 :                         return nil
    1281            0 :                 }
    1282            1 :                 if m.err = m.switchToMaxHeap(); m.err != nil {
    1283            0 :                         return nil
    1284            0 :                 }
    1285            1 :                 return m.findPrevEntry()
    1286              :         }
    1287              : 
    1288            1 :         if m.heap.len() == 0 {
    1289            0 :                 return nil
    1290            0 :         }
    1291            1 :         if m.err = m.prevEntry(m.heap.items[0]); m.err != nil {
    1292            0 :                 return nil
    1293            0 :         }
    1294            1 :         return m.findPrevEntry()
    1295              : }
    1296              : 
    1297            1 : func (m *mergingIter) Error() error {
    1298            1 :         if m.heap.len() == 0 || m.err != nil {
    1299            1 :                 return m.err
    1300            1 :         }
    1301            1 :         return m.levels[m.heap.items[0].index].iter.Error()
    1302              : }
    1303              : 
    1304            1 : func (m *mergingIter) Close() error {
    1305            1 :         for i := range m.levels {
    1306            1 :                 iter := m.levels[i].iter
    1307            1 :                 if err := iter.Close(); err != nil && m.err == nil {
    1308            0 :                         m.err = err
    1309            0 :                 }
    1310            1 :                 m.levels[i].setRangeDelIter(nil)
    1311              :         }
    1312            1 :         m.levels = nil
    1313            1 :         m.heap.items = m.heap.items[:0]
    1314            1 :         return m.err
    1315              : }
    1316              : 
    1317            1 : func (m *mergingIter) SetBounds(lower, upper []byte) {
    1318            1 :         m.prefix = nil
    1319            1 :         m.lower = lower
    1320            1 :         m.upper = upper
    1321            1 :         for i := range m.levels {
    1322            1 :                 m.levels[i].iter.SetBounds(lower, upper)
    1323            1 :         }
    1324            1 :         m.heap.clear()
    1325              : }
    1326              : 
    1327            0 : func (m *mergingIter) SetContext(ctx context.Context) {
    1328            0 :         for i := range m.levels {
    1329            0 :                 m.levels[i].iter.SetContext(ctx)
    1330            0 :         }
    1331              : }
    1332              : 
    1333              : // DebugTree is part of the InternalIterator interface.
    1334            0 : func (m *mergingIter) DebugTree(tp treeprinter.Node) {
    1335            0 :         n := tp.Childf("%T(%p)", m, m)
    1336            0 :         for i := range m.levels {
    1337            0 :                 if iter := m.levels[i].iter; iter != nil {
    1338            0 :                         iter.DebugTree(n)
    1339            0 :                 }
    1340              :         }
    1341              : }
    1342              : 
    1343            0 : func (m *mergingIter) DebugString() string {
    1344            0 :         var buf bytes.Buffer
    1345            0 :         sep := ""
    1346            0 :         for m.heap.len() > 0 {
    1347            0 :                 item := m.heap.pop()
    1348            0 :                 fmt.Fprintf(&buf, "%s%s", sep, item.iterKV.K)
    1349            0 :                 sep = " "
    1350            0 :         }
    1351            0 :         var err error
    1352            0 :         if m.dir == 1 {
    1353            0 :                 err = m.initMinHeap()
    1354            0 :         } else {
    1355            0 :                 err = m.initMaxHeap()
    1356            0 :         }
    1357            0 :         if err != nil {
    1358            0 :                 fmt.Fprintf(&buf, "err=<%s>", err)
    1359            0 :         }
    1360            0 :         return buf.String()
    1361              : }
    1362              : 
    1363            1 : func (m *mergingIter) ForEachLevelIter(fn func(li *levelIter) bool) {
    1364            1 :         for _, ml := range m.levels {
    1365            1 :                 if ml.levelIter != nil {
    1366            1 :                         if done := fn(ml.levelIter); done {
    1367            1 :                                 break
    1368              :                         }
    1369              :                 }
    1370              :         }
    1371              : }
    1372              : 
    1373            1 : func (m *mergingIter) addItemStats(l *mergingIterLevel) {
    1374            1 :         m.stats.PointCount++
    1375            1 :         m.stats.KeyBytes += uint64(len(l.iterKV.K.UserKey))
    1376            1 :         m.stats.ValueBytes += uint64(len(l.iterKV.V.ValueOrHandle))
    1377            1 : }
    1378              : 
    1379              : var _ internalIterator = &mergingIter{}
        

Generated by: LCOV version 2.0-1