LCOV - code coverage report
Current view: top level - pebble - level_checker.go (source / functions) Coverage Total Hit
Test: 2025-07-28 08:19Z bb252436 - meta test only.lcov Lines: 79.6 % 569 453
Test Date: 2025-07-28 08:20:54 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         stdcmp "cmp"
       9              :         "context"
      10              :         "fmt"
      11              :         "io"
      12              :         "iter"
      13              :         "maps"
      14              :         "slices"
      15              :         "sort"
      16              : 
      17              :         "github.com/cockroachdb/errors"
      18              :         "github.com/cockroachdb/pebble/internal/base"
      19              :         "github.com/cockroachdb/pebble/internal/keyspan"
      20              :         "github.com/cockroachdb/pebble/internal/manifest"
      21              :         "github.com/cockroachdb/pebble/sstable"
      22              :         "github.com/cockroachdb/pebble/sstable/blob"
      23              :         "github.com/cockroachdb/pebble/sstable/block"
      24              :         "github.com/cockroachdb/pebble/sstable/colblk"
      25              : )
      26              : 
      27              : // This file implements DB.CheckLevels() which checks that every entry in the
      28              : // DB is consistent with respect to the level invariant: any point (or the
      29              : // infinite number of points in a range tombstone) has a seqnum such that a
      30              : // point with the same UserKey at a lower level has a lower seqnum. This is an
      31              : // expensive check since it involves iterating over all the entries in the DB,
      32              : // hence only intended for tests or tools.
      33              : //
      34              : // If we ignore range tombstones, the consistency checking of points can be
      35              : // done with a simplified version of mergingIter. simpleMergingIter is that
      36              : // simplified version of mergingIter that only needs to step through points
      37              : // (analogous to only doing Next()). It can also easily accommodate
      38              : // consistency checking of points relative to range tombstones.
      39              : // simpleMergingIter does not do any seek optimizations present in mergingIter
      40              : // (it minimally needs to seek the range delete iterators to position them at
      41              : // or past the current point) since it does not want to miss points for
      42              : // purposes of consistency checking.
      43              : //
      44              : // Mutual consistency of range tombstones is non-trivial to check. One needs
      45              : // to detect inversions of the form [a, c)#8 at higher level and [b, c)#10 at
      46              : // a lower level. The start key of the former is not contained in the latter
      47              : // and we can't use the exclusive end key, c, for a containment check since it
      48              : // is the sentinel key. We observe that if these tombstones were fragmented
      49              : // wrt each other we would have [a, b)#8 and [b, c)#8 at the higher level and
      50              : // [b, c)#10 at the lower level and then it is is trivial to compare the two
      51              : // [b, c) tombstones. Note that this fragmentation needs to take into account
      52              : // that tombstones in a file may be untruncated and need to act within the
      53              : // bounds of the file. This checking is performed by checkRangeTombstones()
      54              : // and its helper functions.
      55              : 
      56              : // The per-level structure used by simpleMergingIter.
      57              : type simpleMergingIterLevel struct {
      58              :         iter         internalIterator
      59              :         rangeDelIter keyspan.FragmentIterator
      60              : 
      61              :         iterKV    *base.InternalKV
      62              :         tombstone *keyspan.Span
      63              : }
      64              : 
      65            1 : func (ml *simpleMergingIterLevel) setRangeDelIter(iter keyspan.FragmentIterator) {
      66            1 :         ml.tombstone = nil
      67            1 :         if ml.rangeDelIter != nil {
      68            1 :                 ml.rangeDelIter.Close()
      69            1 :         }
      70            1 :         ml.rangeDelIter = iter
      71              : }
      72              : 
      73              : type simpleMergingIter struct {
      74              :         levels   []simpleMergingIterLevel
      75              :         snapshot base.SeqNum
      76              :         heap     simpleMergingIterHeap
      77              :         // The last point's key and level. For validation.
      78              :         lastKey     InternalKey
      79              :         lastLevel   int
      80              :         lastIterMsg string
      81              :         // A non-nil valueMerger means MERGE record processing is ongoing.
      82              :         valueMerger base.ValueMerger
      83              :         // The first error will cause step() to return false.
      84              :         err       error
      85              :         numPoints int64
      86              :         merge     Merge
      87              :         formatKey base.FormatKey
      88              : }
      89              : 
      90              : func (m *simpleMergingIter) init(
      91              :         merge Merge,
      92              :         cmp Compare,
      93              :         snapshot base.SeqNum,
      94              :         formatKey base.FormatKey,
      95              :         levels ...simpleMergingIterLevel,
      96            1 : ) {
      97            1 :         m.levels = levels
      98            1 :         m.formatKey = formatKey
      99            1 :         m.merge = merge
     100            1 :         m.snapshot = snapshot
     101            1 :         m.lastLevel = -1
     102            1 :         m.heap.cmp = cmp
     103            1 :         m.heap.items = make([]simpleMergingIterItem, 0, len(levels))
     104            1 :         for i := range m.levels {
     105            1 :                 l := &m.levels[i]
     106            1 :                 l.iterKV = l.iter.First()
     107            1 :                 if l.iterKV != nil {
     108            1 :                         item := simpleMergingIterItem{
     109            1 :                                 index: i,
     110            1 :                                 kv:    *l.iterKV,
     111            1 :                         }
     112            1 :                         item.kv.K = l.iterKV.K.Clone()
     113            1 :                         m.heap.items = append(m.heap.items, item)
     114            1 :                 }
     115              :         }
     116            1 :         m.heap.init()
     117            1 : 
     118            1 :         if m.heap.len() == 0 {
     119            1 :                 return
     120            1 :         }
     121            1 :         m.positionRangeDels()
     122              : }
     123              : 
     124              : // Positions all the rangedel iterators at or past the current top of the
     125              : // heap, using SeekGE().
     126            1 : func (m *simpleMergingIter) positionRangeDels() {
     127            1 :         item := &m.heap.items[0]
     128            1 :         for i := range m.levels {
     129            1 :                 l := &m.levels[i]
     130            1 :                 if l.rangeDelIter == nil {
     131            1 :                         continue
     132              :                 }
     133            1 :                 t, err := l.rangeDelIter.SeekGE(item.kv.K.UserKey)
     134            1 :                 m.err = firstError(m.err, err)
     135            1 :                 l.tombstone = t
     136              :         }
     137              : }
     138              : 
     139              : // Returns true if not yet done.
     140            1 : func (m *simpleMergingIter) step() bool {
     141            1 :         if m.heap.len() == 0 || m.err != nil {
     142            1 :                 return false
     143            1 :         }
     144            1 :         item := &m.heap.items[0]
     145            1 :         l := &m.levels[item.index]
     146            1 :         // Sentinels are not relevant for this point checking.
     147            1 :         if !item.kv.K.IsExclusiveSentinel() && item.kv.K.Visible(m.snapshot, base.SeqNumMax) {
     148            1 :                 // This is a visible point key.
     149            1 :                 if !m.handleVisiblePoint(item, l) {
     150            0 :                         return false
     151            0 :                 }
     152              :         }
     153              : 
     154              :         // The iterator for the current level may be closed in the following call to
     155              :         // Next(). We save its debug string for potential use after it is closed -
     156              :         // either in this current step() invocation or on the next invocation.
     157            1 :         m.lastIterMsg = l.iter.String()
     158            1 : 
     159            1 :         // Step to the next point.
     160            1 :         l.iterKV = l.iter.Next()
     161            1 :         if l.iterKV == nil {
     162            1 :                 m.err = errors.CombineErrors(l.iter.Error(), l.iter.Close())
     163            1 :                 l.iter = nil
     164            1 :                 m.heap.pop()
     165            1 :         } else {
     166            1 :                 // Check point keys in an sstable are ordered. Although not required, we check
     167            1 :                 // for memtables as well. A subtle check here is that successive sstables of
     168            1 :                 // L1 and higher levels are ordered. This happens when levelIter moves to the
     169            1 :                 // next sstable in the level, in which case item.key is previous sstable's
     170            1 :                 // last point key.
     171            1 :                 if !l.iterKV.K.IsExclusiveSentinel() && base.InternalCompare(m.heap.cmp, item.kv.K, l.iterKV.K) >= 0 {
     172            0 :                         m.err = errors.Errorf("out of order keys %s >= %s in %s",
     173            0 :                                 item.kv.K.Pretty(m.formatKey), l.iterKV.K.Pretty(m.formatKey), l.iter)
     174            0 :                         return false
     175            0 :                 }
     176            1 :                 userKeyBuf := item.kv.K.UserKey[:0]
     177            1 :                 item.kv = *l.iterKV
     178            1 :                 item.kv.K.UserKey = append(userKeyBuf, l.iterKV.K.UserKey...)
     179            1 :                 if m.heap.len() > 1 {
     180            1 :                         m.heap.fix(0)
     181            1 :                 }
     182              :         }
     183            1 :         if m.err != nil {
     184            0 :                 return false
     185            0 :         }
     186            1 :         if m.heap.len() == 0 {
     187            1 :                 // If m.valueMerger != nil, the last record was a MERGE record.
     188            1 :                 if m.valueMerger != nil {
     189            1 :                         var closer io.Closer
     190            1 :                         var err error
     191            1 :                         _, closer, err = m.valueMerger.Finish(true /* includesBase */)
     192            1 :                         if closer != nil {
     193            0 :                                 err = errors.CombineErrors(err, closer.Close())
     194            0 :                         }
     195            1 :                         if err != nil {
     196            0 :                                 m.err = errors.CombineErrors(m.err,
     197            0 :                                         errors.Wrapf(err, "merge processing error on key %s in %s",
     198            0 :                                                 item.kv.K.Pretty(m.formatKey), m.lastIterMsg))
     199            0 :                         }
     200            1 :                         m.valueMerger = nil
     201              :                 }
     202            1 :                 return false
     203              :         }
     204            1 :         m.positionRangeDels()
     205            1 :         return true
     206              : }
     207              : 
     208              : // handleVisiblePoint returns true if validation succeeded and level checking
     209              : // can continue.
     210              : func (m *simpleMergingIter) handleVisiblePoint(
     211              :         item *simpleMergingIterItem, l *simpleMergingIterLevel,
     212            1 : ) (ok bool) {
     213            1 :         m.numPoints++
     214            1 :         keyChanged := m.heap.cmp(item.kv.K.UserKey, m.lastKey.UserKey) != 0
     215            1 :         if !keyChanged {
     216            1 :                 // At the same user key. We will see them in decreasing seqnum
     217            1 :                 // order so the lastLevel must not be lower.
     218            1 :                 if m.lastLevel > item.index {
     219            0 :                         m.err = errors.Errorf("found InternalKey %s in %s and InternalKey %s in %s",
     220            0 :                                 item.kv.K.Pretty(m.formatKey), l.iter, m.lastKey.Pretty(m.formatKey),
     221            0 :                                 m.lastIterMsg)
     222            0 :                         return false
     223            0 :                 }
     224            1 :                 m.lastLevel = item.index
     225            1 :         } else {
     226            1 :                 // The user key has changed.
     227            1 :                 m.lastKey.Trailer = item.kv.K.Trailer
     228            1 :                 m.lastKey.UserKey = append(m.lastKey.UserKey[:0], item.kv.K.UserKey...)
     229            1 :                 m.lastLevel = item.index
     230            1 :         }
     231              :         // Ongoing series of MERGE records ends with a MERGE record.
     232            1 :         if keyChanged && m.valueMerger != nil {
     233            1 :                 var closer io.Closer
     234            1 :                 _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     235            1 :                 if m.err == nil && closer != nil {
     236            0 :                         m.err = closer.Close()
     237            0 :                 }
     238            1 :                 m.valueMerger = nil
     239              :         }
     240            1 :         itemValue, _, err := item.kv.Value(nil)
     241            1 :         if err != nil {
     242            0 :                 m.err = err
     243            0 :                 return false
     244            0 :         }
     245            1 :         if m.valueMerger != nil {
     246            1 :                 // Ongoing series of MERGE records.
     247            1 :                 switch item.kv.K.Kind() {
     248            1 :                 case InternalKeyKindSingleDelete, InternalKeyKindDelete, InternalKeyKindDeleteSized:
     249            1 :                         var closer io.Closer
     250            1 :                         _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     251            1 :                         if m.err == nil && closer != nil {
     252            0 :                                 m.err = closer.Close()
     253            0 :                         }
     254            1 :                         m.valueMerger = nil
     255            1 :                 case InternalKeyKindSet, InternalKeyKindSetWithDelete:
     256            1 :                         m.err = m.valueMerger.MergeOlder(itemValue)
     257            1 :                         if m.err == nil {
     258            1 :                                 var closer io.Closer
     259            1 :                                 _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
     260            1 :                                 if m.err == nil && closer != nil {
     261            0 :                                         m.err = closer.Close()
     262            0 :                                 }
     263              :                         }
     264            1 :                         m.valueMerger = nil
     265            1 :                 case InternalKeyKindMerge:
     266            1 :                         m.err = m.valueMerger.MergeOlder(itemValue)
     267            0 :                 default:
     268            0 :                         m.err = errors.Errorf("pebble: invalid internal key kind %s in %s",
     269            0 :                                 item.kv.K.Pretty(m.formatKey),
     270            0 :                                 l.iter)
     271            0 :                         return false
     272              :                 }
     273            1 :         } else if item.kv.K.Kind() == InternalKeyKindMerge && m.err == nil {
     274            1 :                 // New series of MERGE records.
     275            1 :                 m.valueMerger, m.err = m.merge(item.kv.K.UserKey, itemValue)
     276            1 :         }
     277            1 :         if m.err != nil {
     278            0 :                 m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
     279            0 :                         item.kv.K.Pretty(m.formatKey), l.iter)
     280            0 :                 return false
     281            0 :         }
     282              :         // Is this point covered by a tombstone at a lower level? Note that all these
     283              :         // iterators must be positioned at a key > item.key.
     284            1 :         for level := item.index + 1; level < len(m.levels); level++ {
     285            1 :                 lvl := &m.levels[level]
     286            1 :                 if lvl.rangeDelIter == nil || lvl.tombstone.Empty() {
     287            1 :                         continue
     288              :                 }
     289            1 :                 if lvl.tombstone.Contains(m.heap.cmp, item.kv.K.UserKey) && lvl.tombstone.CoversAt(m.snapshot, item.kv.K.SeqNum()) {
     290            0 :                         m.err = errors.Errorf("tombstone %s in %s deletes key %s in %s",
     291            0 :                                 lvl.tombstone.Pretty(m.formatKey), lvl.iter, item.kv.K.Pretty(m.formatKey),
     292            0 :                                 l.iter)
     293            0 :                         return false
     294            0 :                 }
     295              :         }
     296            1 :         return true
     297              : }
     298              : 
     299              : // Checking that range tombstones are mutually consistent is performed by
     300              : // checkRangeTombstones(). See the overview comment at the top of the file.
     301              : //
     302              : // We do this check as follows:
     303              : // - Collect the tombstones for each level, put them into one pool of tombstones
     304              : //   along with their level information (addTombstonesFromIter()).
     305              : // - Collect the start and end user keys from all these tombstones
     306              : //   (collectAllUserKey()) and use them to fragment all the tombstones
     307              : //   (fragmentUsingUserKey()).
     308              : // - Sort tombstones by start key and decreasing seqnum (all tombstones that
     309              : //   have the same start key will have the same end key because they have been
     310              : //   fragmented)
     311              : // - Iterate and check (iterateAndCheckTombstones()).
     312              : //
     313              : // Note that this simple approach requires holding all the tombstones across all
     314              : // levels in-memory. A more sophisticated incremental approach could be devised,
     315              : // if necessary.
     316              : 
     317              : // A tombstone and the corresponding level it was found in.
     318              : type tombstoneWithLevel struct {
     319              :         keyspan.Span
     320              :         level int
     321              :         // The level in LSM. A -1 means it's a memtable.
     322              :         lsmLevel int
     323              :         tableNum base.TableNum
     324              : }
     325              : 
     326              : func iterateAndCheckTombstones(
     327              :         cmp Compare, formatKey base.FormatKey, tombstones []tombstoneWithLevel,
     328            1 : ) error {
     329            1 :         slices.SortFunc(tombstones, func(a, b tombstoneWithLevel) int {
     330            1 :                 if v := cmp(a.Start, b.Start); v != 0 {
     331            1 :                         return v
     332            1 :                 }
     333            1 :                 return stdcmp.Compare(b.LargestSeqNum(), a.LargestSeqNum())
     334              :         })
     335              : 
     336              :         // For a sequence of tombstones that share the same start UserKey, we will
     337              :         // encounter them in non-increasing seqnum order and so should encounter them
     338              :         // in non-decreasing level order.
     339            1 :         lastTombstone := tombstoneWithLevel{}
     340            1 :         for _, t := range tombstones {
     341            1 :                 if cmp(lastTombstone.Start, t.Start) == 0 && lastTombstone.level > t.level {
     342            0 :                         return errors.Errorf("encountered tombstone %s in %s"+
     343            0 :                                 " that has a lower seqnum than the same tombstone in %s",
     344            0 :                                 t.Span.Pretty(formatKey), levelOrMemtable(t.lsmLevel, t.tableNum),
     345            0 :                                 levelOrMemtable(lastTombstone.lsmLevel, lastTombstone.tableNum))
     346            0 :                 }
     347            1 :                 lastTombstone = t
     348              :         }
     349            1 :         return nil
     350              : }
     351              : 
     352              : type checkConfig struct {
     353              :         logger    Logger
     354              :         comparer  *Comparer
     355              :         readState *readState
     356              :         newIters  tableNewIters
     357              :         seqNum    base.SeqNum
     358              :         stats     *CheckLevelsStats
     359              :         merge     Merge
     360              :         formatKey base.FormatKey
     361              :         readEnv   block.ReadEnv
     362              :         // blobValueFetcher is the ValueFetcher to use when retrieving values stored
     363              :         // externally in blob files.
     364              :         blobValueFetcher blob.ValueFetcher
     365              :         fileCache        *fileCacheHandle
     366              : }
     367              : 
     368              : // cmp is shorthand for comparer.Compare.
     369            1 : func (c *checkConfig) cmp(a, b []byte) int { return c.comparer.Compare(a, b) }
     370              : 
     371            1 : func checkRangeTombstones(c *checkConfig) error {
     372            1 :         var level int
     373            1 :         var tombstones []tombstoneWithLevel
     374            1 :         var err error
     375            1 : 
     376            1 :         memtables := c.readState.memtables
     377            1 :         for i := len(memtables) - 1; i >= 0; i-- {
     378            1 :                 iter := memtables[i].newRangeDelIter(nil)
     379            1 :                 if iter == nil {
     380            1 :                         continue
     381              :                 }
     382            1 :                 tombstones, err = addTombstonesFromIter(
     383            1 :                         iter, level, -1, 0, tombstones, c.seqNum, c.cmp, c.formatKey,
     384            1 :                 )
     385            1 :                 iter.Close()
     386            1 :                 if err != nil {
     387            0 :                         return err
     388            0 :                 }
     389            1 :                 level++
     390              :         }
     391              : 
     392            1 :         current := c.readState.current
     393            1 :         addTombstonesFromLevel := func(files iter.Seq[*manifest.TableMetadata], lsmLevel int) error {
     394            1 :                 for f := range files {
     395            1 :                         iters, err := c.newIters(
     396            1 :                                 context.Background(), f, &IterOptions{layer: manifest.Level(lsmLevel)},
     397            1 :                                 internalIterOpts{}, iterRangeDeletions)
     398            1 :                         if err != nil {
     399            0 :                                 return err
     400            0 :                         }
     401            1 :                         tombstones, err = addTombstonesFromIter(iters.RangeDeletion(), level, lsmLevel, f.TableNum,
     402            1 :                                 tombstones, c.seqNum, c.cmp, c.formatKey)
     403            1 :                         _ = iters.CloseAll()
     404            1 : 
     405            1 :                         if err != nil {
     406            0 :                                 return err
     407            0 :                         }
     408              :                 }
     409            1 :                 return nil
     410              :         }
     411              :         // Now the levels with untruncated tombsones.
     412            1 :         for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
     413            1 :                 if current.L0SublevelFiles[i].Empty() {
     414            0 :                         continue
     415              :                 }
     416            1 :                 err := addTombstonesFromLevel(current.L0SublevelFiles[i].All(), 0)
     417            1 :                 if err != nil {
     418            0 :                         return err
     419            0 :                 }
     420            1 :                 level++
     421              :         }
     422            1 :         for i := 1; i < len(current.Levels); i++ {
     423            1 :                 if err := addTombstonesFromLevel(current.Levels[i].All(), i); err != nil {
     424            0 :                         return err
     425            0 :                 }
     426            1 :                 level++
     427              :         }
     428            1 :         if c.stats != nil {
     429            0 :                 c.stats.NumTombstones = len(tombstones)
     430            0 :         }
     431              :         // We now have truncated tombstones.
     432              :         // Fragment them all.
     433            1 :         userKeys := collectAllUserKeys(c.cmp, tombstones)
     434            1 :         tombstones = fragmentUsingUserKeys(c.cmp, tombstones, userKeys)
     435            1 :         return iterateAndCheckTombstones(c.cmp, c.formatKey, tombstones)
     436              : }
     437              : 
     438            0 : func levelOrMemtable(lsmLevel int, tableNum base.TableNum) string {
     439            0 :         if lsmLevel == -1 {
     440            0 :                 return "memtable"
     441            0 :         }
     442            0 :         return fmt.Sprintf("L%d: fileNum=%s", lsmLevel, tableNum)
     443              : }
     444              : 
     445              : func addTombstonesFromIter(
     446              :         iter keyspan.FragmentIterator,
     447              :         level int,
     448              :         lsmLevel int,
     449              :         tableNum base.TableNum,
     450              :         tombstones []tombstoneWithLevel,
     451              :         seqNum base.SeqNum,
     452              :         cmp Compare,
     453              :         formatKey base.FormatKey,
     454            1 : ) (_ []tombstoneWithLevel, err error) {
     455            1 :         var prevTombstone keyspan.Span
     456            1 :         tomb, err := iter.First()
     457            1 :         for ; tomb != nil; tomb, err = iter.Next() {
     458            1 :                 t := tomb.Visible(seqNum)
     459            1 :                 if t.Empty() {
     460            1 :                         continue
     461              :                 }
     462            1 :                 t = t.Clone()
     463            1 :                 // This is mainly a test for rangeDelV2 formatted blocks which are expected to
     464            1 :                 // be ordered and fragmented on disk. But we anyways check for memtables,
     465            1 :                 // rangeDelV1 as well.
     466            1 :                 if cmp(prevTombstone.End, t.Start) > 0 {
     467            0 :                         return nil, errors.Errorf("unordered or unfragmented range delete tombstones %s, %s in %s",
     468            0 :                                 prevTombstone.Pretty(formatKey), t.Pretty(formatKey), levelOrMemtable(lsmLevel, tableNum))
     469            0 :                 }
     470            1 :                 prevTombstone = t
     471            1 : 
     472            1 :                 if !t.Empty() {
     473            1 :                         tombstones = append(tombstones, tombstoneWithLevel{
     474            1 :                                 Span:     t,
     475            1 :                                 level:    level,
     476            1 :                                 lsmLevel: lsmLevel,
     477            1 :                                 tableNum: tableNum,
     478            1 :                         })
     479            1 :                 }
     480              :         }
     481            1 :         if err != nil {
     482            0 :                 return nil, err
     483            0 :         }
     484            1 :         return tombstones, nil
     485              : }
     486              : 
     487            1 : func collectAllUserKeys(cmp Compare, tombstones []tombstoneWithLevel) [][]byte {
     488            1 :         keys := make([][]byte, 0, len(tombstones)*2)
     489            1 :         for _, t := range tombstones {
     490            1 :                 keys = append(keys, t.Start, t.End)
     491            1 :         }
     492            1 :         slices.SortFunc(keys, cmp)
     493            1 :         return slices.CompactFunc(keys, func(a, b []byte) bool {
     494            1 :                 return cmp(a, b) == 0
     495            1 :         })
     496              : }
     497              : 
     498              : func fragmentUsingUserKeys(
     499              :         cmp Compare, tombstones []tombstoneWithLevel, userKeys [][]byte,
     500            1 : ) []tombstoneWithLevel {
     501            1 :         var buf []tombstoneWithLevel
     502            1 :         for _, t := range tombstones {
     503            1 :                 // Find the first position with tombstone start < user key
     504            1 :                 i := sort.Search(len(userKeys), func(i int) bool {
     505            1 :                         return cmp(t.Start, userKeys[i]) < 0
     506            1 :                 })
     507            1 :                 for ; i < len(userKeys); i++ {
     508            1 :                         if cmp(userKeys[i], t.End) >= 0 {
     509            1 :                                 break
     510              :                         }
     511            1 :                         tPartial := t
     512            1 :                         tPartial.End = userKeys[i]
     513            1 :                         buf = append(buf, tPartial)
     514            1 :                         t.Start = userKeys[i]
     515              :                 }
     516            1 :                 buf = append(buf, t)
     517              :         }
     518            1 :         return buf
     519              : }
     520              : 
