LCOV - code coverage report
Current view: top level - pebble - level_checker.go (source / functions) Coverage Total Hit
Test: 2025-06-11 08:18Z 549acfee - tests + meta.lcov Lines: 83.3 % 466 388
Test Date: 2025-06-11 08:20:04 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         stdcmp "cmp"
       9              :         "context"
      10              :         "fmt"
      11              :         "io"
      12              :         "iter"
      13              :         "slices"
      14              :         "sort"
      15              : 
      16              :         "github.com/cockroachdb/errors"
      17              :         "github.com/cockroachdb/pebble/internal/base"
      18              :         "github.com/cockroachdb/pebble/internal/keyspan"
      19              :         "github.com/cockroachdb/pebble/internal/manifest"
      20              :         "github.com/cockroachdb/pebble/sstable"
      21              :         "github.com/cockroachdb/pebble/sstable/blob"
      22              :         "github.com/cockroachdb/pebble/sstable/block"
      23              : )
      24              : 
      25              : // This file implements DB.CheckLevels() which checks that every entry in the
      26              : // DB is consistent with respect to the level invariant: any point (or the
      27              : // infinite number of points in a range tombstone) has a seqnum such that a
      28              : // point with the same UserKey at a lower level has a lower seqnum. This is an
      29              : // expensive check since it involves iterating over all the entries in the DB,
      30              : // hence only intended for tests or tools.
      31              : //
      32              : // If we ignore range tombstones, the consistency checking of points can be
      33              : // done with a simplified version of mergingIter. simpleMergingIter is that
      34              : // simplified version of mergingIter that only needs to step through points
      35              : // (analogous to only doing Next()). It can also easily accommodate
      36              : // consistency checking of points relative to range tombstones.
      37              : // simpleMergingIter does not do any seek optimizations present in mergingIter
      38              : // (it minimally needs to seek the range delete iterators to position them at
      39              : // or past the current point) since it does not want to miss points for
      40              : // purposes of consistency checking.
      41              : //
      42              : // Mutual consistency of range tombstones is non-trivial to check. One needs
      43              : // to detect inversions of the form [a, c)#8 at higher level and [b, c)#10 at
      44              : // a lower level. The start key of the former is not contained in the latter
      45              : // and we can't use the exclusive end key, c, for a containment check since it
      46              : // is the sentinel key. We observe that if these tombstones were fragmented
      47              : // wrt each other we would have [a, b)#8 and [b, c)#8 at the higher level and
      48              : // [b, c)#10 at the lower level and then it is trivial to compare the two
      49              : // [b, c) tombstones. Note that this fragmentation needs to take into account
      50              : // that tombstones in a file may be untruncated and need to act within the
      51              : // bounds of the file. This checking is performed by checkRangeTombstones()
      52              : // and its helper functions.
      53              : 
// The per-level structure used by simpleMergingIter.
//
// Each level pairs a point iterator with an optional range deletion iterator
// and caches the most recent result returned by each of them.
type simpleMergingIterLevel struct {
	// iter yields the level's point keys. simpleMergingIter.step closes it
	// and sets it to nil once it is exhausted.
	iter internalIterator
	// rangeDelIter yields the level's range tombstones. It may be nil, in
	// which case the level is skipped when positioning and checking
	// tombstones.
	rangeDelIter keyspan.FragmentIterator

	// iterKV is the last key-value pair returned by iter (nil when iter is
	// exhausted).
	iterKV *base.InternalKV
	// tombstone is the span most recently returned by rangeDelIter.SeekGE in
	// positionRangeDels. It is reset to nil when rangeDelIter is replaced.
	tombstone *keyspan.Span
}
      62              : 
      63            2 : func (ml *simpleMergingIterLevel) setRangeDelIter(iter keyspan.FragmentIterator) {
      64            2 :         ml.tombstone = nil
      65            2 :         if ml.rangeDelIter != nil {
      66            2 :                 ml.rangeDelIter.Close()
      67            2 :         }
      68            2 :         ml.rangeDelIter = iter
      69              : }
      70              : 
