LCOV - code coverage report
Current view: top level - pebble - level_checker.go (source / functions) Coverage Total Hit
Test: 2025-03-10 08:17Z de0c5a11 - meta test only.lcov Lines: 83.2 % 489 407
Test Date: 2025-03-10 08:18:31 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "context"
       9              :         "fmt"
      10              :         "io"
      11              :         "sort"
      12              : 
      13              :         "github.com/cockroachdb/errors"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/keyspan"
      16              :         "github.com/cockroachdb/pebble/internal/manifest"
      17              : )
      18              : 
      19              : // This file implements DB.CheckLevels() which checks that every entry in the
      20              : // DB is consistent with respect to the level invariant: any point (or the
      21              : // infinite number of points in a range tombstone) has a seqnum such that a
      22              : // point with the same UserKey at a lower level has a lower seqnum. This is an
      23              : // expensive check since it involves iterating over all the entries in the DB,
      24              : // hence only intended for tests or tools.
      25              : //
      26              : // If we ignore range tombstones, the consistency checking of points can be
      27              : // done with a simplified version of mergingIter. simpleMergingIter is that
      28              : // simplified version of mergingIter that only needs to step through points
      29              : // (analogous to only doing Next()). It can also easily accommodate
      30              : // consistency checking of points relative to range tombstones.
      31              : // simpleMergingIter does not do any seek optimizations present in mergingIter
      32              : // (it minimally needs to seek the range delete iterators to position them at
      33              : // or past the current point) since it does not want to miss points for
      34              : // purposes of consistency checking.
      35              : //
      36              : // Mutual consistency of range tombstones is non-trivial to check. One needs
      37              : // to detect inversions of the form [a, c)#8 at higher level and [b, c)#10 at
      38              : // a lower level. The start key of the former is not contained in the latter
      39              : // and we can't use the exclusive end key, c, for a containment check since it
      40              : // is the sentinel key. We observe that if these tombstones were fragmented
      41              : // wrt each other we would have [a, b)#8 and [b, c)#8 at the higher level and
      42              : // [b, c)#10 at the lower level and then it is trivial to compare the two
      43              : // [b, c) tombstones. Note that this fragmentation needs to take into account
      44              : // that tombstones in a file may be untruncated and need to act within the
      45              : // bounds of the file. This checking is performed by checkRangeTombstones()
      46              : // and its helper functions.
      47              : 
      48              : // The per-level structure used by simpleMergingIter.
      49              : type simpleMergingIterLevel struct {
      50              :         iter         internalIterator
      51              :         rangeDelIter keyspan.FragmentIterator
      52              : 
      53              :         iterKV    *base.InternalKV
      54              :         tombstone *keyspan.Span
      55              : }
      56              : 
      57            1 : func (ml *simpleMergingIterLevel) setRangeDelIter(iter keyspan.FragmentIterator) {
      58            1 :         ml.tombstone = nil
      59            1 :         if ml.rangeDelIter != nil {
      60            1 :                 ml.rangeDelIter.Close()
      61            1 :         }
      62            1 :         ml.rangeDelIter = iter
      63              : }
      64              : 
      65              : type simpleMergingIter struct {
      66              :         levels   []simpleMergingIterLevel
      67              :         snapshot base.SeqNum
      68              :         heap     simpleMergingIterHeap
      69              :         // The last point's key and level. For validation.
      70              :         lastKey     InternalKey
      71              :         lastLevel   int
      72              :         lastIterMsg string
      73              :         // A non-nil valueMerger means MERGE record processing is ongoing.
      74              :         valueMerger base.ValueMerger
      75              :         // The first error will cause step() to return false.
      76              :         err       error
      77              :         numPoints int64
      78              :         merge     Merge
      79              :         formatKey base.FormatKey
      80              : }
      81              : 
      82              : func (m *simpleMergingIter) init(
      83              :         merge Merge,
      84              :         cmp Compare,
      85              :         snapshot base.SeqNum,
      86              :         formatKey base.FormatKey,
      87              :         levels ...simpleMergingIterLevel,
      88            1 : ) {
      89            1 :         m.levels = levels
      90            1 :         m.formatKey = formatKey
      91            1 :         m.merge = merge
      92            1 :         m.snapshot = snapshot
      93            1 :         m.lastLevel = -1
      94            1 :         m.heap.cmp = cmp
      95            1 :         m.heap.items = make([]simpleMergingIterItem, 0, len(levels))
      96            1 :         for i := range m.levels {
      97            1 :                 l := &m.levels[i]
      98            1 :                 l.iterKV = l.iter.First()
      99            1 :                 if l.iterKV != nil {
     100            1 :                         item := simpleMergingIterItem{
     101            1 :                                 index: i,
     102            1 :                                 kv:    *l.iterKV,
     103            1 :                         }
     104            1 :                         item.kv.K = l.iterKV.K.Clone()
     105            1 :                         m.heap.items = append(m.heap.items, item)
     106            1 :                 }
     107              :         }
     108            1 :         m.heap.init()
     109            1 : 
     110            1 :         if m.heap.len() == 0 {
     111            1 :                 return
     112            1 :         }
     113            1 :         m.positionRangeDels()
     114              : }
     115              : 
     116              : // Positions all the rangedel iterators at or past the current top of the
     117              : // heap, using SeekGE().
     118            1 : func (m *simpleMergingIter) positionRangeDels() {
     119            1 :         item := &m.heap.items[0]
     120            1 :         for i := range m.levels {
     121            1 :                 l := &m.levels[i]
     122            1 :                 if l.rangeDelIter == nil {
     123            1 :                         continue
     124              :                 }
     125            1 :                 t, err := l.rangeDelIter.SeekGE(item.kv.K.UserKey)
     126            1 :                 m.err = firstError(m.err, err)
     127            1 :                 l.tombstone = t
     128              :         }
     129              : }
     130              : 
     131              : // Returns true if not yet done.
     132            1 : func (m *simpleMergingIter) step() bool {
     133            1 :         if m.heap.len() == 0 || m.err != nil {
     134            1 :                 return false
     135            1 :         }
     136            1 :         item := &m.heap.items[0]
     137            1 :         l := &m.levels[item.index]
     138            1 :         // Sentinels are not relevant for this point checking.
     139            1 :         if !item.kv.K.IsExclusiveSentinel() && item.kv.K.Visible(m.snapshot, base.SeqNumMax) {
     140            1 :                 // This is a visible point key.
     141            1 :                 if !m.handleVisiblePoint(item, l) {
     142            0 :                         return false
     143            0 :                 }
     144              :         }
     145              : 
     146              :         // The iterator for the current level may be closed in the following call to
     147              :         // Next(). We save its debug string for potential use after it is closed -
     148              :         // either in this current step() invocation or on the next invocation.
     149            1 :         m.lastIterMsg = l.iter.String()
     150            1 : 
     151            1 :         // Step to the next point.
     152            1 :         l.iterKV = l.iter.Next()
     153            1 :         if l.iterKV == nil {
     154            1 :                 m.err = errors.CombineErrors(l.iter.Error(), l.iter.Close())
     155            1 :                 l.iter = nil
     156            1 :                 m.heap.pop()
     157            1 :         } else {
     158            1 :                 // Check point keys in an sstable are ordered. Although not required, we check
     159            1 :                 // for memtables as well. A subtle check here is that successive sstables of
     160            1 :                 // L1 and higher levels are ordered. This happens when levelIter moves to the
     161            1 :                 // next sstable in the level, in which case item.kv.K is the previous sstable's
     162            1 :                 // last point key.
     163            1 :                 if !l.iterKV.K.IsExclusiveSentinel() && base.InternalCompare(m.heap.cmp, item.kv.K, l.iterKV.K) >= 0 {
     164            0 :                         m.err = errors.Errorf("out of order keys %s >= %s in %s",
     165            0 :                                 item.kv.K.Pretty(m.formatKey), l.iterKV.K.Pretty(m.formatKey), l.iter)
     166            0 :                         return false
     167            0 :                 }
     168            1 :                 userKeyBuf := item.kv.K.UserKey[:0]
     169            1 :                 item.kv = *l.iterKV
     170            1 :                 item.kv.K.UserKey = append(userKeyBuf, l.iterKV.K.UserKey...)
     171            1 :                 if m.heap.len() > 1 {
     172            1 :                         m.heap.fix(0)
     173            1 :                 }
     174              :         }
     175            1 :         if m.err != nil {
     176            0 :                 return false
     177            0 :         }
     178            1 :         if m.heap.len() == 0 {
     179            1 :                 // If m.valueMerger != nil, the last record was a MERGE record.
     180            1 :                 if m.valueMerger != nil {
     181            1 :                         var closer io.Closer
     182            1 :                         var err error
     183            1 :                         _, closer, err = m.valueMerger.Finish(true /* includesBase */)
     184            1 :                         if closer != nil {
     185            0 :                                 err = errors.CombineErrors(err, closer.Close())
     186            0 :                         }
     187            1 :                         if err != nil {
     188            0 :                                 m.err = errors.CombineErrors(m.err,
     189            0 :                                         errors.Wrapf(err, "merge processing error on key %s in %s",
     190            0 :                                                 item.kv.K.Pretty(m.formatKey), m.lastIterMsg))
     191            0 :                         }
     192            1 :                         m.valueMerger = nil
     193              :                 }
     194            1 :                 return false
     195              :         }
     196            1 :         m.positionRangeDels()
     197            1 :         return true
     198              : }
     199              : 
     200              : // handleVisiblePoint returns true if validation succeeded and level checking
     201              : // can continue.
     202              : func (m *simpleMergingIter) handleVisiblePoint(
     203              :         item *simpleMergingIterItem, l *simpleMergingIterLevel,
     204            1 : ) (ok bool) {
     205            1 :         m.numPoints++
     206            1 :         keyChanged := m.heap.cmp(item.kv.K.UserKey, m.lastKey.UserKey) != 0
     207            1 :         if !keyChanged {
     208            1 :                 // At the same user key. We will see them in decreasing seqnum
     209            1 :                 // order so the lastLevel must not be lower.
     210            1 :                 if m.lastLevel > item.index {
     211            0 :                         m.err = errors.Errorf("found InternalKey %s in %s and InternalKey %s in %s",
     212            0 :                                 item.kv.K.Pretty(m.formatKey), l.iter, m.lastKey.Pretty(m.formatKey),
     213            0 :                                 m.lastIterMsg)
     214            0 :                         return false
     215            0 :                 }
     216            1 :                 m.lastLevel = item.index
     217            1 :         } else {
     218            1 :                 // The user key has changed.
     219            1 :                 m.lastKey.Trailer = item.kv.K.Trailer
     220            1 :                 m.lastKey.UserKey = append(m.lastKey.UserKey[:0], item.kv.K.UserKey...)
     221            1 :                 m.lastLevel = item.index
     222            1 :         }
     223              :         // Ongoing series of MERGE records ends with a MERGE record.
     224            1 :         if keyChanged && m.valueMerger != nil {
     225            1 :                 var closer io.Closer
     226            1 :                 _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     227            1 :                 if m.err == nil && closer != nil {
     228            0 :                         m.err = closer.Close()
     229            0 :                 }
     230            1 :                 m.valueMerger = nil
     231              :         }
     232            1 :         itemValue, _, err := item.kv.Value(nil)
     233            1 :         if err != nil {
     234            0 :                 m.err = err
     235            0 :                 return false
     236            0 :         }
     237            1 :         if m.valueMerger != nil {
     238            1 :                 // Ongoing series of MERGE records.
     239            1 :                 switch item.kv.K.Kind() {
     240            1 :                 case InternalKeyKindSingleDelete, InternalKeyKindDelete, InternalKeyKindDeleteSized:
     241            1 :                         var closer io.Closer
     242            1 :                         _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     243            1 :                         if m.err == nil && closer != nil {
     244            0 :                                 m.err = closer.Close()
     245            0 :                         }
     246            1 :                         m.valueMerger = nil
     247            1 :                 case InternalKeyKindSet, InternalKeyKindSetWithDelete:
     248            1 :                         m.err = m.valueMerger.MergeOlder(itemValue)
     249            1 :                         if m.err == nil {
     250            1 :                                 var closer io.Closer
     251            1 :                                 _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     252            1 :                                 if m.err == nil && closer != nil {
     253            0 :                                         m.err = closer.Close()
     254            0 :                                 }
     255              :                         }
     256            1 :                         m.valueMerger = nil
     257            1 :                 case InternalKeyKindMerge:
     258            1 :                         m.err = m.valueMerger.MergeOlder(itemValue)
     259            0 :                 default:
     260            0 :                         m.err = errors.Errorf("pebble: invalid internal key kind %s in %s",
     261            0 :                                 item.kv.K.Pretty(m.formatKey),
     262            0 :                                 l.iter)
     263            0 :                         return false
     264              :                 }
     265            1 :         } else if item.kv.K.Kind() == InternalKeyKindMerge && m.err == nil {
     266            1 :                 // New series of MERGE records.
     267            1 :                 m.valueMerger, m.err = m.merge(item.kv.K.UserKey, itemValue)
     268            1 :         }
     269            1 :         if m.err != nil {
     270            0 :                 m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
     271            0 :                         item.kv.K.Pretty(m.formatKey), l.iter)
     272            0 :                 return false
     273            0 :         }
     274              :         // Is this point covered by a tombstone at a lower level? Note that all these
     275              :         // iterators must be positioned at a key > item.key.
     276            1 :         for level := item.index + 1; level < len(m.levels); level++ {
     277            1 :                 lvl := &m.levels[level]
     278            1 :                 if lvl.rangeDelIter == nil || lvl.tombstone.Empty() {
     279            1 :                         continue
     280              :                 }
     281            1 :                 if lvl.tombstone.Contains(m.heap.cmp, item.kv.K.UserKey) && lvl.tombstone.CoversAt(m.snapshot, item.kv.K.SeqNum()) {
     282            0 :                         m.err = errors.Errorf("tombstone %s in %s deletes key %s in %s",
     283            0 :                                 lvl.tombstone.Pretty(m.formatKey), lvl.iter, item.kv.K.Pretty(m.formatKey),
     284            0 :                                 l.iter)
     285            0 :                         return false
     286            0 :                 }
     287              :         }
     288            1 :         return true
     289              : }
     290              : 
     291              : // Checking that range tombstones are mutually consistent is performed by
     292              : // checkRangeTombstones(). See the overview comment at the top of the file.
     293              : //
     294              : // We do this check as follows:
     295              : // - Collect the tombstones for each level, put them into one pool of tombstones
     296              : //   along with their level information (addTombstonesFromIter()).
     297              : // - Collect the start and end user keys from all these tombstones
     298              : //   (collectAllUserKeys()) and use them to fragment all the tombstones
     299              : //   (fragmentUsingUserKeys()).
     300              : // - Sort tombstones by start key and decreasing seqnum
     301              : //   (tombstonesByStartKeyAndSeqnum) - all tombstones that have the same start
     302              : //   key will have the same end key because they have been fragmented.
     303              : // - Iterate and check (iterateAndCheckTombstones()).
     304              : //
     305              : // Note that this simple approach requires holding all the tombstones across all
     306              : // levels in-memory. A more sophisticated incremental approach could be devised,
     307              : // if necessary.
     308              : 
     309              : // A tombstone and the corresponding level it was found in.
     310              : type tombstoneWithLevel struct {
     311              :         keyspan.Span
     312              :         level int
     313              :         // The level in LSM. A -1 means it's a memtable.
     314              :         lsmLevel int
     315              :         fileNum  base.FileNum
     316              : }
     317              : 
     318              : // For sorting tombstoneWithLevels in increasing order of start UserKey and
     319              : // for the same start UserKey in decreasing order of seqnum.
     320              : type tombstonesByStartKeyAndSeqnum struct {
     321              :         cmp Compare
     322              :         buf []tombstoneWithLevel
     323              : }
     324              : 
     325            1 : func (v *tombstonesByStartKeyAndSeqnum) Len() int { return len(v.buf) }
     326            1 : func (v *tombstonesByStartKeyAndSeqnum) Less(i, j int) bool {
     327            1 :         less := v.cmp(v.buf[i].Start, v.buf[j].Start)
     328            1 :         if less == 0 {
     329            1 :                 return v.buf[i].LargestSeqNum() > v.buf[j].LargestSeqNum()
     330            1 :         }
     331            1 :         return less < 0
     332              : }
     333            1 : func (v *tombstonesByStartKeyAndSeqnum) Swap(i, j int) {
     334            1 :         v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
     335            1 : }
     336              : 
     337              : func iterateAndCheckTombstones(
     338              :         cmp Compare, formatKey base.FormatKey, tombstones []tombstoneWithLevel,
     339            1 : ) error {
     340            1 :         sortBuf := tombstonesByStartKeyAndSeqnum{
     341            1 :                 cmp: cmp,
     342            1 :                 buf: tombstones,
     343            1 :         }
     344            1 :         sort.Sort(&sortBuf)
     345            1 : 
     346            1 :         // For a sequence of tombstones that share the same start UserKey, we will
     347            1 :         // encounter them in non-increasing seqnum order and so should encounter them
     348            1 :         // in non-decreasing level order.
     349            1 :         lastTombstone := tombstoneWithLevel{}
     350            1 :         for _, t := range tombstones {
     351            1 :                 if cmp(lastTombstone.Start, t.Start) == 0 && lastTombstone.level > t.level {
     352            0 :                         return errors.Errorf("encountered tombstone %s in %s"+
     353            0 :                                 " that has a lower seqnum than the same tombstone in %s",
     354            0 :                                 t.Span.Pretty(formatKey), levelOrMemtable(t.lsmLevel, t.fileNum),
     355            0 :                                 levelOrMemtable(lastTombstone.lsmLevel, lastTombstone.fileNum))
     356            0 :                 }
     357            1 :                 lastTombstone = t
     358              :         }
     359            1 :         return nil
     360              : }
     361              : 
     362              : type checkConfig struct {
     363              :         logger    Logger
     364              :         comparer  *Comparer
     365              :         readState *readState
     366              :         newIters  tableNewIters
     367              :         seqNum    base.SeqNum
     368              :         stats     *CheckLevelsStats
     369              :         merge     Merge
     370              :         formatKey base.FormatKey
     371              : }
     372              : 
     373              : // cmp is shorthand for comparer.Compare.
     374            1 : func (c *checkConfig) cmp(a, b []byte) int { return c.comparer.Compare(a, b) }
     375              : 
     376            1 : func checkRangeTombstones(c *checkConfig) error {
     377            1 :         var level int
     378            1 :         var tombstones []tombstoneWithLevel
     379            1 :         var err error
     380            1 : 
     381            1 :         memtables := c.readState.memtables
     382            1 :         for i := len(memtables) - 1; i >= 0; i-- {
     383            1 :                 iter := memtables[i].newRangeDelIter(nil)
     384            1 :                 if iter == nil {
     385            1 :                         continue
     386              :                 }
     387            1 :                 tombstones, err = addTombstonesFromIter(
     388            1 :                         iter, level, -1, 0, tombstones, c.seqNum, c.cmp, c.formatKey,
     389            1 :                 )
     390            1 :                 iter.Close()
     391            1 :                 if err != nil {
     392            0 :                         return err
     393            0 :                 }
     394            1 :                 level++
     395              :         }
     396              : 
     397            1 :         current := c.readState.current
     398            1 :         addTombstonesFromLevel := func(files manifest.LevelIterator, lsmLevel int) error {
     399            1 :                 for f := files.First(); f != nil; f = files.Next() {
     400            1 :                         lf := files.Take()
     401            1 :                         iters, err := c.newIters(
     402            1 :                                 context.Background(), lf.TableMetadata, &IterOptions{layer: manifest.Level(lsmLevel)},
     403            1 :                                 internalIterOpts{}, iterRangeDeletions)
     404            1 :                         if err != nil {
     405            0 :                                 return err
     406            0 :                         }
     407            1 :                         tombstones, err = addTombstonesFromIter(iters.RangeDeletion(), level, lsmLevel, f.FileNum,
     408            1 :                                 tombstones, c.seqNum, c.cmp, c.formatKey)
     409            1 :                         iters.CloseAll()
     410            1 : 
     411            1 :                         if err != nil {
     412            0 :                                 return err
     413            0 :                         }
     414              :                 }
     415            1 :                 return nil
     416              :         }
     417              : // Now the levels with untruncated tombstones.
     418            1 :         for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
     419            1 :                 if current.L0SublevelFiles[i].Empty() {
     420            0 :                         continue
     421              :                 }
     422            1 :                 err := addTombstonesFromLevel(current.L0SublevelFiles[i].Iter(), 0)
     423            1 :                 if err != nil {
     424            0 :                         return err
     425            0 :                 }
     426            1 :                 level++
     427              :         }
     428            1 :         for i := 1; i < len(current.Levels); i++ {
     429            1 :                 if err := addTombstonesFromLevel(current.Levels[i].Iter(), i); err != nil {
     430            0 :                         return err
     431            0 :                 }
     432            1 :                 level++
     433              :         }
     434            1 :         if c.stats != nil {
     435            0 :                 c.stats.NumTombstones = len(tombstones)
     436            0 :         }
     437              :         // We now have truncated tombstones.
     438              :         // Fragment them all.
     439            1 :         userKeys := collectAllUserKeys(c.cmp, tombstones)
     440            1 :         tombstones = fragmentUsingUserKeys(c.cmp, tombstones, userKeys)
     441            1 :         return iterateAndCheckTombstones(c.cmp, c.formatKey, tombstones)
     442              : }
     443              : 
     444            0 : func levelOrMemtable(lsmLevel int, fileNum base.FileNum) string {
     445            0 :         if lsmLevel == -1 {
     446            0 :                 return "memtable"
     447            0 :         }
     448            0 :         return fmt.Sprintf("L%d: fileNum=%s", lsmLevel, fileNum)
     449              : }
     450              : 
     451              : func addTombstonesFromIter(
     452              :         iter keyspan.FragmentIterator,
     453              :         level int,
     454              :         lsmLevel int,
     455              :         fileNum base.FileNum,
     456              :         tombstones []tombstoneWithLevel,
     457              :         seqNum base.SeqNum,
     458              :         cmp Compare,
     459              :         formatKey base.FormatKey,
     460            1 : ) (_ []tombstoneWithLevel, err error) {
     461            1 :         var prevTombstone keyspan.Span
     462            1 :         tomb, err := iter.First()
     463            1 :         for ; tomb != nil; tomb, err = iter.Next() {
     464            1 :                 t := tomb.Visible(seqNum)
     465            1 :                 if t.Empty() {
     466            1 :                         continue
     467              :                 }
     468            1 :                 t = t.Clone()
     469            1 :                 // This is mainly a test for rangeDelV2 formatted blocks which are expected to
     470            1 :                 // be ordered and fragmented on disk, but we check memtables and rangeDelV1
     471            1 :                 // blocks as well.
     472            1 :                 if cmp(prevTombstone.End, t.Start) > 0 {
     473            0 :                         return nil, errors.Errorf("unordered or unfragmented range delete tombstones %s, %s in %s",
     474            0 :                                 prevTombstone.Pretty(formatKey), t.Pretty(formatKey), levelOrMemtable(lsmLevel, fileNum))
     475            0 :                 }
     476            1 :                 prevTombstone = t
     477            1 : 
     478            1 :                 if !t.Empty() {
     479            1 :                         tombstones = append(tombstones, tombstoneWithLevel{
     480            1 :                                 Span:     t,
     481            1 :                                 level:    level,
     482            1 :                                 lsmLevel: lsmLevel,
     483            1 :                                 fileNum:  fileNum,
     484            1 :                         })
     485            1 :                 }
     486              :         }
     487            1 :         if err != nil {
     488            0 :                 return nil, err
     489            0 :         }
     490            1 :         return tombstones, nil
     491              : }
     492              : 
     493              : type userKeysSort struct {
     494              :         cmp Compare
     495              :         buf [][]byte
     496              : }
     497              : 
     498            1 : func (v *userKeysSort) Len() int { return len(v.buf) }
     499            1 : func (v *userKeysSort) Less(i, j int) bool {
     500            1 :         return v.cmp(v.buf[i], v.buf[j]) < 0
     501            1 : }
     502            1 : func (v *userKeysSort) Swap(i, j int) {
     503            1 :         v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
     504            1 : }
     505            1 : func collectAllUserKeys(cmp Compare, tombstones []tombstoneWithLevel) [][]byte {
     506            1 :         keys := make([][]byte, 0, len(tombstones)*2)
     507            1 :         for _, t := range tombstones {
     508            1 :                 keys = append(keys, t.Start)
     509            1 :                 keys = append(keys, t.End)
     510            1 :         }
     511            1 :         sorter := userKeysSort{
     512            1 :                 cmp: cmp,
     513            1 :                 buf: keys,
     514            1 :         }
     515            1 :         sort.Sort(&sorter)
     516            1 :         var last, curr int
     517            1 :         for last, curr = -1, 0; curr < len(keys); curr++ {
     518            1 :                 if last < 0 || cmp(keys[last], keys[curr]) != 0 {
     519            1 :                         last++
     520            1 :                         keys[last] = keys[curr]
     521            1 :                 }
     522              :         }
     523            1 :         keys = keys[:last+1]
     524            1 :         return keys
     525              : }
     526              : 
     527              : func fragmentUsingUserKeys(
     528              :         cmp Compare, tombstones []tombstoneWithLevel, userKeys [][]byte,
     529            1 : ) []tombstoneWithLevel {
     530            1 :         var buf []tombstoneWithLevel
     531            1 :         for _, t := range tombstones {
     532            1 :                 // Find the first position with tombstone start < user key
     533            1 :                 i := sort.Search(len(userKeys), func(i int) bool {
     534            1 :                         return cmp(t.Start, userKeys[i]) < 0
     535            1 :                 })
     536            1 :                 for ; i < len(userKeys); i++ {
     537            1 :                         if cmp(userKeys[i], t.End) >= 0 {
     538            1 :                                 break
     539              :                         }
     540            1 :                         tPartial := t
     541            1 :                         tPartial.End = userKeys[i]
     542            1 :                         buf = append(buf, tPartial)
     543            1 :                         t.Start = userKeys[i]
     544              :                 }
     545            1 :                 buf = append(buf, t)
     546              :         }
     547            1 :         return buf
     548              : }
     549              : 
     550              : // CheckLevelsStats provides basic stats on points and tombstones encountered.
     551              : type CheckLevelsStats struct {
     552              :         NumPoints     int64
     553              :         NumTombstones int
     554              : }
     555              : 
     556              : // CheckLevels checks:
     557              : //   - Every entry in the DB is consistent with the level invariant. See the
     558              : //     comment at the top of the file.
     559              : //   - Point keys in sstables are ordered.
     560              : //   - Range delete tombstones in sstables are ordered and fragmented.
     561              : //   - Successful processing of all MERGE records.
     562            1 : func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
     563            1 :         // Grab and reference the current readState.
     564            1 :         readState := d.loadReadState()
     565            1 :         defer readState.unref()
     566            1 : 
     567            1 :         // Determine the seqnum to read at after grabbing the read state (current and
     568            1 :         // memtables) above.
     569            1 :         seqNum := d.mu.versions.visibleSeqNum.Load()
     570            1 : 
     571            1 :         checkConfig := &checkConfig{
     572            1 :                 logger:    d.opts.Logger,
     573            1 :                 comparer:  d.opts.Comparer,
     574            1 :                 readState: readState,
     575            1 :                 newIters:  d.newIters,
     576            1 :                 seqNum:    seqNum,
     577            1 :                 stats:     stats,
     578            1 :                 merge:     d.merge,
     579            1 :                 formatKey: d.opts.Comparer.FormatKey,
     580            1 :         }
     581            1 :         return checkLevelsInternal(checkConfig)
     582            1 : }
     583              : 
     584            1 : func checkLevelsInternal(c *checkConfig) (err error) {
     585            1 :         // Phase 1: Use a simpleMergingIter to step through all the points and ensure
     586            1 :         // that points with the same user key at different levels are not inverted
     587            1 :         // wrt sequence numbers and the same holds for tombstones that cover points.
     588            1 :         // To do this, one needs to construct a simpleMergingIter which is similar to
     589            1 :         // how one constructs a mergingIter.
     590            1 : 
     591            1 :         // Add mem tables from newest to oldest.
     592            1 :         var mlevels []simpleMergingIterLevel
     593            1 :         defer func() {
     594            1 :                 for i := range mlevels {
     595            1 :                         l := &mlevels[i]
     596            1 :                         if l.iter != nil {
     597            1 :                                 err = firstError(err, l.iter.Close())
     598            1 :                                 l.iter = nil
     599            1 :                         }
     600            1 :                         if l.rangeDelIter != nil {
     601            1 :                                 l.rangeDelIter.Close()
     602            1 :                                 l.rangeDelIter = nil
     603            1 :                         }
     604              :                 }
     605              :         }()
     606              : 
     607            1 :         memtables := c.readState.memtables
     608            1 :         for i := len(memtables) - 1; i >= 0; i-- {
     609            1 :                 mem := memtables[i]
     610            1 :                 mlevels = append(mlevels, simpleMergingIterLevel{
     611            1 :                         iter:         mem.newIter(nil),
     612            1 :                         rangeDelIter: mem.newRangeDelIter(nil),
     613            1 :                 })
     614            1 :         }
     615              : 
     616            1 :         current := c.readState.current
     617            1 :         // Determine the final size for mlevels so that there are no more
     618            1 :         // reallocations. levelIter will hold a pointer to elements in mlevels.
     619            1 :         start := len(mlevels)
     620            1 :         for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
     621            1 :                 if current.L0SublevelFiles[sublevel].Empty() {
     622            0 :                         continue
     623              :                 }
     624            1 :                 mlevels = append(mlevels, simpleMergingIterLevel{})
     625              :         }
     626            1 :         for level := 1; level < len(current.Levels); level++ {
     627            1 :                 if current.Levels[level].Empty() {
     628            1 :                         continue
     629              :                 }
     630            1 :                 mlevels = append(mlevels, simpleMergingIterLevel{})
     631              :         }
     632            1 :         mlevelAlloc := mlevels[start:]
     633            1 :         // Add L0 files by sublevel.
     634            1 :         for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
     635            1 :                 if current.L0SublevelFiles[sublevel].Empty() {
     636            0 :                         continue
     637              :                 }
     638            1 :                 manifestIter := current.L0SublevelFiles[sublevel].Iter()
     639            1 :                 iterOpts := IterOptions{logger: c.logger}
     640            1 :                 li := &levelIter{}
     641            1 :                 li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
     642            1 :                         manifest.L0Sublevel(sublevel), internalIterOpts{})
     643            1 :                 li.initRangeDel(&mlevelAlloc[0])
     644            1 :                 mlevelAlloc[0].iter = li
     645            1 :                 mlevelAlloc = mlevelAlloc[1:]
     646              :         }
     647            1 :         for level := 1; level < len(current.Levels); level++ {
     648            1 :                 if current.Levels[level].Empty() {
     649            1 :                         continue
     650              :                 }
     651              : 
     652            1 :                 iterOpts := IterOptions{logger: c.logger}
     653            1 :                 li := &levelIter{}
     654            1 :                 li.init(context.Background(), iterOpts, c.comparer, c.newIters,
     655            1 :                         current.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
     656            1 :                 li.initRangeDel(&mlevelAlloc[0])
     657            1 :                 mlevelAlloc[0].iter = li
     658            1 :                 mlevelAlloc = mlevelAlloc[1:]
     659              :         }
     660              : 
     661            1 :         mergingIter := &simpleMergingIter{}
     662            1 :         mergingIter.init(c.merge, c.cmp, c.seqNum, c.formatKey, mlevels...)
     663            1 :         for cont := mergingIter.step(); cont; cont = mergingIter.step() {
     664            1 :         }
     665            1 :         if err := mergingIter.err; err != nil {
     666            0 :                 return err
     667            0 :         }
     668            1 :         if c.stats != nil {
     669            0 :                 c.stats.NumPoints = mergingIter.numPoints
     670            0 :         }
     671              : 
     672              :         // Phase 2: Check that the tombstones are mutually consistent.
     673            1 :         return checkRangeTombstones(c)
     674              : }
     675              : 
     676              : type simpleMergingIterItem struct {
     677              :         index int
     678              :         kv    base.InternalKV
     679              : }
     680              : 
     681              : type simpleMergingIterHeap struct {
     682              :         cmp     Compare
     683              :         reverse bool
     684              :         items   []simpleMergingIterItem
     685              : }
     686              : 
     687            1 : func (h *simpleMergingIterHeap) len() int {
     688            1 :         return len(h.items)
     689            1 : }
     690              : 
     691            1 : func (h *simpleMergingIterHeap) less(i, j int) bool {
     692            1 :         ikey, jkey := h.items[i].kv.K, h.items[j].kv.K
     693            1 :         if c := h.cmp(ikey.UserKey, jkey.UserKey); c != 0 {
     694            1 :                 if h.reverse {
     695            0 :                         return c > 0
     696            0 :                 }
     697            1 :                 return c < 0
     698              :         }
     699            1 :         if h.reverse {
     700            0 :                 return ikey.Trailer < jkey.Trailer
     701            0 :         }
     702            1 :         return ikey.Trailer > jkey.Trailer
     703              : }
     704              : 
     705            1 : func (h *simpleMergingIterHeap) swap(i, j int) {
     706            1 :         h.items[i], h.items[j] = h.items[j], h.items[i]
     707            1 : }
     708              : 
     709              : // init, fix, up and down are copied from the go stdlib.
     710            1 : func (h *simpleMergingIterHeap) init() {
     711            1 :         // heapify
     712            1 :         n := h.len()
     713            1 :         for i := n/2 - 1; i >= 0; i-- {
     714            1 :                 h.down(i, n)
     715            1 :         }
     716              : }
     717              : 
     718            1 : func (h *simpleMergingIterHeap) fix(i int) {
     719            1 :         if !h.down(i, h.len()) {
     720            1 :                 h.up(i)
     721            1 :         }
     722              : }
     723              : 
     724            1 : func (h *simpleMergingIterHeap) pop() *simpleMergingIterItem {
     725            1 :         n := h.len() - 1
     726            1 :         h.swap(0, n)
     727            1 :         h.down(0, n)
     728            1 :         item := &h.items[n]
     729            1 :         h.items = h.items[:n]
     730            1 :         return item
     731            1 : }
     732              : 
     733            1 : func (h *simpleMergingIterHeap) up(j int) {
     734            1 :         for {
     735            1 :                 i := (j - 1) / 2 // parent
     736            1 :                 if i == j || !h.less(j, i) {
     737            1 :                         break
     738              :                 }
     739            0 :                 h.swap(i, j)
     740            0 :                 j = i
     741              :         }
     742              : }
     743              : 
     744            1 : func (h *simpleMergingIterHeap) down(i0, n int) bool {
     745            1 :         i := i0
     746            1 :         for {
     747            1 :                 j1 := 2*i + 1
     748            1 :                 if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
     749            1 :                         break
     750              :                 }
     751            1 :                 j := j1 // left child
     752            1 :                 if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
     753            1 :                         j = j2 // = 2*i + 2  // right child
     754            1 :                 }
     755            1 :                 if !h.less(j, i) {
     756            1 :                         break
     757              :                 }
     758            1 :                 h.swap(i, j)
     759            1 :                 i = j
     760              :         }
     761            1 :         return i > i0
     762              : }
        

Generated by: LCOV version 2.0-1