// CheckLevelsStats provides basic stats on points and tombstones encountered.
// It is only filled in when a non-nil pointer is passed to DB.CheckLevels.
type CheckLevelsStats struct {
	// NumPoints is the point count reported by the merging iterator that
	// steps over the memtables and all levels during the check.
	NumPoints     int64
	// NumTombstones counts range tombstones. It is not assigned in this part
	// of the file; presumably it is set while checking range tombstones —
	// verify against checkRangeTombstones.
	NumTombstones int
}
     526              : 
     527              : // CheckLevels checks:
     528              : //   - Every entry in the DB is consistent with the level invariant. See the
     529              : //     comment at the top of the file.
     530              : //   - Point keys in sstables are ordered.
     531              : //   - Range delete tombstones in sstables are ordered and fragmented.
     532              : //   - Successful processing of all MERGE records.
     533              : //   - Each sstable's blob reference liveness block is valid.
     534            1 : func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
     535            1 :         // Grab and reference the current readState.
     536            1 :         readState := d.loadReadState()
     537            1 :         defer readState.unref()
     538            1 : 
     539            1 :         // Determine the seqnum to read at after grabbing the read state (current and
     540            1 :         // memtables) above.
     541            1 :         seqNum := d.mu.versions.visibleSeqNum.Load()
     542            1 : 
     543            1 :         checkConfig := &checkConfig{
     544            1 :                 logger:    d.opts.Logger,
     545            1 :                 comparer:  d.opts.Comparer,
     546            1 :                 readState: readState,
     547            1 :                 newIters:  d.newIters,
     548            1 :                 seqNum:    seqNum,
     549            1 :                 stats:     stats,
     550            1 :                 merge:     d.merge,
     551            1 :                 formatKey: d.opts.Comparer.FormatKey,
     552            1 :                 readEnv:   block.ReadEnv{
     553            1 :                         // TODO(jackson): Add categorized stats.
     554            1 :                 },
     555            1 :                 fileCache: d.fileCache,
     556            1 :         }
     557            1 :         checkConfig.blobValueFetcher.Init(&readState.current.BlobFiles, checkConfig.fileCache, checkConfig.readEnv)
     558            1 :         defer func() { _ = checkConfig.blobValueFetcher.Close() }()
     559            1 :         return checkLevelsInternal(checkConfig)
     560              : }
     561              : 
