LCOV - code coverage report
Current view: top level - pebble - level_checker.go (source / functions) Hit Total Coverage
Test: 2024-04-17 08:15Z 990bac8d - tests + meta.lcov Lines: 451 484 93.2 %
Date: 2024-04-17 08:18:29 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "io"
      11             :         "sort"
      12             : 
      13             :         "github.com/cockroachdb/errors"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/internal/keyspan"
      16             :         "github.com/cockroachdb/pebble/internal/manifest"
      17             : )
      18             : 
      19             : // This file implements DB.CheckLevels() which checks that every entry in the
      20             : // DB is consistent with respect to the level invariant: any point (or the
      21             : // infinite number of points in a range tombstone) has a seqnum such that a
      22             : // point with the same UserKey at a lower level has a lower seqnum. This is an
      23             : // expensive check since it involves iterating over all the entries in the DB,
      24             : // hence only intended for tests or tools.
      25             : //
      26             : // If we ignore range tombstones, the consistency checking of points can be
      27             : // done with a simplified version of mergingIter. simpleMergingIter is that
      28             : // simplified version of mergingIter that only needs to step through points
      29             : // (analogous to only doing Next()). It can also easily accommodate
      30             : // consistency checking of points relative to range tombstones.
      31             : // simpleMergingIter does not do any seek optimizations present in mergingIter
      32             : // (it minimally needs to seek the range delete iterators to position them at
      33             : // or past the current point) since it does not want to miss points for
      34             : // purposes of consistency checking.
      35             : //
      36             : // Mutual consistency of range tombstones is non-trivial to check. One needs
      37             : // to detect inversions of the form [a, c)#8 at higher level and [b, c)#10 at
      38             : // a lower level. The start key of the former is not contained in the latter
      39             : // and we can't use the exclusive end key, c, for a containment check since it
      40             : // is the sentinel key. We observe that if these tombstones were fragmented
      41             : // wrt each other we would have [a, b)#8 and [b, c)#8 at the higher level and
      42             : // [b, c)#10 at the lower level and then it is trivial to compare the two
      43             : // [b, c) tombstones. Note that this fragmentation needs to take into account
      44             : // that tombstones in a file may be untruncated and need to act within the
      45             : // bounds of the file. This checking is performed by checkRangeTombstones()
      46             : // and its helper functions.
      47             : 
      48             : // The per-level structure used by simpleMergingIter.
      49             : type simpleMergingIterLevel struct {
      50             :         iter         internalIterator
      51             :         rangeDelIter keyspan.FragmentIterator
      52             :         levelIterBoundaryContext
      53             : 
      54             :         iterKV    *base.InternalKV
      55             :         tombstone *keyspan.Span
      56             : }
      57             : 
      58             : type simpleMergingIter struct {
      59             :         levels   []simpleMergingIterLevel
      60             :         snapshot uint64
      61             :         heap     simpleMergingIterHeap
      62             :         // The last point's key and level. For validation.
      63             :         lastKey     InternalKey
      64             :         lastLevel   int
      65             :         lastIterMsg string
      66             :         // A non-nil valueMerger means MERGE record processing is ongoing.
      67             :         valueMerger base.ValueMerger
      68             :         // The first error will cause step() to return false.
      69             :         err       error
      70             :         numPoints int64
      71             :         merge     Merge
      72             :         formatKey base.FormatKey
      73             : }
      74             : 
      75             : func (m *simpleMergingIter) init(
      76             :         merge Merge,
      77             :         cmp Compare,
      78             :         snapshot uint64,
      79             :         formatKey base.FormatKey,
      80             :         levels ...simpleMergingIterLevel,
      81           2 : ) {
      82           2 :         m.levels = levels
      83           2 :         m.formatKey = formatKey
      84           2 :         m.merge = merge
      85           2 :         m.snapshot = snapshot
      86           2 :         m.lastLevel = -1
      87           2 :         m.heap.cmp = cmp
      88           2 :         m.heap.items = make([]simpleMergingIterItem, 0, len(levels))
      89           2 :         for i := range m.levels {
      90           2 :                 l := &m.levels[i]
      91           2 :                 l.iterKV = l.iter.First()
      92           2 :                 if l.iterKV != nil {
      93           2 :                         item := simpleMergingIterItem{
      94           2 :                                 index: i,
      95           2 :                                 value: l.iterKV.V,
      96           2 :                         }
      97           2 :                         item.key = l.iterKV.K.Clone()
      98           2 :                         m.heap.items = append(m.heap.items, item)
      99           2 :                 }
     100             :         }
     101           2 :         m.heap.init()
     102           2 : 
     103           2 :         if m.heap.len() == 0 {
     104           2 :                 return
     105           2 :         }
     106           2 :         m.positionRangeDels()
     107             : }
     108             : 
     109             : // Positions all the rangedel iterators at or past the current top of the
     110             : // heap, using SeekGE().
     111           2 : func (m *simpleMergingIter) positionRangeDels() {
     112           2 :         item := &m.heap.items[0]
     113           2 :         for i := range m.levels {
     114           2 :                 l := &m.levels[i]
     115           2 :                 if l.rangeDelIter == nil {
     116           2 :                         continue
     117             :                 }
     118           2 :                 t, err := l.rangeDelIter.SeekGE(item.key.UserKey)
     119           2 :                 m.err = firstError(m.err, err)
     120           2 :                 l.tombstone = t
     121             :         }
     122             : }
     123             : 
     124             : // Returns true if not yet done.
     125           2 : func (m *simpleMergingIter) step() bool {
     126           2 :         if m.heap.len() == 0 || m.err != nil {
     127           2 :                 return false
     128           2 :         }
     129           2 :         item := &m.heap.items[0]
     130           2 :         l := &m.levels[item.index]
     131           2 :         // Sentinels are not relevant for this point checking.
     132           2 :         if !l.isIgnorableBoundaryKey && !item.key.IsExclusiveSentinel() &&
     133           2 :                 item.key.Visible(m.snapshot, base.InternalKeySeqNumMax) {
     134           2 :                 m.numPoints++
     135           2 :                 keyChanged := m.heap.cmp(item.key.UserKey, m.lastKey.UserKey) != 0
     136           2 :                 if !keyChanged {
     137           2 :                         // At the same user key. We will see them in decreasing seqnum
     138           2 :                         // order so the lastLevel must not be lower.
     139           2 :                         if m.lastLevel > item.index {
     140           1 :                                 m.err = errors.Errorf("found InternalKey %s in %s and InternalKey %s in %s",
     141           1 :                                         item.key.Pretty(m.formatKey), l.iter, m.lastKey.Pretty(m.formatKey),
     142           1 :                                         m.lastIterMsg)
     143           1 :                                 return false
     144           1 :                         }
     145           2 :                         m.lastLevel = item.index
     146           2 :                 } else {
     147           2 :                         // The user key has changed.
     148           2 :                         m.lastKey.Trailer = item.key.Trailer
     149           2 :                         m.lastKey.UserKey = append(m.lastKey.UserKey[:0], item.key.UserKey...)
     150           2 :                         m.lastLevel = item.index
     151           2 :                 }
     152             :                 // Ongoing series of MERGE records ends with a MERGE record.
     153           2 :                 if keyChanged && m.valueMerger != nil {
     154           2 :                         var closer io.Closer
     155           2 :                         _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     156           2 :                         if m.err == nil && closer != nil {
     157           0 :                                 m.err = closer.Close()
     158           0 :                         }
     159           2 :                         m.valueMerger = nil
     160             :                 }
     161           2 :                 itemValue, _, err := item.value.Value(nil)
     162           2 :                 if err != nil {
     163           0 :                         m.err = err
     164           0 :                         return false
     165           0 :                 }
     166           2 :                 if m.valueMerger != nil {
     167           2 :                         // Ongoing series of MERGE records.
     168           2 :                         switch item.key.Kind() {
     169           2 :                         case InternalKeyKindSingleDelete, InternalKeyKindDelete, InternalKeyKindDeleteSized:
     170           2 :                                 var closer io.Closer
     171           2 :                                 _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     172           2 :                                 if m.err == nil && closer != nil {
     173           1 :                                         m.err = closer.Close()
     174           1 :                                 }
     175           2 :                                 m.valueMerger = nil
     176           2 :                         case InternalKeyKindSet, InternalKeyKindSetWithDelete:
     177           2 :                                 m.err = m.valueMerger.MergeOlder(itemValue)
     178           2 :                                 if m.err == nil {
     179           2 :                                         var closer io.Closer
     180           2 :                                         _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     181           2 :                                         if m.err == nil && closer != nil {
     182           1 :                                                 m.err = closer.Close()
     183           1 :                                         }
     184             :                                 }
     185           2 :                                 m.valueMerger = nil
     186           2 :                         case InternalKeyKindMerge:
     187           2 :                                 m.err = m.valueMerger.MergeOlder(itemValue)
     188           0 :                         default:
     189           0 :                                 m.err = errors.Errorf("pebble: invalid internal key kind %s in %s",
     190           0 :                                         item.key.Pretty(m.formatKey),
     191           0 :                                         l.iter)
     192           0 :                                 return false
     193             :                         }
     194           2 :                 } else if item.key.Kind() == InternalKeyKindMerge && m.err == nil {
     195           2 :                         // New series of MERGE records.
     196           2 :                         m.valueMerger, m.err = m.merge(item.key.UserKey, itemValue)
     197           2 :                 }
     198           2 :                 if m.err != nil {
     199           1 :                         m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
     200           1 :                                 item.key.Pretty(m.formatKey), l.iter)
     201           1 :                         return false
     202           1 :                 }
     203             :                 // Is this point covered by a tombstone at a lower level? Note that all these
     204             :                 // iterators must be positioned at a key > item.key.
     205           2 :                 for level := item.index + 1; level < len(m.levels); level++ {
     206           2 :                         lvl := &m.levels[level]
     207           2 :                         if lvl.rangeDelIter == nil || lvl.tombstone.Empty() {
     208           2 :                                 continue
     209             :                         }
     210           2 :                         if lvl.tombstone.Contains(m.heap.cmp, item.key.UserKey) && lvl.tombstone.CoversAt(m.snapshot, item.key.SeqNum()) {
     211           1 :                                 m.err = errors.Errorf("tombstone %s in %s deletes key %s in %s",
     212           1 :                                         lvl.tombstone.Pretty(m.formatKey), lvl.iter, item.key.Pretty(m.formatKey),
     213           1 :                                         l.iter)
     214           1 :                                 return false
     215           1 :                         }
     216             :                 }
     217             :         }
     218             : 
     219             :         // The iterator for the current level may be closed in the following call to
     220             :         // Next(). We save its debug string for potential use after it is closed -
     221             :         // either in this current step() invocation or on the next invocation.
     222           2 :         m.lastIterMsg = l.iter.String()
     223           2 : 
     224           2 :         // Step to the next point.
     225           2 :         l.iterKV = l.iter.Next()
     226           2 :         if !l.isIgnorableBoundaryKey {
     227           2 :                 if l.iterKV != nil {
     228           2 :                         // Check point keys in an sstable are ordered. Although not required, we check
     229           2 :                         // for memtables as well. A subtle check here is that successive sstables of
     230           2 :                         // L1 and higher levels are ordered. This happens when levelIter moves to the
     231           2 :                         // next sstable in the level, in which case item.key is previous sstable's
     232           2 :                         // last point key.
     233           2 :                         if base.InternalCompare(m.heap.cmp, item.key, l.iterKV.K) >= 0 {
     234           1 :                                 m.err = errors.Errorf("out of order keys %s >= %s in %s",
     235           1 :                                         item.key.Pretty(m.formatKey), l.iterKV.K.Pretty(m.formatKey), l.iter)
     236           1 :                                 return false
     237           1 :                         }
     238           2 :                         item.key = base.InternalKey{
     239           2 :                                 Trailer: l.iterKV.K.Trailer,
     240           2 :                                 UserKey: append(item.key.UserKey[:0], l.iterKV.K.UserKey...),
     241           2 :                         }
     242           2 :                         item.value = l.iterKV.V
     243           2 :                         if m.heap.len() > 1 {
     244           2 :                                 m.heap.fix(0)
     245           2 :                         }
     246           2 :                 } else {
     247           2 :                         m.err = l.iter.Close()
     248           2 :                         l.iter = nil
     249           2 :                         m.heap.pop()
     250           2 :                 }
     251           2 :                 if m.err != nil {
     252           0 :                         return false
     253           0 :                 }
     254           2 :                 if m.heap.len() == 0 {
     255           2 :                         // Last record was a MERGE record.
     256           2 :                         if m.valueMerger != nil {
     257           2 :                                 var closer io.Closer
     258           2 :                                 _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     259           2 :                                 if m.err == nil && closer != nil {
     260           0 :                                         m.err = closer.Close()
     261           0 :                                 }
     262           2 :                                 if m.err != nil {
     263           1 :                                         m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
     264           1 :                                                 item.key.Pretty(m.formatKey), m.lastIterMsg)
     265           1 :                                 }
     266           2 :                                 m.valueMerger = nil
     267             :                         }
     268           2 :                         return false
     269             :                 }
     270             :         }
     271           2 :         m.positionRangeDels()
     272           2 :         return true
     273             : }
     274             : 
     275             : // Checking that range tombstones are mutually consistent is performed by
     276             : // checkRangeTombstones(). See the overview comment at the top of the file.
     277             : //
     278             : // We do this check as follows:
     279             : // - Collect the tombstones for each level, put them into one pool of tombstones
     280             : //   along with their level information (addTombstonesFromIter()).
     281             : // - Collect the start and end user keys from all these tombstones
     282             : //   (collectAllUserKeys()) and use them to fragment all the tombstones
     283             : //   (fragmentUsingUserKeys()).
     284             : // - Sort tombstones by start key and decreasing seqnum
     285             : //   (tombstonesByStartKeyAndSeqnum) - all tombstones that have the same start
     286             : //   key will have the same end key because they have been fragmented.
     287             : // - Iterate and check (iterateAndCheckTombstones()).
     288             : //
     289             : // Note that this simple approach requires holding all the tombstones across all
     290             : // levels in-memory. A more sophisticated incremental approach could be devised,
     291             : // if necessary.
     292             : 
     293             : // A tombstone and the corresponding level it was found in.
     294             : type tombstoneWithLevel struct {
     295             :         keyspan.Span
     296             :         level int
     297             :         // The level in LSM. A -1 means it's a memtable.
     298             :         lsmLevel int
     299             :         fileNum  FileNum
     300             : }
     301             : 
     302             : // For sorting tombstoneWithLevels in increasing order of start UserKey and
     303             : // for the same start UserKey in decreasing order of seqnum.
     304             : type tombstonesByStartKeyAndSeqnum struct {
     305             :         cmp Compare
     306             :         buf []tombstoneWithLevel
     307             : }
     308             : 
     309           2 : func (v *tombstonesByStartKeyAndSeqnum) Len() int { return len(v.buf) }
     310           2 : func (v *tombstonesByStartKeyAndSeqnum) Less(i, j int) bool {
     311           2 :         less := v.cmp(v.buf[i].Start, v.buf[j].Start)
     312           2 :         if less == 0 {
     313           2 :                 return v.buf[i].LargestSeqNum() > v.buf[j].LargestSeqNum()
     314           2 :         }
     315           2 :         return less < 0
     316             : }
     317           2 : func (v *tombstonesByStartKeyAndSeqnum) Swap(i, j int) {
     318           2 :         v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
     319           2 : }
     320             : 
     321             : func iterateAndCheckTombstones(
     322             :         cmp Compare, formatKey base.FormatKey, tombstones []tombstoneWithLevel,
     323           2 : ) error {
     324           2 :         sortBuf := tombstonesByStartKeyAndSeqnum{
     325           2 :                 cmp: cmp,
     326           2 :                 buf: tombstones,
     327           2 :         }
     328           2 :         sort.Sort(&sortBuf)
     329           2 : 
     330           2 :         // For a sequence of tombstones that share the same start UserKey, we will
     331           2 :         // encounter them in non-increasing seqnum order and so should encounter them
     332           2 :         // in non-decreasing level order.
     333           2 :         lastTombstone := tombstoneWithLevel{}
     334           2 :         for _, t := range tombstones {
     335           2 :                 if cmp(lastTombstone.Start, t.Start) == 0 && lastTombstone.level > t.level {
     336           1 :                         return errors.Errorf("encountered tombstone %s in %s"+
     337           1 :                                 " that has a lower seqnum than the same tombstone in %s",
     338           1 :                                 t.Span.Pretty(formatKey), levelOrMemtable(t.lsmLevel, t.fileNum),
     339           1 :                                 levelOrMemtable(lastTombstone.lsmLevel, lastTombstone.fileNum))
     340           1 :                 }
     341           2 :                 lastTombstone = t
     342             :         }
     343           2 :         return nil
     344             : }
     345             : 
     346             : type checkConfig struct {
     347             :         logger    Logger
     348             :         comparer  *Comparer
     349             :         readState *readState
     350             :         newIters  tableNewIters
     351             :         seqNum    uint64
     352             :         stats     *CheckLevelsStats
     353             :         merge     Merge
     354             :         formatKey base.FormatKey
     355             : }
     356             : 
     357             : // cmp is shorthand for comparer.Compare.
     358           2 : func (c *checkConfig) cmp(a, b []byte) int { return c.comparer.Compare(a, b) }
     359             : 
     360           2 : func checkRangeTombstones(c *checkConfig) error {
     361           2 :         var level int
     362           2 :         var tombstones []tombstoneWithLevel
     363           2 :         var err error
     364           2 : 
     365           2 :         memtables := c.readState.memtables
     366           2 :         for i := len(memtables) - 1; i >= 0; i-- {
     367           2 :                 iter := memtables[i].newRangeDelIter(nil)
     368           2 :                 if iter == nil {
     369           2 :                         continue
     370             :                 }
     371           2 :                 tombstones, err = addTombstonesFromIter(
     372           2 :                         iter, level, -1, 0, tombstones, c.seqNum, c.cmp, c.formatKey,
     373           2 :                 )
     374           2 :                 if err != nil {
     375           0 :                         return err
     376           0 :                 }
     377           2 :                 level++
     378             :         }
     379             : 
     380           2 :         current := c.readState.current
     381           2 :         addTombstonesFromLevel := func(files manifest.LevelIterator, lsmLevel int) error {
     382           2 :                 for f := files.First(); f != nil; f = files.Next() {
     383           2 :                         lf := files.Take()
     384           2 :                         //lower, upper := manifest.KeyRange(c.cmp, lf.Iter())
     385           2 :                         iterToClose, iter, err := c.newIters.TODO(
     386           2 :                                 context.Background(), lf.FileMetadata, &IterOptions{level: manifest.Level(lsmLevel)}, internalIterOpts{})
     387           2 :                         if err != nil {
     388           0 :                                 return err
     389           0 :                         }
     390           2 :                         iterToClose.Close()
     391           2 :                         if iter == nil {
     392           2 :                                 continue
     393             :                         }
     394           2 :                         if tombstones, err = addTombstonesFromIter(iter, level, lsmLevel, f.FileNum,
     395           2 :                                 tombstones, c.seqNum, c.cmp, c.formatKey); err != nil {
     396           1 :                                 return err
     397           1 :                         }
     398             :                 }
     399           2 :                 return nil
     400             :         }
     401             :         // Now the levels with untruncated tombstones.
     402           2 :         for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
     403           2 :                 if current.L0SublevelFiles[i].Empty() {
     404           0 :                         continue
     405             :                 }
     406           2 :                 err := addTombstonesFromLevel(current.L0SublevelFiles[i].Iter(), 0)
     407           2 :                 if err != nil {
     408           0 :                         return err
     409           0 :                 }
     410           2 :                 level++
     411             :         }
     412           2 :         for i := 1; i < len(current.Levels); i++ {
     413           2 :                 if err := addTombstonesFromLevel(current.Levels[i].Iter(), i); err != nil {
     414           1 :                         return err
     415           1 :                 }
     416           2 :                 level++
     417             :         }
     418           2 :         if c.stats != nil {
     419           1 :                 c.stats.NumTombstones = len(tombstones)
     420           1 :         }
     421             :         // We now have truncated tombstones.
     422             :         // Fragment them all.
     423           2 :         userKeys := collectAllUserKeys(c.cmp, tombstones)
     424           2 :         tombstones = fragmentUsingUserKeys(c.cmp, tombstones, userKeys)
     425           2 :         return iterateAndCheckTombstones(c.cmp, c.formatKey, tombstones)
     426             : }
     427             : 
     428           1 : func levelOrMemtable(lsmLevel int, fileNum FileNum) string {
     429           1 :         if lsmLevel == -1 {
     430           0 :                 return "memtable"
     431           0 :         }
     432           1 :         return fmt.Sprintf("L%d: fileNum=%s", lsmLevel, fileNum)
     433             : }
     434             : 
     435             : func addTombstonesFromIter(
     436             :         iter keyspan.FragmentIterator,
     437             :         level int,
     438             :         lsmLevel int,
     439             :         fileNum FileNum,
     440             :         tombstones []tombstoneWithLevel,
     441             :         seqNum uint64,
     442             :         cmp Compare,
     443             :         formatKey base.FormatKey,
     444           2 : ) (_ []tombstoneWithLevel, err error) {
     445           2 :         defer func() {
     446           2 :                 err = firstError(err, iter.Close())
     447           2 :         }()
     448             : 
     449           2 :         var prevTombstone keyspan.Span
     450           2 :         tomb, err := iter.First()
     451           2 :         for ; tomb != nil; tomb, err = iter.Next() {
     452           2 :                 t := tomb.Visible(seqNum)
     453           2 :                 if t.Empty() {
     454           2 :                         continue
     455             :                 }
     456           2 :                 t = t.DeepClone()
     457           2 :                 // This is mainly a test for rangeDelV2 formatted blocks which are expected to
     458           2 :                 // be ordered and fragmented on disk. But we anyways check for memtables,
     459           2 :                 // rangeDelV1 as well.
     460           2 :                 if cmp(prevTombstone.End, t.Start) > 0 {
     461           1 :                         return nil, errors.Errorf("unordered or unfragmented range delete tombstones %s, %s in %s",
     462           1 :                                 prevTombstone.Pretty(formatKey), t.Pretty(formatKey), levelOrMemtable(lsmLevel, fileNum))
     463           1 :                 }
     464           2 :                 prevTombstone = t
     465           2 : 
     466           2 :                 if !t.Empty() {
     467           2 :                         tombstones = append(tombstones, tombstoneWithLevel{
     468           2 :                                 Span:     t,
     469           2 :                                 level:    level,
     470           2 :                                 lsmLevel: lsmLevel,
     471           2 :                                 fileNum:  fileNum,
     472           2 :                         })
     473           2 :                 }
     474             :         }
     475           2 :         if err != nil {
     476           0 :                 return nil, err
     477           0 :         }
     478           2 :         return tombstones, nil
     479             : }
     480             : 
     481             : type userKeysSort struct {
     482             :         cmp Compare
     483             :         buf [][]byte
     484             : }
     485             : 
     486           2 : func (v *userKeysSort) Len() int { return len(v.buf) }
     487           2 : func (v *userKeysSort) Less(i, j int) bool {
     488           2 :         return v.cmp(v.buf[i], v.buf[j]) < 0
     489           2 : }
     490           2 : func (v *userKeysSort) Swap(i, j int) {
     491           2 :         v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
     492           2 : }
     493           2 : func collectAllUserKeys(cmp Compare, tombstones []tombstoneWithLevel) [][]byte {
     494           2 :         keys := make([][]byte, 0, len(tombstones)*2)
     495           2 :         for _, t := range tombstones {
     496           2 :                 keys = append(keys, t.Start)
     497           2 :                 keys = append(keys, t.End)
     498           2 :         }
     499           2 :         sorter := userKeysSort{
     500           2 :                 cmp: cmp,
     501           2 :                 buf: keys,
     502           2 :         }
     503           2 :         sort.Sort(&sorter)
     504           2 :         var last, curr int
     505           2 :         for last, curr = -1, 0; curr < len(keys); curr++ {
     506           2 :                 if last < 0 || cmp(keys[last], keys[curr]) != 0 {
     507           2 :                         last++
     508           2 :                         keys[last] = keys[curr]
     509           2 :                 }
     510             :         }
     511           2 :         keys = keys[:last+1]
     512           2 :         return keys
     513             : }
     514             : 
     515             : func fragmentUsingUserKeys(
     516             :         cmp Compare, tombstones []tombstoneWithLevel, userKeys [][]byte,
     517           2 : ) []tombstoneWithLevel {
     518           2 :         var buf []tombstoneWithLevel
     519           2 :         for _, t := range tombstones {
     520           2 :                 // Find the first position with tombstone start < user key
     521           2 :                 i := sort.Search(len(userKeys), func(i int) bool {
     522           2 :                         return cmp(t.Start, userKeys[i]) < 0
     523           2 :                 })
     524           2 :                 for ; i < len(userKeys); i++ {
     525           2 :                         if cmp(userKeys[i], t.End) >= 0 {
     526           2 :                                 break
     527             :                         }
     528           2 :                         tPartial := t
     529           2 :                         tPartial.End = userKeys[i]
     530           2 :                         buf = append(buf, tPartial)
     531           2 :                         t.Start = userKeys[i]
     532             :                 }
     533           2 :                 buf = append(buf, t)
     534             :         }
     535           2 :         return buf
     536             : }
     537             : 
     538             : // CheckLevelsStats provides basic stats on points and tombstones encountered.
     539             : type CheckLevelsStats struct {
     540             :         NumPoints     int64
     541             :         NumTombstones int
     542             : }
     543             : 
     544             : // CheckLevels checks:
     545             : //   - Every entry in the DB is consistent with the level invariant. See the
     546             : //     comment at the top of the file.
     547             : //   - Point keys in sstables are ordered.
     548             : //   - Range delete tombstones in sstables are ordered and fragmented.
     549             : //   - Successful processing of all MERGE records.
     550           2 : func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
     551           2 :         // Grab and reference the current readState.
     552           2 :         readState := d.loadReadState()
     553           2 :         defer readState.unref()
     554           2 : 
     555           2 :         // Determine the seqnum to read at after grabbing the read state (current and
     556           2 :         // memtables) above.
     557           2 :         seqNum := d.mu.versions.visibleSeqNum.Load()
     558           2 : 
     559           2 :         checkConfig := &checkConfig{
     560           2 :                 logger:    d.opts.Logger,
     561           2 :                 comparer:  d.opts.Comparer,
     562           2 :                 readState: readState,
     563           2 :                 newIters:  d.newIters,
     564           2 :                 seqNum:    seqNum,
     565           2 :                 stats:     stats,
     566           2 :                 merge:     d.merge,
     567           2 :                 formatKey: d.opts.Comparer.FormatKey,
     568           2 :         }
     569           2 :         return checkLevelsInternal(checkConfig)
     570           2 : }
     571             : 
     572           2 : func checkLevelsInternal(c *checkConfig) (err error) {
     573           2 :         // Phase 1: Use a simpleMergingIter to step through all the points and ensure
     574           2 :         // that points with the same user key at different levels are not inverted
     575           2 :         // wrt sequence numbers and the same holds for tombstones that cover points.
     576           2 :         // To do this, one needs to construct a simpleMergingIter which is similar to
     577           2 :         // how one constructs a mergingIter.
     578           2 : 
     579           2 :         // Add mem tables from newest to oldest.
     580           2 :         var mlevels []simpleMergingIterLevel
     581           2 :         defer func() {
     582           2 :                 for i := range mlevels {
     583           2 :                         l := &mlevels[i]
     584           2 :                         if l.iter != nil {
     585           2 :                                 err = firstError(err, l.iter.Close())
     586           2 :                                 l.iter = nil
     587           2 :                         }
     588           2 :                         if l.rangeDelIter != nil {
     589           2 :                                 err = firstError(err, l.rangeDelIter.Close())
     590           2 :                                 l.rangeDelIter = nil
     591           2 :                         }
     592             :                 }
     593             :         }()
     594             : 
     595           2 :         memtables := c.readState.memtables
     596           2 :         for i := len(memtables) - 1; i >= 0; i-- {
     597           2 :                 mem := memtables[i]
     598           2 :                 mlevels = append(mlevels, simpleMergingIterLevel{
     599           2 :                         iter:         mem.newIter(nil),
     600           2 :                         rangeDelIter: mem.newRangeDelIter(nil),
     601           2 :                 })
     602           2 :         }
     603             : 
     604           2 :         current := c.readState.current
     605           2 :         // Determine the final size for mlevels so that there are no more
     606           2 :         // reallocations. levelIter will hold a pointer to elements in mlevels.
     607           2 :         start := len(mlevels)
     608           2 :         for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
     609           2 :                 if current.L0SublevelFiles[sublevel].Empty() {
     610           0 :                         continue
     611             :                 }
     612           2 :                 mlevels = append(mlevels, simpleMergingIterLevel{})
     613             :         }
     614           2 :         for level := 1; level < len(current.Levels); level++ {
     615           2 :                 if current.Levels[level].Empty() {
     616           2 :                         continue
     617             :                 }
     618           2 :                 mlevels = append(mlevels, simpleMergingIterLevel{})
     619             :         }
     620           2 :         mlevelAlloc := mlevels[start:]
     621           2 :         // Add L0 files by sublevel.
     622           2 :         for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
     623           2 :                 if current.L0SublevelFiles[sublevel].Empty() {
     624           0 :                         continue
     625             :                 }
     626           2 :                 manifestIter := current.L0SublevelFiles[sublevel].Iter()
     627           2 :                 iterOpts := IterOptions{logger: c.logger}
     628           2 :                 li := &levelIter{}
     629           2 :                 li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
     630           2 :                         manifest.L0Sublevel(sublevel), internalIterOpts{})
     631           2 :                 li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
     632           2 :                 li.initBoundaryContext(&mlevelAlloc[0].levelIterBoundaryContext)
     633           2 :                 mlevelAlloc[0].iter = li
     634           2 :                 mlevelAlloc = mlevelAlloc[1:]
     635             :         }
     636           2 :         for level := 1; level < len(current.Levels); level++ {
     637           2 :                 if current.Levels[level].Empty() {
     638           2 :                         continue
     639             :                 }
     640             : 
     641           2 :                 iterOpts := IterOptions{logger: c.logger}
     642           2 :                 li := &levelIter{}
     643           2 :                 li.init(context.Background(), iterOpts, c.comparer, c.newIters,
     644           2 :                         current.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
     645           2 :                 li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
     646           2 :                 li.initBoundaryContext(&mlevelAlloc[0].levelIterBoundaryContext)
     647           2 :                 mlevelAlloc[0].iter = li
     648           2 :                 mlevelAlloc = mlevelAlloc[1:]
     649             :         }
     650             : 
     651           2 :         mergingIter := &simpleMergingIter{}
     652           2 :         mergingIter.init(c.merge, c.cmp, c.seqNum, c.formatKey, mlevels...)
     653           2 :         for cont := mergingIter.step(); cont; cont = mergingIter.step() {
     654           2 :         }
     655           2 :         if err := mergingIter.err; err != nil {
     656           1 :                 return err
     657           1 :         }
     658           2 :         if c.stats != nil {
     659           1 :                 c.stats.NumPoints = mergingIter.numPoints
     660           1 :         }
     661             : 
     662             :         // Phase 2: Check that the tombstones are mutually consistent.
     663           2 :         return checkRangeTombstones(c)
     664             : }
     665             : 
     666             : type simpleMergingIterItem struct {
     667             :         index int
     668             :         key   InternalKey
     669             :         value base.LazyValue
     670             : }
     671             : 
     672             : type simpleMergingIterHeap struct {
     673             :         cmp     Compare
     674             :         reverse bool
     675             :         items   []simpleMergingIterItem
     676             : }
     677             : 
     678           2 : func (h *simpleMergingIterHeap) len() int {
     679           2 :         return len(h.items)
     680           2 : }
     681             : 
     682           2 : func (h *simpleMergingIterHeap) less(i, j int) bool {
     683           2 :         ikey, jkey := h.items[i].key, h.items[j].key
     684           2 :         if c := h.cmp(ikey.UserKey, jkey.UserKey); c != 0 {
     685           2 :                 if h.reverse {
     686           0 :                         return c > 0
     687           0 :                 }
     688           2 :                 return c < 0
     689             :         }
     690           2 :         if h.reverse {
     691           0 :                 return ikey.Trailer < jkey.Trailer
     692           0 :         }
     693           2 :         return ikey.Trailer > jkey.Trailer
     694             : }
     695             : 
     696           2 : func (h *simpleMergingIterHeap) swap(i, j int) {
     697           2 :         h.items[i], h.items[j] = h.items[j], h.items[i]
     698           2 : }
     699             : 
     700             : // init, fix, up and down are copied from the go stdlib.
     701           2 : func (h *simpleMergingIterHeap) init() {
     702           2 :         // heapify
     703           2 :         n := h.len()
     704           2 :         for i := n/2 - 1; i >= 0; i-- {
     705           2 :                 h.down(i, n)
     706           2 :         }
     707             : }
     708             : 
     709           2 : func (h *simpleMergingIterHeap) fix(i int) {
     710           2 :         if !h.down(i, h.len()) {
     711           2 :                 h.up(i)
     712           2 :         }
     713             : }
     714             : 
     715           2 : func (h *simpleMergingIterHeap) pop() *simpleMergingIterItem {
     716           2 :         n := h.len() - 1
     717           2 :         h.swap(0, n)
     718           2 :         h.down(0, n)
     719           2 :         item := &h.items[n]
     720           2 :         h.items = h.items[:n]
     721           2 :         return item
     722           2 : }
     723             : 
     724           2 : func (h *simpleMergingIterHeap) up(j int) {
     725           2 :         for {
     726           2 :                 i := (j - 1) / 2 // parent
     727           2 :                 if i == j || !h.less(j, i) {
     728           2 :                         break
     729             :                 }
     730           0 :                 h.swap(i, j)
     731           0 :                 j = i
     732             :         }
     733             : }
     734             : 
     735           2 : func (h *simpleMergingIterHeap) down(i0, n int) bool {
     736           2 :         i := i0
     737           2 :         for {
     738           2 :                 j1 := 2*i + 1
     739           2 :                 if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
     740           2 :                         break
     741             :                 }
     742           2 :                 j := j1 // left child
     743           2 :                 if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
     744           2 :                         j = j2 // = 2*i + 2  // right child
     745           2 :                 }
     746           2 :                 if !h.less(j, i) {
     747           2 :                         break
     748             :                 }
     749           2 :                 h.swap(i, j)
     750           2 :                 i = j
     751             :         }
     752           2 :         return i > i0
     753             : }

Generated by: LCOV version 1.14