// simpleMergingIter is a forward-only merging iterator over the point keys of
// a set of levels, used to validate the level invariant. See the overview
// comment at the top of the file.
type simpleMergingIter struct {
	// levels holds the per-level iterators, indexed by the position passed to
	// init. handleVisiblePoint treats a larger index as a lower (older) level.
	levels []simpleMergingIterLevel
	// snapshot is the sequence number used for visibility checks of points
	// and tombstones.
	snapshot base.SeqNum
	// heap orders the current key of every non-exhausted level.
	heap simpleMergingIterHeap
	// The last point's key and level. For validation.
	lastKey   InternalKey
	lastLevel int
	// lastIterMsg is the debug string of the iterator that was stepped most
	// recently, captured before the iterator may be closed.
	lastIterMsg string
	// A non-nil valueMerger means MERGE record processing is ongoing.
	valueMerger base.ValueMerger
	// The first error will cause step() to return false.
	err error
	// numPoints counts the visible point keys processed so far.
	numPoints int64
	// merge creates the ValueMerger used when a series of MERGE records starts.
	merge Merge
	// formatKey is used to pretty-print keys in error messages.
	formatKey base.FormatKey
}
      87              : 
      88              : func (m *simpleMergingIter) init(
      89              :         merge Merge,
      90              :         cmp Compare,
      91              :         snapshot base.SeqNum,
      92              :         formatKey base.FormatKey,
      93              :         levels ...simpleMergingIterLevel,
      94            2 : ) {
      95            2 :         m.levels = levels
      96            2 :         m.formatKey = formatKey
      97            2 :         m.merge = merge
      98            2 :         m.snapshot = snapshot
      99            2 :         m.lastLevel = -1
     100            2 :         m.heap.cmp = cmp
     101            2 :         m.heap.items = make([]simpleMergingIterItem, 0, len(levels))
     102            2 :         for i := range m.levels {
     103            2 :                 l := &m.levels[i]
     104            2 :                 l.iterKV = l.iter.First()
     105            2 :                 if l.iterKV != nil {
     106            2 :                         item := simpleMergingIterItem{
     107            2 :                                 index: i,
     108            2 :                                 kv:    *l.iterKV,
     109            2 :                         }
     110            2 :                         item.kv.K = l.iterKV.K.Clone()
     111            2 :                         m.heap.items = append(m.heap.items, item)
     112            2 :                 }
     113              :         }
     114            2 :         m.heap.init()
     115            2 : 
     116            2 :         if m.heap.len() == 0 {
     117            2 :                 return
     118            2 :         }
     119            2 :         m.positionRangeDels()
     120              : }
     121              : 
     122              : // Positions all the rangedel iterators at or past the current top of the
     123              : // heap, using SeekGE().
     124            2 : func (m *simpleMergingIter) positionRangeDels() {
     125            2 :         item := &m.heap.items[0]
     126            2 :         for i := range m.levels {
     127            2 :                 l := &m.levels[i]
     128            2 :                 if l.rangeDelIter == nil {
     129            2 :                         continue
     130              :                 }
     131            2 :                 t, err := l.rangeDelIter.SeekGE(item.kv.K.UserKey)
     132            2 :                 m.err = firstError(m.err, err)
     133            2 :                 l.tombstone = t
     134              :         }
     135              : }
     136              : 
     137              : // Returns true if not yet done.
     138            2 : func (m *simpleMergingIter) step() bool {
     139            2 :         if m.heap.len() == 0 || m.err != nil {
     140            2 :                 return false
     141            2 :         }
     142            2 :         item := &m.heap.items[0]
     143            2 :         l := &m.levels[item.index]
     144            2 :         // Sentinels are not relevant for this point checking.
     145            2 :         if !item.kv.K.IsExclusiveSentinel() && item.kv.K.Visible(m.snapshot, base.SeqNumMax) {
     146            2 :                 // This is a visible point key.
     147            2 :                 if !m.handleVisiblePoint(item, l) {
     148            0 :                         return false
     149            0 :                 }
     150              :         }
     151              : 
     152              :         // The iterator for the current level may be closed in the following call to
     153              :         // Next(). We save its debug string for potential use after it is closed -
     154              :         // either in this current step() invocation or on the next invocation.
     155            2 :         m.lastIterMsg = l.iter.String()
     156            2 : 
     157            2 :         // Step to the next point.
     158            2 :         l.iterKV = l.iter.Next()
     159            2 :         if l.iterKV == nil {
     160            2 :                 m.err = errors.CombineErrors(l.iter.Error(), l.iter.Close())
     161            2 :                 l.iter = nil
     162            2 :                 m.heap.pop()
     163            2 :         } else {
     164            2 :                 // Check point keys in an sstable are ordered. Although not required, we check
     165            2 :                 // for memtables as well. A subtle check here is that successive sstables of
     166            2 :                 // L1 and higher levels are ordered. This happens when levelIter moves to the
     167            2 :                 // next sstable in the level, in which case item.key is previous sstable's
     168            2 :                 // last point key.
     169            2 :                 if !l.iterKV.K.IsExclusiveSentinel() && base.InternalCompare(m.heap.cmp, item.kv.K, l.iterKV.K) >= 0 {
     170            0 :                         m.err = errors.Errorf("out of order keys %s >= %s in %s",
     171            0 :                                 item.kv.K.Pretty(m.formatKey), l.iterKV.K.Pretty(m.formatKey), l.iter)
     172            0 :                         return false
     173            0 :                 }
     174            2 :                 userKeyBuf := item.kv.K.UserKey[:0]
     175            2 :                 item.kv = *l.iterKV
     176            2 :                 item.kv.K.UserKey = append(userKeyBuf, l.iterKV.K.UserKey...)
     177            2 :                 if m.heap.len() > 1 {
     178            2 :                         m.heap.fix(0)
     179            2 :                 }
     180              :         }
     181            2 :         if m.err != nil {
     182            0 :                 return false
     183            0 :         }
     184            2 :         if m.heap.len() == 0 {
     185            2 :                 // If m.valueMerger != nil, the last record was a MERGE record.
     186            2 :                 if m.valueMerger != nil {
     187            2 :                         var closer io.Closer
     188            2 :                         var err error
     189            2 :                         _, closer, err = m.valueMerger.Finish(true /* includesBase */)
     190            2 :                         if closer != nil {
     191            0 :                                 err = errors.CombineErrors(err, closer.Close())
     192            0 :                         }
     193            2 :                         if err != nil {
     194            0 :                                 m.err = errors.CombineErrors(m.err,
     195            0 :                                         errors.Wrapf(err, "merge processing error on key %s in %s",
     196            0 :                                                 item.kv.K.Pretty(m.formatKey), m.lastIterMsg))
     197            0 :                         }
     198            2 :                         m.valueMerger = nil
     199              :                 }
     200            2 :                 return false
     201              :         }
     202            2 :         m.positionRangeDels()
     203            2 :         return true
     204              : }
     205              : 
     206              : // handleVisiblePoint returns true if validation succeeded and level checking
     207              : // can continue.
     208              : func (m *simpleMergingIter) handleVisiblePoint(
     209              :         item *simpleMergingIterItem, l *simpleMergingIterLevel,
     210            2 : ) (ok bool) {
     211            2 :         m.numPoints++
     212            2 :         keyChanged := m.heap.cmp(item.kv.K.UserKey, m.lastKey.UserKey) != 0
     213            2 :         if !keyChanged {
     214            2 :                 // At the same user key. We will see them in decreasing seqnum
     215            2 :                 // order so the lastLevel must not be lower.
     216            2 :                 if m.lastLevel > item.index {
     217            0 :                         m.err = errors.Errorf("found InternalKey %s in %s and InternalKey %s in %s",
     218            0 :                                 item.kv.K.Pretty(m.formatKey), l.iter, m.lastKey.Pretty(m.formatKey),
     219            0 :                                 m.lastIterMsg)
     220            0 :                         return false
     221            0 :                 }
     222            2 :                 m.lastLevel = item.index
     223            2 :         } else {
     224            2 :                 // The user key has changed.
     225            2 :                 m.lastKey.Trailer = item.kv.K.Trailer
     226            2 :                 m.lastKey.UserKey = append(m.lastKey.UserKey[:0], item.kv.K.UserKey...)
     227            2 :                 m.lastLevel = item.index
     228            2 :         }
     229              :         // Ongoing series of MERGE records ends with a MERGE record.
     230            2 :         if keyChanged && m.valueMerger != nil {
     231            2 :                 var closer io.Closer
     232            2 :                 _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     233            2 :                 if m.err == nil && closer != nil {
     234            0 :                         m.err = closer.Close()
     235            0 :                 }
     236            2 :                 m.valueMerger = nil
     237              :         }
     238            2 :         itemValue, _, err := item.kv.Value(nil)
     239            2 :         if err != nil {
     240            0 :                 m.err = err
     241            0 :                 return false
     242            0 :         }
     243            2 :         if m.valueMerger != nil {
     244            2 :                 // Ongoing series of MERGE records.
     245            2 :                 switch item.kv.K.Kind() {
     246            2 :                 case InternalKeyKindSingleDelete, InternalKeyKindDelete, InternalKeyKindDeleteSized:
     247            2 :                         var closer io.Closer
     248            2 :                         _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     249            2 :                         if m.err == nil && closer != nil {
     250            0 :                                 m.err = closer.Close()
     251            0 :                         }
     252            2 :                         m.valueMerger = nil
     253            2 :                 case InternalKeyKindSet, InternalKeyKindSetWithDelete:
     254            2 :                         m.err = m.valueMerger.MergeOlder(itemValue)
     255            2 :                         if m.err == nil {
     256            2 :                                 var closer io.Closer
     257            2 :                                 _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     258            2 :                                 if m.err == nil && closer != nil {
     259            0 :                                         m.err = closer.Close()
     260            0 :                                 }
     261              :                         }
     262            2 :                         m.valueMerger = nil
     263            2 :                 case InternalKeyKindMerge:
     264            2 :                         m.err = m.valueMerger.MergeOlder(itemValue)
     265            0 :                 default:
     266            0 :                         m.err = errors.Errorf("pebble: invalid internal key kind %s in %s",
     267            0 :                                 item.kv.K.Pretty(m.formatKey),
     268            0 :                                 l.iter)
     269            0 :                         return false
     270              :                 }
     271            2 :         } else if item.kv.K.Kind() == InternalKeyKindMerge && m.err == nil {
     272            2 :                 // New series of MERGE records.
     273            2 :                 m.valueMerger, m.err = m.merge(item.kv.K.UserKey, itemValue)
     274            2 :         }
     275            2 :         if m.err != nil {
     276            0 :                 m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
     277            0 :                         item.kv.K.Pretty(m.formatKey), l.iter)
     278            0 :                 return false
     279            0 :         }
     280              :         // Is this point covered by a tombstone at a lower level? Note that all these
     281              :         // iterators must be positioned at a key > item.key.
     282            2 :         for level := item.index + 1; level < len(m.levels); level++ {
     283            2 :                 lvl := &m.levels[level]
     284            2 :                 if lvl.rangeDelIter == nil || lvl.tombstone.Empty() {
     285            2 :                         continue
     286              :                 }
     287            2 :                 if lvl.tombstone.Contains(m.heap.cmp, item.kv.K.UserKey) && lvl.tombstone.CoversAt(m.snapshot, item.kv.K.SeqNum()) {
     288            0 :                         m.err = errors.Errorf("tombstone %s in %s deletes key %s in %s",
     289            0 :                                 lvl.tombstone.Pretty(m.formatKey), lvl.iter, item.kv.K.Pretty(m.formatKey),
     290            0 :                                 l.iter)
     291            0 :                         return false
     292            0 :                 }
     293              :         }
     294            2 :         return true
     295              : }
     296              : 
     297              : // Checking that range tombstones are mutually consistent is performed by
     298              : // checkRangeTombstones(). See the overview comment at the top of the file.
     299              : //
     300              : // We do this check as follows:
     301              : // - Collect the tombstones for each level, put them into one pool of tombstones
     302              : //   along with their level information (addTombstonesFromIter()).
     303              : // - Collect the start and end user keys from all these tombstones
     304              : //   (collectAllUserKeys()) and use them to fragment all the tombstones
     305              : //   (fragmentUsingUserKeys()).
     306              : // - Sort tombstones by start key and decreasing seqnum (all tombstones that
     307              : //   have the same start key will have the same end key because they have been
     308              : //   fragmented)
     309              : // - Iterate and check (iterateAndCheckTombstones()).
     310              : //
     311              : // Note that this simple approach requires holding all the tombstones across all
     312              : // levels in-memory. A more sophisticated incremental approach could be devised,
     313              : // if necessary.
     314              : 