// checkLevelsInternal performs the checks behind CheckLevels using the read
// state, seqnum and helpers carried by c. It runs three phases in order and
// returns the first error encountered:
//
//  1. A simpleMergingIter steps over every point in the memtables and in the
//     LSM (L0 by sublevel, then L1 and below), checking that the same user key
//     at different levels is not inverted wrt sequence numbers, and likewise
//     for range tombstones that cover points.
//  2. checkRangeTombstones checks that the tombstones are mutually consistent.
//  3. validateBlobValueLiveness checks the blob reference liveness block of
//     every table gathered from the L0 sublevels and levels L1 and below.
//
// Every iterator opened here is closed on return; an error from closing a
// point iterator is folded into the returned error via firstError.
func checkLevelsInternal(c *checkConfig) (err error) {
	internalOpts := internalIterOpts{
		readEnv:          sstable.ReadEnv{Block: c.readEnv},
		blobValueFetcher: &c.blobValueFetcher,
	}

	// Phase 1: Use a simpleMergingIter to step through all the points and ensure
	// that points with the same user key at different levels are not inverted
	// wrt sequence numbers and the same holds for tombstones that cover points.
	// To do this, one needs to construct a simpleMergingIter which is similar to
	// how one constructs a mergingIter.

	// Add mem tables from newest to oldest.
	var mlevels []simpleMergingIterLevel
	defer func() {
		// The closure sees mlevels as finally built below. Close every level's
		// iterators, keeping the first point-iterator Close error in err.
		for i := range mlevels {
			l := &mlevels[i]
			if l.iter != nil {
				err = firstError(err, l.iter.Close())
				l.iter = nil
			}
			if l.rangeDelIter != nil {
				l.rangeDelIter.Close()
				l.rangeDelIter = nil
			}
		}
	}()

	memtables := c.readState.memtables
	for i := len(memtables) - 1; i >= 0; i-- {
		mem := memtables[i]
		mlevels = append(mlevels, simpleMergingIterLevel{
			iter:         mem.newIter(nil),
			rangeDelIter: mem.newRangeDelIter(nil),
		})
	}

	current := c.readState.current
	// Determine the final size for mlevels so that there are no more
	// reallocations. levelIter will hold a pointer to elements in mlevels.
	// One slot is reserved per non-empty L0 sublevel and per non-empty level
	// from L1 down (L0 itself is covered by its sublevels).
	start := len(mlevels)
	for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
		if current.L0SublevelFiles[sublevel].Empty() {
			continue
		}
		mlevels = append(mlevels, simpleMergingIterLevel{})
	}
	for level := 1; level < len(current.Levels); level++ {
		if current.Levels[level].Empty() {
			continue
		}
		mlevels = append(mlevels, simpleMergingIterLevel{})
	}
	// mlevelAlloc is the still-unassigned tail of the reserved slots; each
	// loop below consumes its front element.
	mlevelAlloc := mlevels[start:]
	// allTables collects every table visited, for the blob liveness phase.
	var allTables []*manifest.TableMetadata

	// Add L0 files by sublevel.
	for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
		if current.L0SublevelFiles[sublevel].Empty() {
			continue
		}
		manifestIter := current.L0SublevelFiles[sublevel].Iter()
		iterOpts := IterOptions{logger: c.logger}
		li := &levelIter{}
		li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
			manifest.L0Sublevel(sublevel), internalOpts)
		li.initRangeDel(&mlevelAlloc[0])
		mlevelAlloc[0].iter = li
		mlevelAlloc = mlevelAlloc[1:]
		for f := range current.L0SublevelFiles[sublevel].All() {
			allTables = append(allTables, f)
		}
	}
	for level := 1; level < len(current.Levels); level++ {
		if current.Levels[level].Empty() {
			continue
		}
		iterOpts := IterOptions{logger: c.logger}
		li := &levelIter{}
		li.init(context.Background(), iterOpts, c.comparer, c.newIters,
			current.Levels[level].Iter(), manifest.Level(level), internalOpts)
		li.initRangeDel(&mlevelAlloc[0])
		mlevelAlloc[0].iter = li
		mlevelAlloc = mlevelAlloc[1:]
		for f := range current.Levels[level].All() {
			allTables = append(allTables, f)
		}
	}

	mergingIter := &simpleMergingIter{}
	mergingIter.init(c.merge, c.cmp, c.seqNum, c.formatKey, mlevels...)
	// Drive the merging iterator to completion. The loop body is empty; any
	// failure is reported afterwards through mergingIter.err.
	for cont := mergingIter.step(); cont; cont = mergingIter.step() {
	}
	if err := mergingIter.err; err != nil {
		return err
	}
	if c.stats != nil {
		c.stats.NumPoints = mergingIter.numPoints
	}

	// Phase 2: Check that the tombstones are mutually consistent.
	if err := checkRangeTombstones(c); err != nil {
		return err
	}

	// Phase 3: Validate blob value liveness block for all tables in the LSM.
	// TODO(annie): This is a very expensive operation. We should try to reduce
	// the amount of work performed. One possibility is to have the caller
	// pass in a prng seed and use that to choose which tables to validate.
	if err := validateBlobValueLiveness(allTables, c.fileCache, c.readEnv, &c.blobValueFetcher); err != nil {
		return err
	}

	return nil
}
     677              : 
