LCOV - code coverage report
Current view: top level - pebble - level_checker.go (source / functions) Hit Total Coverage
Test: 2024-11-09 08:16Z b2cb561a - tests only.lcov Lines: 413 491 84.1 %
Date: 2024-11-09 08:16:35 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "io"
      11             :         "sort"
      12             : 
      13             :         "github.com/cockroachdb/errors"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/internal/keyspan"
      16             :         "github.com/cockroachdb/pebble/internal/manifest"
      17             : )
      18             : 
      19             : // This file implements DB.CheckLevels() which checks that every entry in the
      20             : // DB is consistent with respect to the level invariant: any point (or the
      21             : // infinite number of points in a range tombstone) has a seqnum such that a
      22             : // point with the same UserKey at a lower level has a lower seqnum. This is an
      23             : // expensive check since it involves iterating over all the entries in the DB,
      24             : // hence only intended for tests or tools.
      25             : //
      26             : // If we ignore range tombstones, the consistency checking of points can be
      27             : // done with a simplified version of mergingIter. simpleMergingIter is that
      28             : // simplified version of mergingIter that only needs to step through points
      29             : // (analogous to only doing Next()). It can also easily accommodate
      30             : // consistency checking of points relative to range tombstones.
      31             : // simpleMergingIter does not do any seek optimizations present in mergingIter
      32             : // (it minimally needs to seek the range delete iterators to position them at
      33             : // or past the current point) since it does not want to miss points for
      34             : // purposes of consistency checking.
      35             : //
      36             : // Mutual consistency of range tombstones is non-trivial to check. One needs
      37             : // to detect inversions of the form [a, c)#8 at higher level and [b, c)#10 at
      38             : // a lower level. The start key of the former is not contained in the latter
      39             : // and we can't use the exclusive end key, c, for a containment check since it
      40             : // is the sentinel key. We observe that if these tombstones were fragmented
      41             : // wrt each other we would have [a, b)#8 and [b, c)#8 at the higher level and
      42             : // [b, c)#10 at the lower level and then it is trivial to compare the two
      43             : // [b, c) tombstones. Note that this fragmentation needs to take into account
      44             : // that tombstones in a file may be untruncated and need to act within the
      45             : // bounds of the file. This checking is performed by checkRangeTombstones()
      46             : // and its helper functions.
      47             : 
      48             : // The per-level structure used by simpleMergingIter.
      49             : type simpleMergingIterLevel struct {
                               // iter is the level's point iterator. step() sets it to nil once the
                               // iterator has been exhausted and closed.
      50             :         iter         internalIterator
                               // rangeDelIter is the level's range tombstone iterator. It may be nil,
                               // in which case the level is skipped when tombstones are positioned or
                               // consulted.
      51             :         rangeDelIter keyspan.FragmentIterator
      52             : 
                               // iterKV is the last value returned by iter.First() or iter.Next().
      53             :         iterKV    *base.InternalKV
                               // tombstone is the span most recently returned by
                               // rangeDelIter.SeekGE(); setRangeDelIter() resets it to nil.
      54             :         tombstone *keyspan.Span
      55             : }
      56             : 
                       // setRangeDelIter installs iter as the level's range tombstone iterator.
                       // Any previously installed iterator is closed first, and the cached
                       // tombstone is cleared.
      57           1 : func (ml *simpleMergingIterLevel) setRangeDelIter(iter keyspan.FragmentIterator) {
      58           1 :         ml.tombstone = nil
      59           1 :         if ml.rangeDelIter != nil {
      60           1 :                 ml.rangeDelIter.Close()
      61           1 :         }
      62           1 :         ml.rangeDelIter = iter
      63             : }
      64             : 
                       // simpleMergingIter steps through the points of all levels via a heap and
                       // validates them as it goes. See the overview comment at the top of the
                       // file.
      65             : type simpleMergingIter struct {
      66             :         levels   []simpleMergingIterLevel
                               // snapshot is the seqnum used for visibility checks on points and for
                               // deciding whether a tombstone covers a point.
      67             :         snapshot base.SeqNum
      68             :         heap     simpleMergingIterHeap
      69             :         // The last point's key and level. For validation. lastKey is only
                               // rewritten when the user key changes; lastLevel is updated for every
                               // visible point. lastIterMsg is the debug string of the iterator that
                               // was stepped last, captured before Next() could close it.
      70             :         lastKey     InternalKey
      71             :         lastLevel   int
      72             :         lastIterMsg string
      73             :         // A non-nil valueMerger means MERGE record processing is ongoing.
      74             :         valueMerger base.ValueMerger
      75             :         // The first error will cause step() to return false.
      76             :         err       error
                               // numPoints counts the visible points passed to handleVisiblePoint().
      77             :         numPoints int64
                               // merge is used to start a value merger when a new series of MERGE
                               // records begins.
      78             :         merge     Merge
                               // formatKey is used to pretty-print keys in error messages.
      79             :         formatKey base.FormatKey
      80             : }
      81             : 
                       // init prepares m for stepping: it records the configuration, positions
                       // every level's point iterator at its first key, builds the heap from the
                       // levels that returned a key and, if the heap is non-empty, positions the
                       // range tombstone iterators relative to the top of the heap.
      82             : func (m *simpleMergingIter) init(
      83             :         merge Merge,
      84             :         cmp Compare,
      85             :         snapshot base.SeqNum,
      86             :         formatKey base.FormatKey,
      87             :         levels ...simpleMergingIterLevel,
      88           1 : ) {
      89           1 :         m.levels = levels
      90           1 :         m.formatKey = formatKey
      91           1 :         m.merge = merge
      92           1 :         m.snapshot = snapshot
                               // No level has produced a visible point yet.
      93           1 :         m.lastLevel = -1
      94           1 :         m.heap.cmp = cmp
      95           1 :         m.heap.items = make([]simpleMergingIterItem, 0, len(levels))
      96           1 :         for i := range m.levels {
      97           1 :                 l := &m.levels[i]
      98           1 :                 l.iterKV = l.iter.First()
                                       // NOTE(review): a nil result is treated as an empty level;
                                       // l.iter.Error() is not consulted here, unlike in step() -
                                       // confirm that is intended.
      99           1 :                 if l.iterKV != nil {
     100           1 :                         item := simpleMergingIterItem{
     101           1 :                                 index: i,
     102           1 :                                 value: l.iterKV.V,
     103           1 :                         }
                                               // The heap item keeps its own copy of the key.
     104           1 :                         item.key = l.iterKV.K.Clone()
     105           1 :                         m.heap.items = append(m.heap.items, item)
     106           1 :                 }
     107             :         }
     108           1 :         m.heap.init()
     109           1 : 
                               // positionRangeDels() reads the top of the heap, so it must not be
                               // called when every level was empty.
     110           1 :         if m.heap.len() == 0 {
     111           1 :                 return
     112           1 :         }
     113           1 :         m.positionRangeDels()
     114             : }
     115             : 
     116             : // Positions all the rangedel iterators at or past the current top of the
     117             : // heap, using SeekGE().
                       //
                       // The heap must be non-empty, since m.heap.items[0] is read
                       // unconditionally. Levels without a rangedel iterator are skipped. Each
                       // level's tombstone field is overwritten with the seek result, and any
                       // seek error is folded into m.err through firstError().
     118           1 : func (m *simpleMergingIter) positionRangeDels() {
     119           1 :         item := &m.heap.items[0]
     120           1 :         for i := range m.levels {
     121           1 :                 l := &m.levels[i]
     122           1 :                 if l.rangeDelIter == nil {
     123           1 :                         continue
     124             :                 }
     125           1 :                 t, err := l.rangeDelIter.SeekGE(item.key.UserKey)
     126           1 :                 m.err = firstError(m.err, err)
     127           1 :                 l.tombstone = t
     128             :         }
     129             : }
     130             : 
     131             : // Returns true if not yet done.
                       //
                       // step processes the item at the top of the heap: a visible, non-sentinel
                       // point is validated via handleVisiblePoint(), then the owning level's
                       // iterator is advanced and the heap is repaired. It returns false when the
                       // heap is empty or an error has been recorded in m.err; callers must check
                       // m.err to tell the two apart.
     132           1 : func (m *simpleMergingIter) step() bool {
     133           1 :         if m.heap.len() == 0 || m.err != nil {
     134           1 :                 return false
     135           1 :         }
     136           1 :         item := &m.heap.items[0]
     137           1 :         l := &m.levels[item.index]
     138           1 :         // Sentinels are not relevant for this point checking.
     139           1 :         if !item.key.IsExclusiveSentinel() && item.key.Visible(m.snapshot, base.SeqNumMax) {
     140           1 :                 // This is a visible point key.
     141           1 :                 if !m.handleVisiblePoint(item, l) {
     142           0 :                         return false
     143           0 :                 }
     144             :         }
     145             : 
     146             :         // The iterator for the current level may be closed in the following call to
     147             :         // Next(). We save its debug string for potential use after it is closed -
     148             :         // either in this current step() invocation or on the next invocation.
     149           1 :         m.lastIterMsg = l.iter.String()
     150           1 : 
     151           1 :         // Step to the next point.
     152           1 :         l.iterKV = l.iter.Next()
     153           1 :         if l.iterKV == nil {
                                       // The level is exhausted: record any iteration or close error,
                                       // drop the iterator and remove the level from the heap.
     154           1 :                 m.err = errors.CombineErrors(l.iter.Error(), l.iter.Close())
     155           1 :                 l.iter = nil
     156           1 :                 m.heap.pop()
     157           1 :         } else {
     158           1 :                 // Check point keys in an sstable are ordered. Although not required, we check
     159           1 :                 // for memtables as well. A subtle check here is that successive sstables of
     160           1 :                 // L1 and higher levels are ordered. This happens when levelIter moves to the
     161           1 :                 // next sstable in the level, in which case item.key is previous sstable's
     162           1 :                 // last point key.
     163           1 :                 if !l.iterKV.K.IsExclusiveSentinel() && base.InternalCompare(m.heap.cmp, item.key, l.iterKV.K) >= 0 {
     164           0 :                         m.err = errors.Errorf("out of order keys %s >= %s in %s",
     165           0 :                                 item.key.Pretty(m.formatKey), l.iterKV.K.Pretty(m.formatKey), l.iter)
     166           0 :                         return false
     167           0 :                 }
                                       // Overwrite the heap item in place, reusing the previous key's
                                       // buffer for the new user key.
     168           1 :                 item.key = base.InternalKey{
     169           1 :                         Trailer: l.iterKV.K.Trailer,
     170           1 :                         UserKey: append(item.key.UserKey[:0], l.iterKV.K.UserKey...),
     171           1 :                 }
     172           1 :                 item.value = l.iterKV.V
                                       // With a single item there is nothing to reorder.
     173           1 :                 if m.heap.len() > 1 {
     174           1 :                         m.heap.fix(0)
     175           1 :                 }
     176             :         }
     177           1 :         if m.err != nil {
     178           0 :                 return false
     179           0 :         }
     180           1 :         if m.heap.len() == 0 {
     181           1 :                 // If m.valueMerger != nil, the last record was a MERGE record.
     182           1 :                 if m.valueMerger != nil {
     183           1 :                         var closer io.Closer
     184           1 :                         var err error
     185           1 :                         _, closer, err = m.valueMerger.Finish(true /* includesBase */)
     186           1 :                         if closer != nil {
     187           0 :                                 err = errors.CombineErrors(err, closer.Close())
     188           0 :                         }
     189           1 :                         if err != nil {
                                                       // NOTE(review): item was taken before m.heap.pop();
                                                       // whether it still refers to the last popped key
                                                       // depends on pop()'s implementation, which is not
                                                       // visible here - confirm.
     190           0 :                                 m.err = errors.CombineErrors(m.err,
     191           0 :                                         errors.Wrapf(err, "merge processing error on key %s in %s",
     192           0 :                                                 item.key.Pretty(m.formatKey), m.lastIterMsg))
     193           0 :                         }
     194           1 :                         m.valueMerger = nil
     195             :                 }
     196           1 :                 return false
     197             :         }
     198           1 :         m.positionRangeDels()
     199           1 :         return true
     200             : }
     201             : 
     202             : // handleVisiblePoint returns true if validation succeeded and level checking
     203             : // can continue.
                       //
                       // item is the heap item for the visible point and l is the level it came
                       // from. The function performs three checks: (1) points sharing a user key
                       // are not seen at a smaller level index after a larger one, (2) MERGE
                       // processing for the key succeeds, and (3) no tombstone in a later level
                       // covers the point. On failure the error is stored in m.err and false is
                       // returned.
     204             : func (m *simpleMergingIter) handleVisiblePoint(
     205             :         item *simpleMergingIterItem, l *simpleMergingIterLevel,
     206           1 : ) (ok bool) {
     207           1 :         m.numPoints++
     208           1 :         keyChanged := m.heap.cmp(item.key.UserKey, m.lastKey.UserKey) != 0
     209           1 :         if !keyChanged {
     210           1 :                 // At the same user key. We will see them in decreasing seqnum
     211           1 :                 // order so the lastLevel must not be lower.
     212           1 :                 if m.lastLevel > item.index {
     213           0 :                         m.err = errors.Errorf("found InternalKey %s in %s and InternalKey %s in %s",
     214           0 :                                 item.key.Pretty(m.formatKey), l.iter, m.lastKey.Pretty(m.formatKey),
     215           0 :                                 m.lastIterMsg)
     216           0 :                         return false
     217           0 :                 }
     218           1 :                 m.lastLevel = item.index
     219           1 :         } else {
     220           1 :                 // The user key has changed.
     221           1 :                 m.lastKey.Trailer = item.key.Trailer
     222           1 :                 m.lastKey.UserKey = append(m.lastKey.UserKey[:0], item.key.UserKey...)
     223           1 :                 m.lastLevel = item.index
     224           1 :         }
     225             :         // Ongoing series of MERGE records ends with a MERGE record.
     226           1 :         if keyChanged && m.valueMerger != nil {
     227           1 :                 var closer io.Closer
     228           1 :                 _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     229           1 :                 if m.err == nil && closer != nil {
     230           0 :                         m.err = closer.Close()
     231           0 :                 }
     232           1 :                 m.valueMerger = nil
     233             :         }
                               // Fetch the point's value; it is fed to the value merger below.
     234           1 :         itemValue, _, err := item.value.Value(nil)
     235           1 :         if err != nil {
     236           0 :                 m.err = err
     237           0 :                 return false
     238           0 :         }
     239           1 :         if m.valueMerger != nil {
     240           1 :                 // Ongoing series of MERGE records.
     241           1 :                 switch item.key.Kind() {
     242           1 :                 case InternalKeyKindSingleDelete, InternalKeyKindDelete, InternalKeyKindDeleteSized:
                                               // A deletion ends the series without contributing a value.
     243           1 :                         var closer io.Closer
     244           1 :                         _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     245           1 :                         if m.err == nil && closer != nil {
     246           0 :                                 m.err = closer.Close()
     247           0 :                         }
     248           1 :                         m.valueMerger = nil
     249           1 :                 case InternalKeyKindSet, InternalKeyKindSetWithDelete:
                                               // A SET is merged in as the oldest operand and then ends
                                               // the series.
     250           1 :                         m.err = m.valueMerger.MergeOlder(itemValue)
     251           1 :                         if m.err == nil {
     252           1 :                                 var closer io.Closer
     253           1 :                                 _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     254           1 :                                 if m.err == nil && closer != nil {
     255           0 :                                         m.err = closer.Close()
     256           0 :                                 }
     257             :                         }
     258           1 :                         m.valueMerger = nil
     259           1 :                 case InternalKeyKindMerge:
                                               // Another MERGE operand; the series continues.
     260           1 :                         m.err = m.valueMerger.MergeOlder(itemValue)
     261           0 :                 default:
     262           0 :                         m.err = errors.Errorf("pebble: invalid internal key kind %s in %s",
     263           0 :                                 item.key.Pretty(m.formatKey),
     264           0 :                                 l.iter)
     265           0 :                         return false
     266             :                 }
     267           1 :         } else if item.key.Kind() == InternalKeyKindMerge && m.err == nil {
     268           1 :                 // New series of MERGE records.
     269           1 :                 m.valueMerger, m.err = m.merge(item.key.UserKey, itemValue)
     270           1 :         }
                               // Any error from the merge handling above is wrapped with the key and
                               // iterator it was encountered on.
     271           1 :         if m.err != nil {
     272           0 :                 m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
     273           0 :                         item.key.Pretty(m.formatKey), l.iter)
     274           0 :                 return false
     275           0 :         }
     276             :         // Is this point covered by a tombstone at a lower level? Note that all these
     277             :         // iterators must be positioned at a key > item.key.
     278           1 :         for level := item.index + 1; level < len(m.levels); level++ {
     279           1 :                 lvl := &m.levels[level]
     280           1 :                 if lvl.rangeDelIter == nil || lvl.tombstone.Empty() {
     281           1 :                         continue
     282             :                 }
     283           1 :                 if lvl.tombstone.Contains(m.heap.cmp, item.key.UserKey) && lvl.tombstone.CoversAt(m.snapshot, item.key.SeqNum()) {
                                               // NOTE(review): step() sets a level's iter to nil once it
                                               // is exhausted, so lvl.iter may be nil when formatted
                                               // here - confirm the message is still useful then.
     284           0 :                         m.err = errors.Errorf("tombstone %s in %s deletes key %s in %s",
     285           0 :                                 lvl.tombstone.Pretty(m.formatKey), lvl.iter, item.key.Pretty(m.formatKey),
     286           0 :                                 l.iter)
     287           0 :                         return false
     288           0 :                 }
     289             :         }
     290           1 :         return true
     291             : }
     292             : 
     293             : // Checking that range tombstones are mutually consistent is performed by
     294             : // checkRangeTombstones(). See the overview comment at the top of the file.
     295             : //
     296             : // We do this check as follows:
     297             : // - Collect the tombstones for each level, put them into one pool of tombstones
     298             : //   along with their level information (addTombstonesFromIter()).
     299             : // - Collect the start and end user keys from all these tombstones
     300             : //   (collectAllUserKeys()) and use them to fragment all the tombstones
     301             : //   (fragmentUsingUserKeys()).
     302             : // - Sort tombstones by start key and decreasing seqnum
     303             : //   (tombstonesByStartKeyAndSeqnum) - all tombstones that have the same start
     304             : //   key will have the same end key because they have been fragmented.
     305             : // - Iterate and check (iterateAndCheckTombstones()).
     306             : //
     307             : // Note that this simple approach requires holding all the tombstones across all
     308             : // levels in-memory. A more sophisticated incremental approach could be devised,
     309             : // if necessary.
     310             : 
     311             : // A tombstone and the corresponding level it was found in.
     312             : type tombstoneWithLevel struct {
     313             :         keyspan.Span
                               // level is the ordinal assigned by checkRangeTombstones() as it visits
                               // memtables, then L0 sublevels, then L1 and beyond. It is the value
                               // compared by iterateAndCheckTombstones().
     314             :         level int
     315             :         // The level in LSM. A -1 means it's a memtable.
     316             :         lsmLevel int
                               // fileNum identifies the sstable the tombstone came from;
                               // checkRangeTombstones() passes 0 for memtables.
     317             :         fileNum  FileNum
     318             : }
     319             : 
     320             : // For sorting tombstoneWithLevels in increasing order of start UserKey and
     321             : // for the same start UserKey in decreasing order of seqnum.
                       // It implements the Len/Less/Swap methods used by sort.Sort() and sorts
                       // buf in place.
     322             : type tombstonesByStartKeyAndSeqnum struct {
     323             :         cmp Compare
     324             :         buf []tombstoneWithLevel
     325             : }
     326             : 
     327           1 : func (v *tombstonesByStartKeyAndSeqnum) Len() int { return len(v.buf) }
     328           1 : func (v *tombstonesByStartKeyAndSeqnum) Less(i, j int) bool {
     329           1 :         less := v.cmp(v.buf[i].Start, v.buf[j].Start)
     330           1 :         if less == 0 {
                                       // Same start key: the span with the larger LargestSeqNum()
                                       // sorts first.
     331           1 :                 return v.buf[i].LargestSeqNum() > v.buf[j].LargestSeqNum()
     332           1 :         }
     333           1 :         return less < 0
     334             : }
     335           1 : func (v *tombstonesByStartKeyAndSeqnum) Swap(i, j int) {
     336           1 :         v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
     337           1 : }
     338             : 
                       // iterateAndCheckTombstones sorts tombstones in place by start key and
                       // decreasing seqnum, then walks them and returns an error if, among
                       // consecutive tombstones with the same start key, a tombstone is found at
                       // a smaller level ordinal than the one before it. It returns nil when no
                       // such inversion exists.
     339             : func iterateAndCheckTombstones(
     340             :         cmp Compare, formatKey base.FormatKey, tombstones []tombstoneWithLevel,
     341           1 : ) error {
                               // sortBuf.buf shares the caller's slice, so the sort reorders
                               // tombstones itself.
     342           1 :         sortBuf := tombstonesByStartKeyAndSeqnum{
     343           1 :                 cmp: cmp,
     344           1 :                 buf: tombstones,
     345           1 :         }
     346           1 :         sort.Sort(&sortBuf)
     347           1 : 
     348           1 :         // For a sequence of tombstones that share the same start UserKey, we will
     349           1 :         // encounter them in non-increasing seqnum order and so should encounter them
     350           1 :         // in non-decreasing level order.
     351           1 :         lastTombstone := tombstoneWithLevel{}
     352           1 :         for _, t := range tombstones {
     353           1 :                 if cmp(lastTombstone.Start, t.Start) == 0 && lastTombstone.level > t.level {
     354           0 :                         return errors.Errorf("encountered tombstone %s in %s"+
     355           0 :                                 " that has a lower seqnum than the same tombstone in %s",
     356           0 :                                 t.Span.Pretty(formatKey), levelOrMemtable(t.lsmLevel, t.fileNum),
     357           0 :                                 levelOrMemtable(lastTombstone.lsmLevel, lastTombstone.fileNum))
     358           0 :                 }
     359           1 :                 lastTombstone = t
     360             :         }
     361           1 :         return nil
     362             : }
     363             : 
                       // checkConfig bundles the inputs used by the level checking code.
     364             : type checkConfig struct {
     365             :         logger    Logger
     366             :         comparer  *Comparer
                               // readState supplies the memtables and the current version whose
                               // tombstones are checked.
     367             :         readState *readState
                               // newIters opens the iterators for an sstable.
     368             :         newIters  tableNewIters
                               // seqNum is the seqnum at which tombstones are tested for visibility.
     369             :         seqNum    base.SeqNum
                               // stats, when non-nil, receives the number of tombstones collected.
     370             :         stats     *CheckLevelsStats
     371             :         merge     Merge
                               // formatKey is used to pretty-print keys in error messages.
     372             :         formatKey base.FormatKey
     373             : }
     374             : 
     375             : // cmp is shorthand for comparer.Compare.
     376           1 : func (c *checkConfig) cmp(a, b []byte) int { return c.comparer.Compare(a, b) }
     377             : 
                       // checkRangeTombstones collects the visible range tombstones from every
                       // memtable and sstable reachable from c.readState, fragments them against
                       // each other and checks them with iterateAndCheckTombstones(). It returns
                       // the first error encountered while opening iterators, reading tombstones
                       // or checking them.
     378           1 : func checkRangeTombstones(c *checkConfig) error {
                               // level is the ordinal stamped on each collected tombstone. It only
                               // advances after a memtable that has a rangedel iterator, after a
                               // non-empty L0 sublevel, and after each of L1 and beyond.
     379           1 :         var level int
     380           1 :         var tombstones []tombstoneWithLevel
     381           1 :         var err error
     382           1 : 
     383           1 :         memtables := c.readState.memtables
     384           1 :         for i := len(memtables) - 1; i >= 0; i-- {
     385           1 :                 iter := memtables[i].newRangeDelIter(nil)
     386           1 :                 if iter == nil {
     387           1 :                         continue
     388             :                 }
                                       // Memtables are recorded with lsmLevel -1 and fileNum 0.
     389           1 :                 tombstones, err = addTombstonesFromIter(
     390           1 :                         iter, level, -1, 0, tombstones, c.seqNum, c.cmp, c.formatKey,
     391           1 :                 )
     392           1 :                 iter.Close()
     393           1 :                 if err != nil {
     394           0 :                         return err
     395           0 :                 }
     396           1 :                 level++
     397             :         }
     398             : 
     399           1 :         current := c.readState.current
                               // addTombstonesFromLevel appends the tombstones of every file yielded
                               // by files, tagging them with the current level ordinal and lsmLevel.
     400           1 :         addTombstonesFromLevel := func(files manifest.LevelIterator, lsmLevel int) error {
     401           1 :                 for f := files.First(); f != nil; f = files.Next() {
     402           1 :                         lf := files.Take()
     403           1 :                         iters, err := c.newIters(
     404           1 :                                 context.Background(), lf.FileMetadata, &IterOptions{layer: manifest.Level(lsmLevel)},
     405           1 :                                 internalIterOpts{}, iterRangeDeletions)
     406           1 :                         if err != nil {
     407           0 :                                 return err
     408           0 :                         }
     409           1 :                         tombstones, err = addTombstonesFromIter(iters.RangeDeletion(), level, lsmLevel, f.FileNum,
     410           1 :                                 tombstones, c.seqNum, c.cmp, c.formatKey)
                                               // NOTE(review): anything returned by CloseAll() is
                                               // discarded here - confirm that is intended.
     411           1 :                         iters.CloseAll()
     412           1 : 
     413           1 :                         if err != nil {
     414           0 :                                 return err
     415           0 :                         }
     416             :                 }
     417           1 :                 return nil
     418             :         }
     419             :         // Now the levels with untruncated tombstones.
                               // L0 is visited one sublevel at a time, from the highest sublevel
                               // index down to 0.
     420           1 :         for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
     421           1 :                 if current.L0SublevelFiles[i].Empty() {
     422           0 :                         continue
     423             :                 }
     424           1 :                 err := addTombstonesFromLevel(current.L0SublevelFiles[i].Iter(), 0)
     425           1 :                 if err != nil {
     426           0 :                         return err
     427           0 :                 }
     428           1 :                 level++
     429             :         }
                               // L0 was handled through its sublevels above, so start at L1.
     430           1 :         for i := 1; i < len(current.Levels); i++ {
     431           1 :                 if err := addTombstonesFromLevel(current.Levels[i].Iter(), i); err != nil {
     432           0 :                         return err
     433           0 :                 }
     434           1 :                 level++
     435             :         }
     436           1 :         if c.stats != nil {
     437           1 :                 c.stats.NumTombstones = len(tombstones)
     438           1 :         }
     439             :         // We now have truncated tombstones.
     440             :         // Fragment them all.
     441           1 :         userKeys := collectAllUserKeys(c.cmp, tombstones)
     442           1 :         tombstones = fragmentUsingUserKeys(c.cmp, tombstones, userKeys)
     443           1 :         return iterateAndCheckTombstones(c.cmp, c.formatKey, tombstones)
     444             : }
     445             : 
                       // levelOrMemtable returns a description of where a tombstone was found,
                       // for use in error messages: "memtable" when lsmLevel is -1, otherwise the
                       // LSM level together with the file number.
     446           0 : func levelOrMemtable(lsmLevel int, fileNum FileNum) string {
     447           0 :         if lsmLevel == -1 {
     448           0 :                 return "memtable"
     449           0 :         }
     450           0 :         return fmt.Sprintf("L%d: fileNum=%s", lsmLevel, fileNum)
     451             : }
     452             : 
                       // addTombstonesFromIter appends to tombstones every tombstone from iter
                       // that is non-empty after restricting it to seqNum, tagging each with
                       // level, lsmLevel and fileNum. While iterating it checks that each
                       // tombstone starts at or after the end of the previous one.
                       //
                       // It returns the extended slice, or a nil slice and an error if the
                       // ordering check fails or the iterator reports an error. The caller keeps
                       // ownership of iter and is responsible for closing it.
     453             : func addTombstonesFromIter(
     454             :         iter keyspan.FragmentIterator,
     455             :         level int,
     456             :         lsmLevel int,
     457             :         fileNum FileNum,
     458             :         tombstones []tombstoneWithLevel,
     459             :         seqNum base.SeqNum,
     460             :         cmp Compare,
     461             :         formatKey base.FormatKey,
     462           1 : ) (_ []tombstoneWithLevel, err error) {
     463           1 :         var prevTombstone keyspan.Span
                               // The loop ends when the iterator returns a nil span; err from that
                               // last call is inspected after the loop.
     464           1 :         tomb, err := iter.First()
     465           1 :         for ; tomb != nil; tomb, err = iter.Next() {
     466           1 :                 t := tomb.Visible(seqNum)
     467           1 :                 if t.Empty() {
     468           1 :                         continue
     469             :                 }
                                       // The copy is what is retained in prevTombstone and tombstones.
     470           1 :                 t = t.Clone()
     471           1 :                 // This is mainly a test for rangeDelV2 formatted blocks which are expected to
     472           1 :                 // be ordered and fragmented on disk. But we anyways check for memtables,
     473           1 :                 // rangeDelV1 as well.
     474           1 :                 if cmp(prevTombstone.End, t.Start) > 0 {
     475           0 :                         return nil, errors.Errorf("unordered or unfragmented range delete tombstones %s, %s in %s",
     476           0 :                                 prevTombstone.Pretty(formatKey), t.Pretty(formatKey), levelOrMemtable(lsmLevel, fileNum))
     477           0 :                 }
     478           1 :                 prevTombstone = t
     479           1 : 
                                       // NOTE(review): t was already found non-empty above, so this
                                       // check looks redundant, assuming Clone() preserves emptiness -
                                       // confirm.
     480           1 :                 if !t.Empty() {
     481           1 :                         tombstones = append(tombstones, tombstoneWithLevel{
     482           1 :                                 Span:     t,
     483           1 :                                 level:    level,
     484           1 :                                 lsmLevel: lsmLevel,
     485           1 :                                 fileNum:  fileNum,
     486           1 :                         })
     487           1 :                 }
     488             :         }
     489           1 :         if err != nil {
     490           0 :                 return nil, err
     491           0 :         }
     492           1 :         return tombstones, nil
     493             : }
     494             : 
                       // userKeysSort sorts buf, a slice of user keys, in place in increasing
                       // order according to cmp. It implements the Len/Less/Swap methods used by
                       // sort.Sort().
     495             : type userKeysSort struct {
     496             :         cmp Compare
     497             :         buf [][]byte
     498             : }
     499             : 
     500           1 : func (v *userKeysSort) Len() int { return len(v.buf) }
     501           1 : func (v *userKeysSort) Less(i, j int) bool {
     502           1 :         return v.cmp(v.buf[i], v.buf[j]) < 0
     503           1 : }
     504           1 : func (v *userKeysSort) Swap(i, j int) {
     505           1 :         v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
     506           1 : }
     507           1 : func collectAllUserKeys(cmp Compare, tombstones []tombstoneWithLevel) [][]byte {
     508           1 :         keys := make([][]byte, 0, len(tombstones)*2)
     509           1 :         for _, t := range tombstones {
     510           1 :                 keys = append(keys, t.Start)
     511           1 :                 keys = append(keys, t.End)
     512           1 :         }
     513           1 :         sorter := userKeysSort{
     514           1 :                 cmp: cmp,
     515           1 :                 buf: keys,
     516           1 :         }
     517           1 :         sort.Sort(&sorter)
     518           1 :         var last, curr int
     519           1 :         for last, curr = -1, 0; curr < len(keys); curr++ {
     520           1 :                 if last < 0 || cmp(keys[last], keys[curr]) != 0 {
     521           1 :                         last++
     522           1 :                         keys[last] = keys[curr]
     523           1 :                 }
     524             :         }
     525           1 :         keys = keys[:last+1]
     526           1 :         return keys
     527             : }
     528             : 
     529             : func fragmentUsingUserKeys(
     530             :         cmp Compare, tombstones []tombstoneWithLevel, userKeys [][]byte,
     531           1 : ) []tombstoneWithLevel {
     532           1 :         var buf []tombstoneWithLevel
     533           1 :         for _, t := range tombstones {
     534           1 :                 // Find the first position with tombstone start < user key
     535           1 :                 i := sort.Search(len(userKeys), func(i int) bool {
     536           1 :                         return cmp(t.Start, userKeys[i]) < 0
     537           1 :                 })
     538           1 :                 for ; i < len(userKeys); i++ {
     539           1 :                         if cmp(userKeys[i], t.End) >= 0 {
     540           1 :                                 break
     541             :                         }
     542           1 :                         tPartial := t
     543           1 :                         tPartial.End = userKeys[i]
     544           1 :                         buf = append(buf, tPartial)
     545           1 :                         t.Start = userKeys[i]
     546             :                 }
     547           1 :                 buf = append(buf, t)
     548             :         }
     549           1 :         return buf
     550             : }
     551             : 
     552             : // CheckLevelsStats provides basic stats on points and tombstones encountered.
     553             : type CheckLevelsStats struct {
     554             :         NumPoints     int64
     555             :         NumTombstones int
     556             : }
     557             : 
     558             : // CheckLevels checks:
     559             : //   - Every entry in the DB is consistent with the level invariant. See the
     560             : //     comment at the top of the file.
     561             : //   - Point keys in sstables are ordered.
     562             : //   - Range delete tombstones in sstables are ordered and fragmented.
     563             : //   - Successful processing of all MERGE records.
     564           1 : func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
     565           1 :         // Grab and reference the current readState.
     566           1 :         readState := d.loadReadState()
     567           1 :         defer readState.unref()
     568           1 : 
     569           1 :         // Determine the seqnum to read at after grabbing the read state (current and
     570           1 :         // memtables) above.
     571           1 :         seqNum := d.mu.versions.visibleSeqNum.Load()
     572           1 : 
     573           1 :         checkConfig := &checkConfig{
     574           1 :                 logger:    d.opts.Logger,
     575           1 :                 comparer:  d.opts.Comparer,
     576           1 :                 readState: readState,
     577           1 :                 newIters:  d.newIters,
     578           1 :                 seqNum:    seqNum,
     579           1 :                 stats:     stats,
     580           1 :                 merge:     d.merge,
     581           1 :                 formatKey: d.opts.Comparer.FormatKey,
     582           1 :         }
     583           1 :         return checkLevelsInternal(checkConfig)
     584           1 : }
     585             : 
     586           1 : func checkLevelsInternal(c *checkConfig) (err error) {
     587           1 :         // Phase 1: Use a simpleMergingIter to step through all the points and ensure
     588           1 :         // that points with the same user key at different levels are not inverted
     589           1 :         // wrt sequence numbers and the same holds for tombstones that cover points.
     590           1 :         // To do this, one needs to construct a simpleMergingIter which is similar to
     591           1 :         // how one constructs a mergingIter.
     592           1 : 
     593           1 :         // Add mem tables from newest to oldest.
     594           1 :         var mlevels []simpleMergingIterLevel
     595           1 :         defer func() {
     596           1 :                 for i := range mlevels {
     597           1 :                         l := &mlevels[i]
     598           1 :                         if l.iter != nil {
     599           1 :                                 err = firstError(err, l.iter.Close())
     600           1 :                                 l.iter = nil
     601           1 :                         }
     602           1 :                         if l.rangeDelIter != nil {
     603           1 :                                 l.rangeDelIter.Close()
     604           1 :                                 l.rangeDelIter = nil
     605           1 :                         }
     606             :                 }
     607             :         }()
     608             : 
     609           1 :         memtables := c.readState.memtables
     610           1 :         for i := len(memtables) - 1; i >= 0; i-- {
     611           1 :                 mem := memtables[i]
     612           1 :                 mlevels = append(mlevels, simpleMergingIterLevel{
     613           1 :                         iter:         mem.newIter(nil),
     614           1 :                         rangeDelIter: mem.newRangeDelIter(nil),
     615           1 :                 })
     616           1 :         }
     617             : 
     618           1 :         current := c.readState.current
     619           1 :         // Determine the final size for mlevels so that there are no more
     620           1 :         // reallocations. levelIter will hold a pointer to elements in mlevels.
     621           1 :         start := len(mlevels)
     622           1 :         for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
     623           1 :                 if current.L0SublevelFiles[sublevel].Empty() {
     624           0 :                         continue
     625             :                 }
     626           1 :                 mlevels = append(mlevels, simpleMergingIterLevel{})
     627             :         }
     628           1 :         for level := 1; level < len(current.Levels); level++ {
     629           1 :                 if current.Levels[level].Empty() {
     630           1 :                         continue
     631             :                 }
     632           1 :                 mlevels = append(mlevels, simpleMergingIterLevel{})
     633             :         }
     634           1 :         mlevelAlloc := mlevels[start:]
     635           1 :         // Add L0 files by sublevel.
     636           1 :         for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
     637           1 :                 if current.L0SublevelFiles[sublevel].Empty() {
     638           0 :                         continue
     639             :                 }
     640           1 :                 manifestIter := current.L0SublevelFiles[sublevel].Iter()
     641           1 :                 iterOpts := IterOptions{logger: c.logger}
     642           1 :                 li := &levelIter{}
     643           1 :                 li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
     644           1 :                         manifest.L0Sublevel(sublevel), internalIterOpts{})
     645           1 :                 li.initRangeDel(&mlevelAlloc[0])
     646           1 :                 mlevelAlloc[0].iter = li
     647           1 :                 mlevelAlloc = mlevelAlloc[1:]
     648             :         }
     649           1 :         for level := 1; level < len(current.Levels); level++ {
     650           1 :                 if current.Levels[level].Empty() {
     651           1 :                         continue
     652             :                 }
     653             : 
     654           1 :                 iterOpts := IterOptions{logger: c.logger}
     655           1 :                 li := &levelIter{}
     656           1 :                 li.init(context.Background(), iterOpts, c.comparer, c.newIters,
     657           1 :                         current.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
     658           1 :                 li.initRangeDel(&mlevelAlloc[0])
     659           1 :                 mlevelAlloc[0].iter = li
     660           1 :                 mlevelAlloc = mlevelAlloc[1:]
     661             :         }
     662             : 
     663           1 :         mergingIter := &simpleMergingIter{}
     664           1 :         mergingIter.init(c.merge, c.cmp, c.seqNum, c.formatKey, mlevels...)
     665           1 :         for cont := mergingIter.step(); cont; cont = mergingIter.step() {
     666           1 :         }
     667           1 :         if err := mergingIter.err; err != nil {
     668           0 :                 return err
     669           0 :         }
     670           1 :         if c.stats != nil {
     671           1 :                 c.stats.NumPoints = mergingIter.numPoints
     672           1 :         }
     673             : 
     674             :         // Phase 2: Check that the tombstones are mutually consistent.
     675           1 :         return checkRangeTombstones(c)
     676             : }
     677             : 
     678             : type simpleMergingIterItem struct {
     679             :         index int
     680             :         key   InternalKey
     681             :         value base.LazyValue
     682             : }
     683             : 
     684             : type simpleMergingIterHeap struct {
     685             :         cmp     Compare
     686             :         reverse bool
     687             :         items   []simpleMergingIterItem
     688             : }
     689             : 
     690           1 : func (h *simpleMergingIterHeap) len() int {
     691           1 :         return len(h.items)
     692           1 : }
     693             : 
     694           1 : func (h *simpleMergingIterHeap) less(i, j int) bool {
     695           1 :         ikey, jkey := h.items[i].key, h.items[j].key
     696           1 :         if c := h.cmp(ikey.UserKey, jkey.UserKey); c != 0 {
     697           1 :                 if h.reverse {
     698           0 :                         return c > 0
     699           0 :                 }
     700           1 :                 return c < 0
     701             :         }
     702           1 :         if h.reverse {
     703           0 :                 return ikey.Trailer < jkey.Trailer
     704           0 :         }
     705           1 :         return ikey.Trailer > jkey.Trailer
     706             : }
     707             : 
     708           1 : func (h *simpleMergingIterHeap) swap(i, j int) {
     709           1 :         h.items[i], h.items[j] = h.items[j], h.items[i]
     710           1 : }
     711             : 
     712             : // init, fix, up and down are copied from the go stdlib.
     713           1 : func (h *simpleMergingIterHeap) init() {
     714           1 :         // heapify
     715           1 :         n := h.len()
     716           1 :         for i := n/2 - 1; i >= 0; i-- {
     717           1 :                 h.down(i, n)
     718           1 :         }
     719             : }
     720             : 
     721           1 : func (h *simpleMergingIterHeap) fix(i int) {
     722           1 :         if !h.down(i, h.len()) {
     723           1 :                 h.up(i)
     724           1 :         }
     725             : }
     726             : 
     727           1 : func (h *simpleMergingIterHeap) pop() *simpleMergingIterItem {
     728           1 :         n := h.len() - 1
     729           1 :         h.swap(0, n)
     730           1 :         h.down(0, n)
     731           1 :         item := &h.items[n]
     732           1 :         h.items = h.items[:n]
     733           1 :         return item
     734           1 : }
     735             : 
     736           1 : func (h *simpleMergingIterHeap) up(j int) {
     737           1 :         for {
     738           1 :                 i := (j - 1) / 2 // parent
     739           1 :                 if i == j || !h.less(j, i) {
     740           1 :                         break
     741             :                 }
     742           0 :                 h.swap(i, j)
     743           0 :                 j = i
     744             :         }
     745             : }
     746             : 
     747           1 : func (h *simpleMergingIterHeap) down(i0, n int) bool {
     748           1 :         i := i0
     749           1 :         for {
     750           1 :                 j1 := 2*i + 1
     751           1 :                 if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
     752           1 :                         break
     753             :                 }
     754           1 :                 j := j1 // left child
     755           1 :                 if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
     756           1 :                         j = j2 // = 2*i + 2  // right child
     757           1 :                 }
     758           1 :                 if !h.less(j, i) {
     759           1 :                         break
     760             :                 }
     761           1 :                 h.swap(i, j)
     762           1 :                 i = j
     763             :         }
     764           1 :         return i > i0
     765             : }

Generated by: LCOV version 1.14