Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "io"
11 : "sort"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/keyspan"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : )
18 :
19 : // This file implements DB.CheckLevels() which checks that every entry in the
20 : // DB is consistent with respect to the level invariant: any point (or the
21 : // infinite number of points in a range tombstone) has a seqnum such that a
22 : // point with the same UserKey at a lower level has a lower seqnum. This is an
23 : // expensive check since it involves iterating over all the entries in the DB,
24 : // hence only intended for tests or tools.
25 : //
26 : // If we ignore range tombstones, the consistency checking of points can be
27 : // done with a simplified version of mergingIter. simpleMergingIter is that
28 : // simplified version of mergingIter that only needs to step through points
29 : // (analogous to only doing Next()). It can also easily accommodate
30 : // consistency checking of points relative to range tombstones.
31 : // simpleMergingIter does not do any seek optimizations present in mergingIter
32 : // (it minimally needs to seek the range delete iterators to position them at
33 : // or past the current point) since it does not want to miss points for
34 : // purposes of consistency checking.
35 : //
36 : // Mutual consistency of range tombstones is non-trivial to check. One needs
37 : // to detect inversions of the form [a, c)#8 at higher level and [b, c)#10 at
38 : // a lower level. The start key of the former is not contained in the latter
39 : // and we can't use the exclusive end key, c, for a containment check since it
40 : // is the sentinel key. We observe that if these tombstones were fragmented
41 : // wrt each other we would have [a, b)#8 and [b, c)#8 at the higher level and
42 : // [b, c)#10 at the lower level and then it is trivial to compare the two
43 : // [b, c) tombstones. Note that this fragmentation needs to take into account
44 : // that tombstones in a file may be untruncated and need to act within the
45 : // bounds of the file. This checking is performed by checkRangeTombstones()
46 : // and its helper functions.
47 :
48 : // The per-level structure used by simpleMergingIter.
49 : type simpleMergingIterLevel struct {
50 : iter internalIterator
51 : rangeDelIter keyspan.FragmentIterator
52 :
53 : iterKV *base.InternalKV
54 : tombstone *keyspan.Span
55 : }
56 :
57 1 : func (ml *simpleMergingIterLevel) setRangeDelIter(iter keyspan.FragmentIterator) {
58 1 : ml.tombstone = nil
59 1 : if ml.rangeDelIter != nil {
60 1 : ml.rangeDelIter.Close()
61 1 : }
62 1 : ml.rangeDelIter = iter
63 : }
64 :
65 : type simpleMergingIter struct {
66 : levels []simpleMergingIterLevel
67 : snapshot base.SeqNum
68 : heap simpleMergingIterHeap
69 : // The last point's key and level. For validation.
70 : lastKey InternalKey
71 : lastLevel int
72 : lastIterMsg string
73 : // A non-nil valueMerger means MERGE record processing is ongoing.
74 : valueMerger base.ValueMerger
75 : // The first error will cause step() to return false.
76 : err error
77 : numPoints int64
78 : merge Merge
79 : formatKey base.FormatKey
80 : }
81 :
82 : func (m *simpleMergingIter) init(
83 : merge Merge,
84 : cmp Compare,
85 : snapshot base.SeqNum,
86 : formatKey base.FormatKey,
87 : levels ...simpleMergingIterLevel,
88 1 : ) {
89 1 : m.levels = levels
90 1 : m.formatKey = formatKey
91 1 : m.merge = merge
92 1 : m.snapshot = snapshot
93 1 : m.lastLevel = -1
94 1 : m.heap.cmp = cmp
95 1 : m.heap.items = make([]simpleMergingIterItem, 0, len(levels))
96 1 : for i := range m.levels {
97 1 : l := &m.levels[i]
98 1 : l.iterKV = l.iter.First()
99 1 : if l.iterKV != nil {
100 1 : item := simpleMergingIterItem{
101 1 : index: i,
102 1 : kv: *l.iterKV,
103 1 : }
104 1 : item.kv.K = l.iterKV.K.Clone()
105 1 : m.heap.items = append(m.heap.items, item)
106 1 : }
107 : }
108 1 : m.heap.init()
109 1 :
110 1 : if m.heap.len() == 0 {
111 1 : return
112 1 : }
113 1 : m.positionRangeDels()
114 : }
115 :
116 : // Positions all the rangedel iterators at or past the current top of the
117 : // heap, using SeekGE().
118 1 : func (m *simpleMergingIter) positionRangeDels() {
119 1 : item := &m.heap.items[0]
120 1 : for i := range m.levels {
121 1 : l := &m.levels[i]
122 1 : if l.rangeDelIter == nil {
123 1 : continue
124 : }
125 1 : t, err := l.rangeDelIter.SeekGE(item.kv.K.UserKey)
126 1 : m.err = firstError(m.err, err)
127 1 : l.tombstone = t
128 : }
129 : }
130 :
131 : // Returns true if not yet done.
132 1 : func (m *simpleMergingIter) step() bool {
133 1 : if m.heap.len() == 0 || m.err != nil {
134 1 : return false
135 1 : }
136 1 : item := &m.heap.items[0]
137 1 : l := &m.levels[item.index]
138 1 : // Sentinels are not relevant for this point checking.
139 1 : if !item.kv.K.IsExclusiveSentinel() && item.kv.K.Visible(m.snapshot, base.SeqNumMax) {
140 1 : // This is a visible point key.
141 1 : if !m.handleVisiblePoint(item, l) {
142 0 : return false
143 0 : }
144 : }
145 :
146 : // The iterator for the current level may be closed in the following call to
147 : // Next(). We save its debug string for potential use after it is closed -
148 : // either in this current step() invocation or on the next invocation.
149 1 : m.lastIterMsg = l.iter.String()
150 1 :
151 1 : // Step to the next point.
152 1 : l.iterKV = l.iter.Next()
153 1 : if l.iterKV == nil {
154 1 : m.err = errors.CombineErrors(l.iter.Error(), l.iter.Close())
155 1 : l.iter = nil
156 1 : m.heap.pop()
157 1 : } else {
158 1 : // Check point keys in an sstable are ordered. Although not required, we check
159 1 : // for memtables as well. A subtle check here is that successive sstables of
160 1 : // L1 and higher levels are ordered. This happens when levelIter moves to the
161 1 : // next sstable in the level, in which case item.key is previous sstable's
162 1 : // last point key.
163 1 : if !l.iterKV.K.IsExclusiveSentinel() && base.InternalCompare(m.heap.cmp, item.kv.K, l.iterKV.K) >= 0 {
164 0 : m.err = errors.Errorf("out of order keys %s >= %s in %s",
165 0 : item.kv.K.Pretty(m.formatKey), l.iterKV.K.Pretty(m.formatKey), l.iter)
166 0 : return false
167 0 : }
168 1 : userKeyBuf := item.kv.K.UserKey[:0]
169 1 : item.kv = *l.iterKV
170 1 : item.kv.K.UserKey = append(userKeyBuf, l.iterKV.K.UserKey...)
171 1 : if m.heap.len() > 1 {
172 1 : m.heap.fix(0)
173 1 : }
174 : }
175 1 : if m.err != nil {
176 0 : return false
177 0 : }
178 1 : if m.heap.len() == 0 {
179 1 : // If m.valueMerger != nil, the last record was a MERGE record.
180 1 : if m.valueMerger != nil {
181 1 : var closer io.Closer
182 1 : var err error
183 1 : _, closer, err = m.valueMerger.Finish(true /* includesBase */)
184 1 : if closer != nil {
185 0 : err = errors.CombineErrors(err, closer.Close())
186 0 : }
187 1 : if err != nil {
188 0 : m.err = errors.CombineErrors(m.err,
189 0 : errors.Wrapf(err, "merge processing error on key %s in %s",
190 0 : item.kv.K.Pretty(m.formatKey), m.lastIterMsg))
191 0 : }
192 1 : m.valueMerger = nil
193 : }
194 1 : return false
195 : }
196 1 : m.positionRangeDels()
197 1 : return true
198 : }
199 :
200 : // handleVisiblePoint returns true if validation succeeded and level checking
201 : // can continue.
202 : func (m *simpleMergingIter) handleVisiblePoint(
203 : item *simpleMergingIterItem, l *simpleMergingIterLevel,
204 1 : ) (ok bool) {
205 1 : m.numPoints++
206 1 : keyChanged := m.heap.cmp(item.kv.K.UserKey, m.lastKey.UserKey) != 0
207 1 : if !keyChanged {
208 1 : // At the same user key. We will see them in decreasing seqnum
209 1 : // order so the lastLevel must not be lower.
210 1 : if m.lastLevel > item.index {
211 0 : m.err = errors.Errorf("found InternalKey %s in %s and InternalKey %s in %s",
212 0 : item.kv.K.Pretty(m.formatKey), l.iter, m.lastKey.Pretty(m.formatKey),
213 0 : m.lastIterMsg)
214 0 : return false
215 0 : }
216 1 : m.lastLevel = item.index
217 1 : } else {
218 1 : // The user key has changed.
219 1 : m.lastKey.Trailer = item.kv.K.Trailer
220 1 : m.lastKey.UserKey = append(m.lastKey.UserKey[:0], item.kv.K.UserKey...)
221 1 : m.lastLevel = item.index
222 1 : }
223 : // Ongoing series of MERGE records ends with a MERGE record.
224 1 : if keyChanged && m.valueMerger != nil {
225 1 : var closer io.Closer
226 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
227 1 : if m.err == nil && closer != nil {
228 0 : m.err = closer.Close()
229 0 : }
230 1 : m.valueMerger = nil
231 : }
232 1 : itemValue, _, err := item.kv.Value(nil)
233 1 : if err != nil {
234 0 : m.err = err
235 0 : return false
236 0 : }
237 1 : if m.valueMerger != nil {
238 1 : // Ongoing series of MERGE records.
239 1 : switch item.kv.K.Kind() {
240 1 : case InternalKeyKindSingleDelete, InternalKeyKindDelete, InternalKeyKindDeleteSized:
241 1 : var closer io.Closer
242 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
243 1 : if m.err == nil && closer != nil {
244 0 : m.err = closer.Close()
245 0 : }
246 1 : m.valueMerger = nil
247 1 : case InternalKeyKindSet, InternalKeyKindSetWithDelete:
248 1 : m.err = m.valueMerger.MergeOlder(itemValue)
249 1 : if m.err == nil {
250 1 : var closer io.Closer
251 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
252 1 : if m.err == nil && closer != nil {
253 0 : m.err = closer.Close()
254 0 : }
255 : }
256 1 : m.valueMerger = nil
257 1 : case InternalKeyKindMerge:
258 1 : m.err = m.valueMerger.MergeOlder(itemValue)
259 0 : default:
260 0 : m.err = errors.Errorf("pebble: invalid internal key kind %s in %s",
261 0 : item.kv.K.Pretty(m.formatKey),
262 0 : l.iter)
263 0 : return false
264 : }
265 1 : } else if item.kv.K.Kind() == InternalKeyKindMerge && m.err == nil {
266 1 : // New series of MERGE records.
267 1 : m.valueMerger, m.err = m.merge(item.kv.K.UserKey, itemValue)
268 1 : }
269 1 : if m.err != nil {
270 0 : m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
271 0 : item.kv.K.Pretty(m.formatKey), l.iter)
272 0 : return false
273 0 : }
274 : // Is this point covered by a tombstone at a lower level? Note that all these
275 : // iterators must be positioned at a key > item.key.
276 1 : for level := item.index + 1; level < len(m.levels); level++ {
277 1 : lvl := &m.levels[level]
278 1 : if lvl.rangeDelIter == nil || lvl.tombstone.Empty() {
279 1 : continue
280 : }
281 1 : if lvl.tombstone.Contains(m.heap.cmp, item.kv.K.UserKey) && lvl.tombstone.CoversAt(m.snapshot, item.kv.K.SeqNum()) {
282 0 : m.err = errors.Errorf("tombstone %s in %s deletes key %s in %s",
283 0 : lvl.tombstone.Pretty(m.formatKey), lvl.iter, item.kv.K.Pretty(m.formatKey),
284 0 : l.iter)
285 0 : return false
286 0 : }
287 : }
288 1 : return true
289 : }
290 :
291 : // Checking that range tombstones are mutually consistent is performed by
292 : // checkRangeTombstones(). See the overview comment at the top of the file.
293 : //
294 : // We do this check as follows:
295 : // - Collect the tombstones for each level, put them into one pool of tombstones
296 : // along with their level information (addTombstonesFromIter()).
297 : // - Collect the start and end user keys from all these tombstones
298 : // (collectAllUserKey()) and use them to fragment all the tombstones
299 : // (fragmentUsingUserKey()).
300 : // - Sort tombstones by start key and decreasing seqnum
301 : // (tombstonesByStartKeyAndSeqnum) - all tombstones that have the same start
302 : // key will have the same end key because they have been fragmented.
303 : // - Iterate and check (iterateAndCheckTombstones()).
304 : //
305 : // Note that this simple approach requires holding all the tombstones across all
306 : // levels in-memory. A more sophisticated incremental approach could be devised,
307 : // if necessary.
308 :
309 : // A tombstone and the corresponding level it was found in.
310 : type tombstoneWithLevel struct {
311 : keyspan.Span
312 : level int
313 : // The level in LSM. A -1 means it's a memtable.
314 : lsmLevel int
315 : fileNum base.FileNum
316 : }
317 :
318 : // For sorting tombstoneWithLevels in increasing order of start UserKey and
319 : // for the same start UserKey in decreasing order of seqnum.
320 : type tombstonesByStartKeyAndSeqnum struct {
321 : cmp Compare
322 : buf []tombstoneWithLevel
323 : }
324 :
325 1 : func (v *tombstonesByStartKeyAndSeqnum) Len() int { return len(v.buf) }
326 1 : func (v *tombstonesByStartKeyAndSeqnum) Less(i, j int) bool {
327 1 : less := v.cmp(v.buf[i].Start, v.buf[j].Start)
328 1 : if less == 0 {
329 1 : return v.buf[i].LargestSeqNum() > v.buf[j].LargestSeqNum()
330 1 : }
331 1 : return less < 0
332 : }
333 1 : func (v *tombstonesByStartKeyAndSeqnum) Swap(i, j int) {
334 1 : v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
335 1 : }
336 :
337 : func iterateAndCheckTombstones(
338 : cmp Compare, formatKey base.FormatKey, tombstones []tombstoneWithLevel,
339 1 : ) error {
340 1 : sortBuf := tombstonesByStartKeyAndSeqnum{
341 1 : cmp: cmp,
342 1 : buf: tombstones,
343 1 : }
344 1 : sort.Sort(&sortBuf)
345 1 :
346 1 : // For a sequence of tombstones that share the same start UserKey, we will
347 1 : // encounter them in non-increasing seqnum order and so should encounter them
348 1 : // in non-decreasing level order.
349 1 : lastTombstone := tombstoneWithLevel{}
350 1 : for _, t := range tombstones {
351 1 : if cmp(lastTombstone.Start, t.Start) == 0 && lastTombstone.level > t.level {
352 0 : return errors.Errorf("encountered tombstone %s in %s"+
353 0 : " that has a lower seqnum than the same tombstone in %s",
354 0 : t.Span.Pretty(formatKey), levelOrMemtable(t.lsmLevel, t.fileNum),
355 0 : levelOrMemtable(lastTombstone.lsmLevel, lastTombstone.fileNum))
356 0 : }
357 1 : lastTombstone = t
358 : }
359 1 : return nil
360 : }
361 :
362 : type checkConfig struct {
363 : logger Logger
364 : comparer *Comparer
365 : readState *readState
366 : newIters tableNewIters
367 : seqNum base.SeqNum
368 : stats *CheckLevelsStats
369 : merge Merge
370 : formatKey base.FormatKey
371 : }
372 :
373 : // cmp is shorthand for comparer.Compare.
374 1 : func (c *checkConfig) cmp(a, b []byte) int { return c.comparer.Compare(a, b) }
375 :
376 1 : func checkRangeTombstones(c *checkConfig) error {
377 1 : var level int
378 1 : var tombstones []tombstoneWithLevel
379 1 : var err error
380 1 :
381 1 : memtables := c.readState.memtables
382 1 : for i := len(memtables) - 1; i >= 0; i-- {
383 1 : iter := memtables[i].newRangeDelIter(nil)
384 1 : if iter == nil {
385 1 : continue
386 : }
387 1 : tombstones, err = addTombstonesFromIter(
388 1 : iter, level, -1, 0, tombstones, c.seqNum, c.cmp, c.formatKey,
389 1 : )
390 1 : iter.Close()
391 1 : if err != nil {
392 0 : return err
393 0 : }
394 1 : level++
395 : }
396 :
397 1 : current := c.readState.current
398 1 : addTombstonesFromLevel := func(files manifest.LevelIterator, lsmLevel int) error {
399 1 : for f := files.First(); f != nil; f = files.Next() {
400 1 : lf := files.Take()
401 1 : iters, err := c.newIters(
402 1 : context.Background(), lf.TableMetadata, &IterOptions{layer: manifest.Level(lsmLevel)},
403 1 : internalIterOpts{}, iterRangeDeletions)
404 1 : if err != nil {
405 0 : return err
406 0 : }
407 1 : tombstones, err = addTombstonesFromIter(iters.RangeDeletion(), level, lsmLevel, f.FileNum,
408 1 : tombstones, c.seqNum, c.cmp, c.formatKey)
409 1 : iters.CloseAll()
410 1 :
411 1 : if err != nil {
412 0 : return err
413 0 : }
414 : }
415 1 : return nil
416 : }
417 : // Now the levels with untruncated tombsones.
418 1 : for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
419 1 : if current.L0SublevelFiles[i].Empty() {
420 0 : continue
421 : }
422 1 : err := addTombstonesFromLevel(current.L0SublevelFiles[i].Iter(), 0)
423 1 : if err != nil {
424 0 : return err
425 0 : }
426 1 : level++
427 : }
428 1 : for i := 1; i < len(current.Levels); i++ {
429 1 : if err := addTombstonesFromLevel(current.Levels[i].Iter(), i); err != nil {
430 0 : return err
431 0 : }
432 1 : level++
433 : }
434 1 : if c.stats != nil {
435 0 : c.stats.NumTombstones = len(tombstones)
436 0 : }
437 : // We now have truncated tombstones.
438 : // Fragment them all.
439 1 : userKeys := collectAllUserKeys(c.cmp, tombstones)
440 1 : tombstones = fragmentUsingUserKeys(c.cmp, tombstones, userKeys)
441 1 : return iterateAndCheckTombstones(c.cmp, c.formatKey, tombstones)
442 : }
443 :
444 0 : func levelOrMemtable(lsmLevel int, fileNum base.FileNum) string {
445 0 : if lsmLevel == -1 {
446 0 : return "memtable"
447 0 : }
448 0 : return fmt.Sprintf("L%d: fileNum=%s", lsmLevel, fileNum)
449 : }
450 :
451 : func addTombstonesFromIter(
452 : iter keyspan.FragmentIterator,
453 : level int,
454 : lsmLevel int,
455 : fileNum base.FileNum,
456 : tombstones []tombstoneWithLevel,
457 : seqNum base.SeqNum,
458 : cmp Compare,
459 : formatKey base.FormatKey,
460 1 : ) (_ []tombstoneWithLevel, err error) {
461 1 : var prevTombstone keyspan.Span
462 1 : tomb, err := iter.First()
463 1 : for ; tomb != nil; tomb, err = iter.Next() {
464 1 : t := tomb.Visible(seqNum)
465 1 : if t.Empty() {
466 1 : continue
467 : }
468 1 : t = t.Clone()
469 1 : // This is mainly a test for rangeDelV2 formatted blocks which are expected to
470 1 : // be ordered and fragmented on disk. But we anyways check for memtables,
471 1 : // rangeDelV1 as well.
472 1 : if cmp(prevTombstone.End, t.Start) > 0 {
473 0 : return nil, errors.Errorf("unordered or unfragmented range delete tombstones %s, %s in %s",
474 0 : prevTombstone.Pretty(formatKey), t.Pretty(formatKey), levelOrMemtable(lsmLevel, fileNum))
475 0 : }
476 1 : prevTombstone = t
477 1 :
478 1 : if !t.Empty() {
479 1 : tombstones = append(tombstones, tombstoneWithLevel{
480 1 : Span: t,
481 1 : level: level,
482 1 : lsmLevel: lsmLevel,
483 1 : fileNum: fileNum,
484 1 : })
485 1 : }
486 : }
487 1 : if err != nil {
488 0 : return nil, err
489 0 : }
490 1 : return tombstones, nil
491 : }
492 :
493 : type userKeysSort struct {
494 : cmp Compare
495 : buf [][]byte
496 : }
497 :
498 1 : func (v *userKeysSort) Len() int { return len(v.buf) }
499 1 : func (v *userKeysSort) Less(i, j int) bool {
500 1 : return v.cmp(v.buf[i], v.buf[j]) < 0
501 1 : }
502 1 : func (v *userKeysSort) Swap(i, j int) {
503 1 : v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
504 1 : }
505 1 : func collectAllUserKeys(cmp Compare, tombstones []tombstoneWithLevel) [][]byte {
506 1 : keys := make([][]byte, 0, len(tombstones)*2)
507 1 : for _, t := range tombstones {
508 1 : keys = append(keys, t.Start)
509 1 : keys = append(keys, t.End)
510 1 : }
511 1 : sorter := userKeysSort{
512 1 : cmp: cmp,
513 1 : buf: keys,
514 1 : }
515 1 : sort.Sort(&sorter)
516 1 : var last, curr int
517 1 : for last, curr = -1, 0; curr < len(keys); curr++ {
518 1 : if last < 0 || cmp(keys[last], keys[curr]) != 0 {
519 1 : last++
520 1 : keys[last] = keys[curr]
521 1 : }
522 : }
523 1 : keys = keys[:last+1]
524 1 : return keys
525 : }
526 :
527 : func fragmentUsingUserKeys(
528 : cmp Compare, tombstones []tombstoneWithLevel, userKeys [][]byte,
529 1 : ) []tombstoneWithLevel {
530 1 : var buf []tombstoneWithLevel
531 1 : for _, t := range tombstones {
532 1 : // Find the first position with tombstone start < user key
533 1 : i := sort.Search(len(userKeys), func(i int) bool {
534 1 : return cmp(t.Start, userKeys[i]) < 0
535 1 : })
536 1 : for ; i < len(userKeys); i++ {
537 1 : if cmp(userKeys[i], t.End) >= 0 {
538 1 : break
539 : }
540 1 : tPartial := t
541 1 : tPartial.End = userKeys[i]
542 1 : buf = append(buf, tPartial)
543 1 : t.Start = userKeys[i]
544 : }
545 1 : buf = append(buf, t)
546 : }
547 1 : return buf
548 : }
549 :
550 : // CheckLevelsStats provides basic stats on points and tombstones encountered.
551 : type CheckLevelsStats struct {
552 : NumPoints int64
553 : NumTombstones int
554 : }
555 :
556 : // CheckLevels checks:
557 : // - Every entry in the DB is consistent with the level invariant. See the
558 : // comment at the top of the file.
559 : // - Point keys in sstables are ordered.
560 : // - Range delete tombstones in sstables are ordered and fragmented.
561 : // - Successful processing of all MERGE records.
562 1 : func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
563 1 : // Grab and reference the current readState.
564 1 : readState := d.loadReadState()
565 1 : defer readState.unref()
566 1 :
567 1 : // Determine the seqnum to read at after grabbing the read state (current and
568 1 : // memtables) above.
569 1 : seqNum := d.mu.versions.visibleSeqNum.Load()
570 1 :
571 1 : checkConfig := &checkConfig{
572 1 : logger: d.opts.Logger,
573 1 : comparer: d.opts.Comparer,
574 1 : readState: readState,
575 1 : newIters: d.newIters,
576 1 : seqNum: seqNum,
577 1 : stats: stats,
578 1 : merge: d.merge,
579 1 : formatKey: d.opts.Comparer.FormatKey,
580 1 : }
581 1 : return checkLevelsInternal(checkConfig)
582 1 : }
583 :
584 1 : func checkLevelsInternal(c *checkConfig) (err error) {
585 1 : // Phase 1: Use a simpleMergingIter to step through all the points and ensure
586 1 : // that points with the same user key at different levels are not inverted
587 1 : // wrt sequence numbers and the same holds for tombstones that cover points.
588 1 : // To do this, one needs to construct a simpleMergingIter which is similar to
589 1 : // how one constructs a mergingIter.
590 1 :
591 1 : // Add mem tables from newest to oldest.
592 1 : var mlevels []simpleMergingIterLevel
593 1 : defer func() {
594 1 : for i := range mlevels {
595 1 : l := &mlevels[i]
596 1 : if l.iter != nil {
597 1 : err = firstError(err, l.iter.Close())
598 1 : l.iter = nil
599 1 : }
600 1 : if l.rangeDelIter != nil {
601 1 : l.rangeDelIter.Close()
602 1 : l.rangeDelIter = nil
603 1 : }
604 : }
605 : }()
606 :
607 1 : memtables := c.readState.memtables
608 1 : for i := len(memtables) - 1; i >= 0; i-- {
609 1 : mem := memtables[i]
610 1 : mlevels = append(mlevels, simpleMergingIterLevel{
611 1 : iter: mem.newIter(nil),
612 1 : rangeDelIter: mem.newRangeDelIter(nil),
613 1 : })
614 1 : }
615 :
616 1 : current := c.readState.current
617 1 : // Determine the final size for mlevels so that there are no more
618 1 : // reallocations. levelIter will hold a pointer to elements in mlevels.
619 1 : start := len(mlevels)
620 1 : for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
621 1 : if current.L0SublevelFiles[sublevel].Empty() {
622 0 : continue
623 : }
624 1 : mlevels = append(mlevels, simpleMergingIterLevel{})
625 : }
626 1 : for level := 1; level < len(current.Levels); level++ {
627 1 : if current.Levels[level].Empty() {
628 1 : continue
629 : }
630 1 : mlevels = append(mlevels, simpleMergingIterLevel{})
631 : }
632 1 : mlevelAlloc := mlevels[start:]
633 1 : // Add L0 files by sublevel.
634 1 : for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
635 1 : if current.L0SublevelFiles[sublevel].Empty() {
636 0 : continue
637 : }
638 1 : manifestIter := current.L0SublevelFiles[sublevel].Iter()
639 1 : iterOpts := IterOptions{logger: c.logger}
640 1 : li := &levelIter{}
641 1 : li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
642 1 : manifest.L0Sublevel(sublevel), internalIterOpts{})
643 1 : li.initRangeDel(&mlevelAlloc[0])
644 1 : mlevelAlloc[0].iter = li
645 1 : mlevelAlloc = mlevelAlloc[1:]
646 : }
647 1 : for level := 1; level < len(current.Levels); level++ {
648 1 : if current.Levels[level].Empty() {
649 1 : continue
650 : }
651 :
652 1 : iterOpts := IterOptions{logger: c.logger}
653 1 : li := &levelIter{}
654 1 : li.init(context.Background(), iterOpts, c.comparer, c.newIters,
655 1 : current.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
656 1 : li.initRangeDel(&mlevelAlloc[0])
657 1 : mlevelAlloc[0].iter = li
658 1 : mlevelAlloc = mlevelAlloc[1:]
659 : }
660 :
661 1 : mergingIter := &simpleMergingIter{}
662 1 : mergingIter.init(c.merge, c.cmp, c.seqNum, c.formatKey, mlevels...)
663 1 : for cont := mergingIter.step(); cont; cont = mergingIter.step() {
664 1 : }
665 1 : if err := mergingIter.err; err != nil {
666 0 : return err
667 0 : }
668 1 : if c.stats != nil {
669 0 : c.stats.NumPoints = mergingIter.numPoints
670 0 : }
671 :
672 : // Phase 2: Check that the tombstones are mutually consistent.
673 1 : return checkRangeTombstones(c)
674 : }
675 :
676 : type simpleMergingIterItem struct {
677 : index int
678 : kv base.InternalKV
679 : }
680 :
681 : type simpleMergingIterHeap struct {
682 : cmp Compare
683 : reverse bool
684 : items []simpleMergingIterItem
685 : }
686 :
687 1 : func (h *simpleMergingIterHeap) len() int {
688 1 : return len(h.items)
689 1 : }
690 :
691 1 : func (h *simpleMergingIterHeap) less(i, j int) bool {
692 1 : ikey, jkey := h.items[i].kv.K, h.items[j].kv.K
693 1 : if c := h.cmp(ikey.UserKey, jkey.UserKey); c != 0 {
694 1 : if h.reverse {
695 0 : return c > 0
696 0 : }
697 1 : return c < 0
698 : }
699 1 : if h.reverse {
700 0 : return ikey.Trailer < jkey.Trailer
701 0 : }
702 1 : return ikey.Trailer > jkey.Trailer
703 : }
704 :
705 1 : func (h *simpleMergingIterHeap) swap(i, j int) {
706 1 : h.items[i], h.items[j] = h.items[j], h.items[i]
707 1 : }
708 :
709 : // init, fix, up and down are copied from the go stdlib.
710 1 : func (h *simpleMergingIterHeap) init() {
711 1 : // heapify
712 1 : n := h.len()
713 1 : for i := n/2 - 1; i >= 0; i-- {
714 1 : h.down(i, n)
715 1 : }
716 : }
717 :
718 1 : func (h *simpleMergingIterHeap) fix(i int) {
719 1 : if !h.down(i, h.len()) {
720 1 : h.up(i)
721 1 : }
722 : }
723 :
724 1 : func (h *simpleMergingIterHeap) pop() *simpleMergingIterItem {
725 1 : n := h.len() - 1
726 1 : h.swap(0, n)
727 1 : h.down(0, n)
728 1 : item := &h.items[n]
729 1 : h.items = h.items[:n]
730 1 : return item
731 1 : }
732 :
733 1 : func (h *simpleMergingIterHeap) up(j int) {
734 1 : for {
735 1 : i := (j - 1) / 2 // parent
736 1 : if i == j || !h.less(j, i) {
737 1 : break
738 : }
739 0 : h.swap(i, j)
740 0 : j = i
741 : }
742 : }
743 :
744 1 : func (h *simpleMergingIterHeap) down(i0, n int) bool {
745 1 : i := i0
746 1 : for {
747 1 : j1 := 2*i + 1
748 1 : if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
749 1 : break
750 : }
751 1 : j := j1 // left child
752 1 : if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
753 1 : j = j2 // = 2*i + 2 // right child
754 1 : }
755 1 : if !h.less(j, i) {
756 1 : break
757 : }
758 1 : h.swap(i, j)
759 1 : i = j
760 : }
761 1 : return i > i0
762 : }
|