// valuesInfo accumulates what an sstable's blob handles reference within a
// single blob block.
type valuesInfo struct {
	// valueIDs holds the handles' value IDs in the order the sstable's point
	// iterator surfaced them.
	valueIDs  []int
	// totalSize is the sum of the lengths of the referenced values.
	totalSize int
}
     682              : 
     683              : // gatherBlobHandles gathers all the blob handles in an sstable, returning a
     684              : // slice of maps; indexing into the slice at `i` is equivalent to retrieving
     685              : // each blob.BlockID's referenced blob.BlockValueID for the `i`th blob reference.
     686              : func gatherBlobHandles(
     687              :         ctx context.Context,
     688              :         r *sstable.Reader,
     689              :         blobRefs manifest.BlobReferences,
     690              :         valueFetcher base.ValueFetcher,
     691            1 : ) ([]map[blob.BlockID]valuesInfo, error) {
     692            1 :         iter, err := r.NewPointIter(ctx, sstable.IterOptions{
     693            1 :                 BlobContext: sstable.TableBlobContext{
     694            1 :                         ValueFetcher: valueFetcher,
     695            1 :                         References:   &blobRefs,
     696            1 :                 },
     697            1 :         })
     698            1 :         if err != nil {
     699            0 :                 return nil, err
     700            0 :         }
     701            1 :         defer func() { _ = iter.Close() }()
     702              : 
     703            1 :         referenced := make([]map[blob.BlockID]valuesInfo, len(blobRefs))
     704            1 :         for i := range referenced {
     705            1 :                 referenced[i] = make(map[blob.BlockID]valuesInfo)
     706            1 :         }
     707            1 :         for kv := iter.First(); kv != nil; kv = iter.Next() {
     708            1 :                 if kv.V.IsBlobValueHandle() {
     709            1 :                         lv := kv.V.LazyValue()
     710            1 :                         handleSuffix := blob.DecodeHandleSuffix(lv.ValueOrHandle)
     711            1 :                         refID, ok := blobRefs.IDByBlobFileID(lv.Fetcher.BlobFileID)
     712            1 :                         if !ok {
     713            0 :                                 return nil, errors.Errorf("blob file ID %d not found in blob references", lv.Fetcher.BlobFileID)
     714            0 :                         }
     715            1 :                         blockID := handleSuffix.BlockID
     716            1 :                         valueID := int(handleSuffix.ValueID)
     717            1 :                         vi := referenced[refID][blockID]
     718            1 :                         vi.valueIDs = append(vi.valueIDs, valueID)
     719            1 :                         vi.totalSize += lv.Len()
     720            1 :                         referenced[refID][blockID] = vi
     721              :                 }
     722              :         }
     723            1 :         return referenced, nil
     724              : }
     725              : 
     726              : func performValidationForSSTable(
     727              :         decoder colblk.ReferenceLivenessBlockDecoder,
     728              :         tableNum base.TableNum,
     729              :         referenced []map[blob.BlockID]valuesInfo,
     730            1 : ) error {
     731            1 :         if len(referenced) != decoder.BlockDecoder().Rows() {
     732            0 :                 return errors.Errorf("mismatch in number of references in blob value "+
     733            0 :                         "liveness block: expected=%d found=%d", len(referenced),
     734            0 :                         decoder.BlockDecoder().Rows())
     735            0 :         }
     736            1 :         for refID, blockValues := range referenced {
     737            1 :                 bitmapEncodings := slices.Clone(decoder.LivenessAtReference(refID))
     738            1 :                 for _, blockEnc := range sstable.DecodeBlobRefLivenessEncoding(bitmapEncodings) {
     739            1 :                         blockID := blockEnc.BlockID
     740            1 :                         vi, ok := blockValues[blockID]
     741            1 :                         if !ok {
     742            0 :                                 return errors.Errorf("dangling refID=%d blockID=%d in blob value "+
     743            0 :                                         "liveness encoding for sstable %d", refID, blockID, tableNum)
     744            0 :                         }
     745            1 :                         encodedVals := slices.Collect(sstable.IterSetBitsInRunLengthBitmap(blockEnc.Bitmap))
     746            1 :                         if !slices.Equal(vi.valueIDs, encodedVals) {
     747            0 :                                 return errors.Errorf("bitmap mismatch for refID=%d blockID=%d: "+
     748            0 :                                         "expected=%v encoded=%v for sstable %d", refID, blockID, vi.valueIDs,
     749            0 :                                         encodedVals, tableNum)
     750            0 :                         }
     751            1 :                         if vi.totalSize != blockEnc.ValuesSize {
     752            0 :                                 return errors.Errorf("value size mismatch for refID=%d blockID=%d: "+
     753            0 :                                         "expected=%d encoded=%d for sstable %d", refID, blockID, vi.totalSize,
     754            0 :                                         blockEnc.ValuesSize, tableNum)
     755            0 :                         }
     756              :                         // Remove the processed blockID from the map so that later,
     757              :                         // we can check if we processed everything. This is to
     758              :                         // ensure that we do not have any missing references in the
     759              :                         // blob reference liveness block for any of the references
     760              :                         // in the sstable.
     761            1 :                         delete(blockValues, blockID)
     762              :                 }
     763            1 :                 if len(blockValues) > 0 {
     764            0 :                         return errors.Errorf("refID=%d blockIDs=%v referenced by sstable %d "+
     765            0 :                                 "is/are not present in blob reference liveness block", refID,
     766            0 :                                 slices.Collect(maps.Keys(blockValues)), tableNum)
     767            0 :                 }
     768              :         }
     769            1 :         return nil
     770              : }
     771              : 
     772              : // validateBlobValueLiveness iterates through each table,
     773              : // gathering all the blob handles, and then compares the values encoded in the
     774              : // blob reference liveness block to the values referenced by the blob handles.
     775              : func validateBlobValueLiveness(
     776              :         tables []*manifest.TableMetadata,
     777              :         fc *fileCacheHandle,
     778              :         readEnv block.ReadEnv,
     779              :         valueFetcher base.ValueFetcher,
     780            1 : ) error {
     781            1 :         ctx := context.TODO()
     782            1 :         var decoder colblk.ReferenceLivenessBlockDecoder
     783            1 :         for _, t := range tables {
     784            1 :                 if len(t.BlobReferences) == 0 {
     785            1 :                         continue
     786              :                 }
     787            1 :                 if err := fc.withReader(ctx, readEnv, t, func(r *sstable.Reader, readEnv sstable.ReadEnv) error {
     788            1 :                         // For this sstable, gather all the blob handles -- tracking
     789            1 :                         // each blob.ReferenceID + blob.BlockID's referenced
     790            1 :                         // blob.BlockValueIDs.
     791            1 :                         referenced, err := gatherBlobHandles(ctx, r, t.BlobReferences, valueFetcher)
     792            1 :                         if err != nil {
     793            0 :                                 return err
     794            0 :                         }
     795            1 :                         h, err := r.ReadBlobRefIndexBlock(ctx, readEnv.Block)
     796            1 :                         if err != nil {
     797            0 :                                 return err
     798            0 :                         }
     799            1 :                         defer h.Release()
     800            1 :                         decoder.Init(h.BlockData())
     801            1 :                         return performValidationForSSTable(decoder, t.TableNum, referenced)
     802            0 :                 }); err != nil {
     803            0 :                         return err
     804            0 :                 }
     805              :         }
     806            1 :         return nil
     807              : }
     808              : 