// A tombstone and the corresponding level it was found in.
type tombstoneWithLevel struct {
	keyspan.Span
	// level is the position of the tombstone's source in the order in which
	// checkRangeTombstones visits sources: memtables first, then L0
	// sublevels, then L1 and below. iterateAndCheckTombstones expects it to
	// be non-decreasing as seqnums decrease.
	level int
	// The level in LSM. A -1 means it's a memtable.
	lsmLevel int
	// tableNum identifies the sstable the tombstone was read from; it is 0
	// for memtables. Used only in error messages.
	tableNum base.TableNum
}
     323              : 
     324              : func iterateAndCheckTombstones(
     325              :         cmp Compare, formatKey base.FormatKey, tombstones []tombstoneWithLevel,
     326            2 : ) error {
     327            2 :         slices.SortFunc(tombstones, func(a, b tombstoneWithLevel) int {
     328            2 :                 if v := cmp(a.Start, b.Start); v != 0 {
     329            2 :                         return v
     330            2 :                 }
     331            2 :                 return stdcmp.Compare(b.LargestSeqNum(), a.LargestSeqNum())
     332              :         })
     333              : 
     334              :         // For a sequence of tombstones that share the same start UserKey, we will
     335              :         // encounter them in non-increasing seqnum order and so should encounter them
     336              :         // in non-decreasing level order.
     337            2 :         lastTombstone := tombstoneWithLevel{}
     338            2 :         for _, t := range tombstones {
     339            2 :                 if cmp(lastTombstone.Start, t.Start) == 0 && lastTombstone.level > t.level {
     340            0 :                         return errors.Errorf("encountered tombstone %s in %s"+
     341            0 :                                 " that has a lower seqnum than the same tombstone in %s",
     342            0 :                                 t.Span.Pretty(formatKey), levelOrMemtable(t.lsmLevel, t.tableNum),
     343            0 :                                 levelOrMemtable(lastTombstone.lsmLevel, lastTombstone.tableNum))
     344            0 :                 }
     345            2 :                 lastTombstone = t
     346              :         }
     347            2 :         return nil
     348              : }
     349              : 