// simpleMergingIterItem is one entry of simpleMergingIterHeap: a key/value
// paired with the index of its source.
type simpleMergingIterItem struct {
	// index identifies where kv came from — presumably a position in the
	// simpleMergingIter's levels; verify against simpleMergingIter.
	index int
	// kv is the entry being ordered; less compares its K.
	kv    base.InternalKV
}
     813              : 
// simpleMergingIterHeap is a binary heap of items whose root is the item that
// sorts first under less. By default that means ascending user key, then
// descending trailer; reverse inverts both comparisons.
type simpleMergingIterHeap struct {
	// cmp orders user keys.
	cmp     Compare
	// reverse flips the ordering used by less.
	reverse bool
	// items is the heap storage, with the root at index 0.
	items   []simpleMergingIterItem
}
     819              : 
     820            1 : func (h *simpleMergingIterHeap) len() int {
     821            1 :         return len(h.items)
     822            1 : }
     823              : 
     824            1 : func (h *simpleMergingIterHeap) less(i, j int) bool {
     825            1 :         ikey, jkey := h.items[i].kv.K, h.items[j].kv.K
     826            1 :         if c := h.cmp(ikey.UserKey, jkey.UserKey); c != 0 {
     827            1 :                 if h.reverse {
     828            0 :                         return c > 0
     829            0 :                 }
     830            1 :                 return c < 0
     831              :         }
     832            1 :         if h.reverse {
     833            0 :                 return ikey.Trailer < jkey.Trailer
     834            0 :         }
     835            1 :         return ikey.Trailer > jkey.Trailer
     836              : }
     837              : 
     838            1 : func (h *simpleMergingIterHeap) swap(i, j int) {
     839            1 :         h.items[i], h.items[j] = h.items[j], h.items[i]
     840            1 : }
     841              : 
// init, fix, up and down are copied from the go stdlib (container/heap).

// init establishes the heap ordering over all of h.items. It sifts down every
// non-leaf index, from the last one back to the root.
func (h *simpleMergingIterHeap) init() {
	// heapify
	n := h.len()
	for i := n/2 - 1; i >= 0; i-- {
		h.down(i, n)
	}
}
     850              : 
// fix restores the heap ordering after the item at index i has changed.
func (h *simpleMergingIterHeap) fix(i int) {
	// Sift down first; only if the item did not move down can it need to
	// move up.
	if !h.down(i, h.len()) {
		h.up(i)
	}
}
     856              : 
// pop removes the root item and returns a pointer to it. The heap must be
// non-empty: with zero items, swap(0, -1) indexes out of range and panics.
//
// NOTE(review): the returned pointer addresses the slot just past the
// truncated h.items (same backing array). A later append to h.items can
// overwrite it, so callers should not hold it across such an append.
func (h *simpleMergingIterHeap) pop() *simpleMergingIterItem {
	n := h.len() - 1
	// Move the root to the end, then re-heapify the first n items.
	h.swap(0, n)
	h.down(0, n)
	item := &h.items[n]
	h.items = h.items[:n]
	return item
}
     865              : 
// up moves the item at index j towards the root, swapping it with its parent
// while it is less than that parent.
func (h *simpleMergingIterHeap) up(j int) {
	for {
		// Go's division truncates toward zero, so (0-1)/2 == 0. The i == j
		// test therefore stops the loop at the root.
		i := (j - 1) / 2 // parent
		if i == j || !h.less(j, i) {
			break
		}
		h.swap(i, j)
		j = i
	}
}
     876              : 
// down moves the item at index i0 towards the leaves, considering only the
// first n items. At each step it swaps the item with its smaller child while
// that child is less than it. It reports whether the item moved from i0.
func (h *simpleMergingIterHeap) down(i0, n int) bool {
	i := i0
	for {
		j1 := 2*i + 1
		if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
			break
		}
		j := j1 // left child
		if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
			j = j2 // = 2*i + 2  // right child
		}
		if !h.less(j, i) {
			break
		}
		h.swap(i, j)
		i = j
	}
	return i > i0
}
        

Generated by: LCOV version 2.0-1