// checkConfig bundles the inputs required by the level checking routines.
type checkConfig struct {
	logger Logger
	// comparer supplies the user key comparison function (see cmp).
	comparer *Comparer
	// readState provides the memtables and the current version whose
	// contents are checked.
	readState *readState
	// newIters opens iterators over an sstable.
	newIters tableNewIters
	// seqNum is the sequence number used to determine which tombstones are
	// visible.
	seqNum base.SeqNum
	// stats, if non-nil, is populated with counts gathered during the check.
	stats *CheckLevelsStats
	merge Merge
	// formatKey is used to pretty-print keys in error messages.
	formatKey base.FormatKey
	readEnv   block.ReadEnv
	// blobValueFetcher is the ValueFetcher to use when retrieving values stored
	// externally in blob files.
	blobValueFetcher blob.ValueFetcher
}
     364              : 
     365              : // cmp is shorthand for comparer.Compare.
     366            2 : func (c *checkConfig) cmp(a, b []byte) int { return c.comparer.Compare(a, b) }
     367              : 
     368            2 : func checkRangeTombstones(c *checkConfig) error {
     369            2 :         var level int
     370            2 :         var tombstones []tombstoneWithLevel
     371            2 :         var err error
     372            2 : 
     373            2 :         memtables := c.readState.memtables
     374            2 :         for i := len(memtables) - 1; i >= 0; i-- {
     375            2 :                 iter := memtables[i].newRangeDelIter(nil)
     376            2 :                 if iter == nil {
     377            2 :                         continue
     378              :                 }
     379            2 :                 tombstones, err = addTombstonesFromIter(
     380            2 :                         iter, level, -1, 0, tombstones, c.seqNum, c.cmp, c.formatKey,
     381            2 :                 )
     382            2 :                 iter.Close()
     383            2 :                 if err != nil {
     384            0 :                         return err
     385            0 :                 }
     386            2 :                 level++
     387              :         }
     388              : 
     389            2 :         current := c.readState.current
     390            2 :         addTombstonesFromLevel := func(files iter.Seq[*manifest.TableMetadata], lsmLevel int) error {
     391            2 :                 for f := range files {
     392            2 :                         iters, err := c.newIters(
     393            2 :                                 context.Background(), f, &IterOptions{layer: manifest.Level(lsmLevel)},
     394            2 :                                 internalIterOpts{}, iterRangeDeletions)
     395            2 :                         if err != nil {
     396            0 :                                 return err
     397            0 :                         }
     398            2 :                         tombstones, err = addTombstonesFromIter(iters.RangeDeletion(), level, lsmLevel, f.TableNum,
     399            2 :                                 tombstones, c.seqNum, c.cmp, c.formatKey)
     400            2 :                         _ = iters.CloseAll()
     401            2 : 
     402            2 :                         if err != nil {
     403            0 :                                 return err
     404            0 :                         }
     405              :                 }
     406            2 :                 return nil
     407              :         }
     408              :         // Now the levels with untruncated tombsones.
     409            2 :         for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
     410            2 :                 if current.L0SublevelFiles[i].Empty() {
     411            0 :                         continue
     412              :                 }
     413            2 :                 err := addTombstonesFromLevel(current.L0SublevelFiles[i].All(), 0)
     414            2 :                 if err != nil {
     415            0 :                         return err
     416            0 :                 }
     417            2 :                 level++
     418              :         }
     419            2 :         for i := 1; i < len(current.Levels); i++ {
     420            2 :                 if err := addTombstonesFromLevel(current.Levels[i].All(), i); err != nil {
     421            0 :                         return err
     422            0 :                 }
     423            2 :                 level++
     424              :         }
     425            2 :         if c.stats != nil {
     426            1 :                 c.stats.NumTombstones = len(tombstones)
     427            1 :         }
     428              :         // We now have truncated tombstones.
     429              :         // Fragment them all.
     430            2 :         userKeys := collectAllUserKeys(c.cmp, tombstones)
     431            2 :         tombstones = fragmentUsingUserKeys(c.cmp, tombstones, userKeys)
     432            2 :         return iterateAndCheckTombstones(c.cmp, c.formatKey, tombstones)
     433              : }
     434              : 
     435            0 : func levelOrMemtable(lsmLevel int, tableNum base.TableNum) string {
     436            0 :         if lsmLevel == -1 {
     437            0 :                 return "memtable"
     438            0 :         }
     439            0 :         return fmt.Sprintf("L%d: fileNum=%s", lsmLevel, tableNum)
     440              : }
     441              : 
     442              : func addTombstonesFromIter(
     443              :         iter keyspan.FragmentIterator,
     444              :         level int,
     445              :         lsmLevel int,
     446              :         tableNum base.TableNum,
     447              :         tombstones []tombstoneWithLevel,
     448              :         seqNum base.SeqNum,
     449              :         cmp Compare,
     450              :         formatKey base.FormatKey,
     451            2 : ) (_ []tombstoneWithLevel, err error) {
     452            2 :         var prevTombstone keyspan.Span
     453            2 :         tomb, err := iter.First()
     454            2 :         for ; tomb != nil; tomb, err = iter.Next() {
     455            2 :                 t := tomb.Visible(seqNum)
     456            2 :                 if t.Empty() {
     457            2 :                         continue
     458              :                 }
     459            2 :                 t = t.Clone()
     460            2 :                 // This is mainly a test for rangeDelV2 formatted blocks which are expected to
     461            2 :                 // be ordered and fragmented on disk. But we anyways check for memtables,
     462            2 :                 // rangeDelV1 as well.
     463            2 :                 if cmp(prevTombstone.End, t.Start) > 0 {
     464            0 :                         return nil, errors.Errorf("unordered or unfragmented range delete tombstones %s, %s in %s",
     465            0 :                                 prevTombstone.Pretty(formatKey), t.Pretty(formatKey), levelOrMemtable(lsmLevel, tableNum))
     466            0 :                 }
     467            2 :                 prevTombstone = t
     468            2 : 
     469            2 :                 if !t.Empty() {
     470            2 :                         tombstones = append(tombstones, tombstoneWithLevel{
     471            2 :                                 Span:     t,
     472            2 :                                 level:    level,
     473            2 :                                 lsmLevel: lsmLevel,
     474            2 :                                 tableNum: tableNum,
     475            2 :                         })
     476            2 :                 }
     477              :         }
     478            2 :         if err != nil {
     479            0 :                 return nil, err
     480            0 :         }
     481            2 :         return tombstones, nil
     482              : }
     483              : 
     484            2 : func collectAllUserKeys(cmp Compare, tombstones []tombstoneWithLevel) [][]byte {
                                // collectAllUserKeys returns the distinct Start and End keys of the given
                                // tombstones, sorted according to cmp. The returned slices are the
                                // tombstones' own key slices; they are not copied.
                                //
                                // Two keys (Start and End) are appended per tombstone, hence the capacity.
     485            2 :         keys := make([][]byte, 0, len(tombstones)*2)
     486            2 :         for _, t := range tombstones {
     487            2 :                 keys = append(keys, t.Start, t.End)
     488            2 :         }
     489            2 :         slices.SortFunc(keys, cmp)
                                // After sorting, keys that compare equal are adjacent, so compacting
                                // adjacent duplicates leaves each key exactly once.
     490            2 :         return slices.CompactFunc(keys, func(a, b []byte) bool {
     491            2 :                 return cmp(a, b) == 0
     492            2 :         })
     493              : }
     494              : 
     495              : func fragmentUsingUserKeys(
     496              :         cmp Compare, tombstones []tombstoneWithLevel, userKeys [][]byte,
     497            2 : ) []tombstoneWithLevel {
                                // fragmentUsingUserKeys splits every tombstone at each entry of userKeys
                                // that lies strictly between its Start and End, and returns the resulting
                                // pieces in input order. Each piece is a copy of the original tombstone
                                // struct with only Start and/or End changed.
                                //
                                // userKeys must be sorted ascending under cmp, since it is binary-searched
                                // with sort.Search.
     498            2 :         var buf []tombstoneWithLevel
     499            2 :         for _, t := range tombstones {
     500            2 :                 // Find the first position with tombstone start < user key
     501            2 :                 i := sort.Search(len(userKeys), func(i int) bool {
     502            2 :                         return cmp(t.Start, userKeys[i]) < 0
     503            2 :                 })
     504            2 :                 for ; i < len(userKeys); i++ {
                                                // Stop at the first key at or beyond the tombstone's end; only
                                                // keys inside (Start, End) produce a split.
     505            2 :                         if cmp(userKeys[i], t.End) >= 0 {
     506            2 :                                 break
     507              :                         }
                                                // Emit [t.Start, userKeys[i]) and continue with the remainder
                                                // starting at userKeys[i]. t is the loop's own copy, so the
                                                // input slice is not modified.
     508            2 :                         tPartial := t
     509            2 :                         tPartial.End = userKeys[i]
     510            2 :                         buf = append(buf, tPartial)
     511            2 :                         t.Start = userKeys[i]
     512              :                 }
                                        // Emit the final (or only) piece, ending at the original End.
     513            2 :                 buf = append(buf, t)
     514              :         }
     515            2 :         return buf
     516              : }
     517              : 
     518              : // CheckLevelsStats provides basic stats on points and tombstones encountered.
     519              : type CheckLevelsStats struct {
                                // NumPoints is set from the merging iterator's point count once the
                                // point-key phase of the check has completed without error.
     520              :         NumPoints     int64
                                // NumTombstones is not written in this part of the file; presumably it is
                                // filled in by the range tombstone phase (TODO confirm).
     521              :         NumTombstones int
     522              : }
     523              : 
     524              : // CheckLevels checks:
     525              : //   - Every entry in the DB is consistent with the level invariant. See the
     526              : //     comment at the top of the file.
     527              : //   - Point keys in sstables are ordered.
     528              : //   - Range delete tombstones in sstables are ordered and fragmented.
     529              : //   - Successful processing of all MERGE records.
                        //
                        // stats is handed to the internal checker through the check config; the
                        // internal checker only writes point stats when stats is non-nil. The returned
                        // error is whatever checkLevelsInternal returns.
     530            2 : func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
     531            2 :         // Grab and reference the current readState.
     532            2 :         readState := d.loadReadState()
     533            2 :         defer readState.unref()
     534            2 : 
     535            2 :         // Determine the seqnum to read at after grabbing the read state (current and
     536            2 :         // memtables) above.
     537            2 :         seqNum := d.mu.versions.visibleSeqNum.Load()
     538            2 : 
                                // Bundle everything the check needs, so that checkLevelsInternal depends
                                // only on this config and not on the DB itself.
     539            2 :         checkConfig := &checkConfig{
     540            2 :                 logger:    d.opts.Logger,
     541            2 :                 comparer:  d.opts.Comparer,
     542            2 :                 readState: readState,
     543            2 :                 newIters:  d.newIters,
     544            2 :                 seqNum:    seqNum,
     545            2 :                 stats:     stats,
     546            2 :                 merge:     d.merge,
     547            2 :                 formatKey: d.opts.Comparer.FormatKey,
     548            2 :                 readEnv:   block.ReadEnv{
     549            2 :                         // TODO(jackson): Add categorized stats.
     550            2 :                 },
     551            2 :         }
                                // The blob value fetcher is initialized with the DB's file cache and the
                                // read environment above. It is closed when CheckLevels returns; the error
                                // from Close is deliberately discarded.
     552            2 :         checkConfig.blobValueFetcher.Init(d.fileCache, checkConfig.readEnv)
     553            2 :         defer func() { _ = checkConfig.blobValueFetcher.Close() }()
     554            2 :         return checkLevelsInternal(checkConfig)
     555              : }
     556              : 
     557            2 : func checkLevelsInternal(c *checkConfig) (err error) {
                                // checkLevelsInternal runs the check in two phases: it first drains a
                                // simpleMergingIter built over the memtables, the L0 sublevels and levels
                                // 1 and up, and then calls checkRangeTombstones. The result is named so
                                // that the deferred cleanup below can fold iterator Close errors into it.
     558            2 :         internalOpts := internalIterOpts{
     559            2 :                 readEnv:          sstable.ReadEnv{Block: c.readEnv},
     560            2 :                 blobValueFetcher: &c.blobValueFetcher,
     561            2 :         }
     562            2 : 
     563            2 :         // Phase 1: Use a simpleMergingIter to step through all the points and ensure
     564            2 :         // that points with the same user key at different levels are not inverted
     565            2 :         // wrt sequence numbers and the same holds for tombstones that cover points.
     566            2 :         // To do this, one needs to construct a simpleMergingIter which is similar to
     567            2 :         // how one constructs a mergingIter.
     568            2 : 
     569            2 :         // Add mem tables from newest to oldest.
     570            2 :         var mlevels []simpleMergingIterLevel
                                // On every return path, close whatever iterators are still attached to
                                // mlevels. Fields are set to nil after closing.
     571            2 :         defer func() {
     572            2 :                 for i := range mlevels {
     573            2 :                         l := &mlevels[i]
     574            2 :                         if l.iter != nil {
                                                        // firstError combines the pending result with the Close
                                                        // error; presumably the earlier non-nil error wins (its
                                                        // definition is outside this chunk).
     575            2 :                                 err = firstError(err, l.iter.Close())
     576            2 :                                 l.iter = nil
     577            2 :                         }
     578            2 :                         if l.rangeDelIter != nil {
     579            2 :                                 l.rangeDelIter.Close()
     580            2 :                                 l.rangeDelIter = nil
     581            2 :                         }
     582              :                 }
     583              :         }()
     584              : 
     585            2 :         memtables := c.readState.memtables
     586            2 :         for i := len(memtables) - 1; i >= 0; i-- {
     587            2 :                 mem := memtables[i]
     588            2 :                 mlevels = append(mlevels, simpleMergingIterLevel{
     589            2 :                         iter:         mem.newIter(nil),
     590            2 :                         rangeDelIter: mem.newRangeDelIter(nil),
     591            2 :                 })
     592            2 :         }
     593              : 
     594            2 :         current := c.readState.current
     595            2 :         // Determine the final size for mlevels so that there are no more
     596            2 :         // reallocations. levelIter will hold a pointer to elements in mlevels.
                                // This is done by appending one empty placeholder per non-empty L0
                                // sublevel and per non-empty level >= 1; the placeholders are filled in
                                // by the two loops further down, which skip exactly the same entries.
     597            2 :         start := len(mlevels)
     598            2 :         for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
     599            2 :                 if current.L0SublevelFiles[sublevel].Empty() {
     600            0 :                         continue
     601              :                 }
     602            2 :                 mlevels = append(mlevels, simpleMergingIterLevel{})
     603              :         }
     604            2 :         for level := 1; level < len(current.Levels); level++ {
     605            2 :                 if current.Levels[level].Empty() {
     606            2 :                         continue
     607              :                 }
     608            2 :                 mlevels = append(mlevels, simpleMergingIterLevel{})
     609              :         }
                                // mlevelAlloc is a window over the placeholders; each loop iteration below
                                // consumes its first element and then advances the window by one.
     610            2 :         mlevelAlloc := mlevels[start:]
     611            2 :         // Add L0 files by sublevel.
     612            2 :         for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
     613            2 :                 if current.L0SublevelFiles[sublevel].Empty() {
     614            0 :                         continue
     615              :                 }
     616            2 :                 manifestIter := current.L0SublevelFiles[sublevel].Iter()
     617            2 :                 iterOpts := IterOptions{logger: c.logger}
     618            2 :                 li := &levelIter{}
     619            2 :                 li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
     620            2 :                         manifest.L0Sublevel(sublevel), internalOpts)
     621            2 :                 li.initRangeDel(&mlevelAlloc[0])
     622            2 :                 mlevelAlloc[0].iter = li
     623            2 :                 mlevelAlloc = mlevelAlloc[1:]
     624              :         }
                                // Same construction for levels 1 and up (L0 was handled by sublevel above).
     625            2 :         for level := 1; level < len(current.Levels); level++ {
     626            2 :                 if current.Levels[level].Empty() {
     627            2 :                         continue
     628              :                 }
     629              : 
     630            2 :                 iterOpts := IterOptions{logger: c.logger}
     631            2 :                 li := &levelIter{}
     632            2 :                 li.init(context.Background(), iterOpts, c.comparer, c.newIters,
     633            2 :                         current.Levels[level].Iter(), manifest.Level(level), internalOpts)
     634            2 :                 li.initRangeDel(&mlevelAlloc[0])
     635            2 :                 mlevelAlloc[0].iter = li
     636            2 :                 mlevelAlloc = mlevelAlloc[1:]
     637              :         }
     638              : 
     639            2 :         mergingIter := &simpleMergingIter{}
     640            2 :         mergingIter.init(c.merge, c.cmp, c.seqNum, c.formatKey, mlevels...)
                                // Drain the iterator: step is called until it returns false. The loop body
                                // is intentionally empty; any failure is reported through mergingIter.err.
     641            2 :         for cont := mergingIter.step(); cont; cont = mergingIter.step() {
     642            2 :         }
                                // This err shadows the named result only inside the if statement; the
                                // return still assigns the named result before the deferred cleanup runs.
     643            2 :         if err := mergingIter.err; err != nil {
     644            0 :                 return err
     645            0 :         }
                                // Stats are optional: they are only recorded when the caller supplied a
                                // non-nil stats struct.
     646            2 :         if c.stats != nil {
     647            1 :                 c.stats.NumPoints = mergingIter.numPoints
     648            1 :         }
     649              : 
     650              :         // Phase 2: Check that the tombstones are mutually consistent.
     651            2 :         return checkRangeTombstones(c)
     652              : }
     653              : 
     654              : type simpleMergingIterItem struct {
                                // simpleMergingIterItem is one entry of simpleMergingIterHeap.
                                //
                                // index is not interpreted by the heap itself; presumably it identifies the
                                // level the entry came from (TODO confirm against simpleMergingIter).
     655              :         index int
                                // kv holds the entry's internal key; the heap orders items by kv.K.
     656              :         kv    base.InternalKV
     657              : }
     658              : 
     659              : type simpleMergingIterHeap struct {
                                // simpleMergingIterHeap is a binary heap of simpleMergingIterItem stored in
                                // a slice, with the ordering defined by the less method.
                                //
                                // cmp compares the user keys of two items.
     660              :         cmp     Compare
                                // reverse inverts the ordering used by less, for both the user key
                                // comparison and the trailer tie-break.
     661              :         reverse bool
                                // items is the heap's backing slice; items[0] is the root.
     662              :         items   []simpleMergingIterItem
     663              : }
     664              : 
     665            2 : func (h *simpleMergingIterHeap) len() int {
                                // len returns the number of items currently in the heap.
     666            2 :         return len(h.items)
     667            2 : }
     668              : 
     669            2 : func (h *simpleMergingIterHeap) less(i, j int) bool {
                                // less reports whether items[i] sorts before items[j]. Items are ordered
                                // by user key first (ascending, or descending when h.reverse is set).
     670            2 :         ikey, jkey := h.items[i].kv.K, h.items[j].kv.K
     671            2 :         if c := h.cmp(ikey.UserKey, jkey.UserKey); c != 0 {
     672            2 :                 if h.reverse {
     673            0 :                         return c > 0
     674            0 :                 }
     675            2 :                 return c < 0
     676              :         }
                                // Equal user keys: the larger Trailer sorts first in forward mode and the
                                // smaller Trailer sorts first in reverse mode. Presumably the trailer
                                // encodes seqnum and kind, so forward mode yields newer entries first
                                // (TODO confirm against base.InternalKey).
     677            2 :         if h.reverse {
     678            0 :                 return ikey.Trailer < jkey.Trailer
     679            0 :         }
     680            2 :         return ikey.Trailer > jkey.Trailer
     681              : }
     682              : 
     683            2 : func (h *simpleMergingIterHeap) swap(i, j int) {
                                // swap exchanges the items at positions i and j.
     684            2 :         h.items[i], h.items[j] = h.items[j], h.items[i]
     685            2 : }
     686              : 
     687              : // init, fix, up and down are copied from the go stdlib.
                        //
                        // init establishes the heap ordering over all of h.items by sifting down every
                        // node that has at least one child, from the last such node back to the root.
     688            2 : func (h *simpleMergingIterHeap) init() {
     689            2 :         // heapify
     690            2 :         n := h.len()
     691            2 :         for i := n/2 - 1; i >= 0; i-- {
     692            2 :                 h.down(i, n)
     693            2 :         }
     694              : }
     695              : 
     696            2 : func (h *simpleMergingIterHeap) fix(i int) {
                                // fix restores the heap ordering after the item at index i has changed.
                                // It first tries to sift the item down; only if the item did not move
                                // (down returned false) does it try to sift it up.
     697            2 :         if !h.down(i, h.len()) {
     698            2 :                 h.up(i)
     699            2 :         }
     700              : }
     701              : 
     702            2 : func (h *simpleMergingIterHeap) pop() *simpleMergingIterItem {
                                // pop removes the root item (the first item in less order) and returns a
                                // pointer to it. It must not be called on an empty heap: n would be -1
                                // and the swap below would index out of range.
     703            2 :         n := h.len() - 1
                                // Move the root to the last slot, then restore the heap ordering among
                                // the first n items only.
     704            2 :         h.swap(0, n)
     705            2 :         h.down(0, n)
                                // The returned pointer refers to the slot just past the shortened slice,
                                // so it still aliases h.items' backing array and stays valid only until
                                // that slot is overwritten (for example by a later append to h.items).
     706            2 :         item := &h.items[n]
     707            2 :         h.items = h.items[:n]
     708            2 :         return item
     709            2 : }
     710              : 
     711            2 : func (h *simpleMergingIterHeap) up(j int) {
                                // up sifts the item at index j towards the root, swapping it with its
                                // parent for as long as it sorts before that parent.
     712            2 :         for {
     713            2 :                 i := (j - 1) / 2 // parent
                                        // i == j only happens at the root: for j == 0, (0-1)/2 truncates to 0.
     714            2 :                 if i == j || !h.less(j, i) {
     715            2 :                         break
     716              :                 }
     717            0 :                 h.swap(i, j)
     718            0 :                 j = i
     719              :         }
     720              : }
     721              : 
     722            2 : func (h *simpleMergingIterHeap) down(i0, n int) bool {
                                // down sifts the item at index i0 towards the leaves, considering only the
                                // first n items of the heap. At each step the item is swapped with
                                // whichever of its children sorts first, as long as that child sorts
                                // before it. It reports whether the item moved.
     723            2 :         i := i0
     724            2 :         for {
     725            2 :                 j1 := 2*i + 1
     726            2 :                 if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
     727            2 :                         break
     728              :                 }
     729            2 :                 j := j1 // left child
     730            2 :                 if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
     731            2 :                         j = j2 // = 2*i + 2  // right child
     732            2 :                 }
                                        // Neither child sorts before the item: it is in its final position.
     733            2 :                 if !h.less(j, i) {
     734            2 :                         break
     735              :                 }
     736            2 :                 h.swap(i, j)
     737            2 :                 i = j
     738              :         }
     739            2 :         return i > i0
     740              : }
        

Generated by: LCOV version 2.0-1