Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "io"
11 : "sort"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/keyspan"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : )
18 :
19 : // This file implements DB.CheckLevels() which checks that every entry in the
20 : // DB is consistent with respect to the level invariant: any point (or the
21 : // infinite number of points in a range tombstone) has a seqnum such that a
22 : // point with the same UserKey at a lower level has a lower seqnum. This is an
23 : // expensive check since it involves iterating over all the entries in the DB,
24 : // hence only intended for tests or tools.
25 : //
26 : // If we ignore range tombstones, the consistency checking of points can be
27 : // done with a simplified version of mergingIter. simpleMergingIter is that
28 : // simplified version of mergingIter that only needs to step through points
29 : // (analogous to only doing Next()). It can also easily accommodate
30 : // consistency checking of points relative to range tombstones.
31 : // simpleMergingIter does not do any seek optimizations present in mergingIter
32 : // (it minimally needs to seek the range delete iterators to position them at
33 : // or past the current point) since it does not want to miss points for
34 : // purposes of consistency checking.
35 : //
36 : // Mutual consistency of range tombstones is non-trivial to check. One needs
37 : // to detect inversions of the form [a, c)#8 at higher level and [b, c)#10 at
38 : // a lower level. The start key of the former is not contained in the latter
39 : // and we can't use the exclusive end key, c, for a containment check since it
40 : // is the sentinel key. We observe that if these tombstones were fragmented
41 : // wrt each other we would have [a, b)#8 and [b, c)#8 at the higher level and
42 : // [b, c)#10 at the lower level and then it is trivial to compare the two
43 : // [b, c) tombstones. Note that this fragmentation needs to take into account
44 : // that tombstones in a file may be untruncated and need to act within the
45 : // bounds of the file. This checking is performed by checkRangeTombstones()
46 : // and its helper functions.
47 :
48 : // The per-level structure used by simpleMergingIter.
49 : type simpleMergingIterLevel struct {
50 : iter internalIterator
51 : rangeDelIter keyspan.FragmentIterator
52 : levelIterBoundaryContext
53 :
54 : iterKey *InternalKey
55 : iterValue base.LazyValue
56 : tombstone *keyspan.Span
57 : }
58 :
59 : type simpleMergingIter struct {
60 : levels []simpleMergingIterLevel
61 : snapshot uint64
62 : heap simpleMergingIterHeap
63 : // The last point's key and level. For validation.
64 : lastKey InternalKey
65 : lastLevel int
66 : lastIterMsg string
67 : // A non-nil valueMerger means MERGE record processing is ongoing.
68 : valueMerger base.ValueMerger
69 : // The first error will cause step() to return false.
70 : err error
71 : numPoints int64
72 : merge Merge
73 : formatKey base.FormatKey
74 : }
75 :
76 : func (m *simpleMergingIter) init(
77 : merge Merge,
78 : cmp Compare,
79 : snapshot uint64,
80 : formatKey base.FormatKey,
81 : levels ...simpleMergingIterLevel,
82 1 : ) {
83 1 : m.levels = levels
84 1 : m.formatKey = formatKey
85 1 : m.merge = merge
86 1 : m.snapshot = snapshot
87 1 : m.lastLevel = -1
88 1 : m.heap.cmp = cmp
89 1 : m.heap.items = make([]simpleMergingIterItem, 0, len(levels))
90 1 : for i := range m.levels {
91 1 : l := &m.levels[i]
92 1 : l.iterKey, l.iterValue = l.iter.First()
93 1 : if l.iterKey != nil {
94 1 : item := simpleMergingIterItem{
95 1 : index: i,
96 1 : value: l.iterValue,
97 1 : }
98 1 : item.key.Trailer = l.iterKey.Trailer
99 1 : item.key.UserKey = append(item.key.UserKey[:0], l.iterKey.UserKey...)
100 1 : m.heap.items = append(m.heap.items, item)
101 1 : }
102 : }
103 1 : m.heap.init()
104 1 :
105 1 : if m.heap.len() == 0 {
106 1 : return
107 1 : }
108 1 : m.positionRangeDels()
109 : }
110 :
111 : // Positions all the rangedel iterators at or past the current top of the
112 : // heap, using SeekGE().
113 1 : func (m *simpleMergingIter) positionRangeDels() {
114 1 : item := &m.heap.items[0]
115 1 : for i := range m.levels {
116 1 : l := &m.levels[i]
117 1 : if l.rangeDelIter == nil {
118 1 : continue
119 : }
120 1 : l.tombstone = l.rangeDelIter.SeekGE(item.key.UserKey)
121 : }
122 : }
123 :
124 : // Returns true if not yet done.
125 1 : func (m *simpleMergingIter) step() bool {
126 1 : if m.heap.len() == 0 || m.err != nil {
127 1 : return false
128 1 : }
129 1 : item := &m.heap.items[0]
130 1 : l := &m.levels[item.index]
131 1 : // Sentinels are not relevant for this point checking.
132 1 : if !item.key.IsExclusiveSentinel() && item.key.Visible(m.snapshot, base.InternalKeySeqNumMax) {
133 1 : m.numPoints++
134 1 : keyChanged := m.heap.cmp(item.key.UserKey, m.lastKey.UserKey) != 0
135 1 : if !keyChanged {
136 1 : // At the same user key. We will see them in decreasing seqnum
137 1 : // order so the lastLevel must not be lower.
138 1 : if m.lastLevel > item.index {
139 0 : m.err = errors.Errorf("found InternalKey %s in %s and InternalKey %s in %s",
140 0 : item.key.Pretty(m.formatKey), l.iter, m.lastKey.Pretty(m.formatKey),
141 0 : m.lastIterMsg)
142 0 : return false
143 0 : }
144 1 : m.lastLevel = item.index
145 1 : } else {
146 1 : // The user key has changed.
147 1 : m.lastKey.Trailer = item.key.Trailer
148 1 : m.lastKey.UserKey = append(m.lastKey.UserKey[:0], item.key.UserKey...)
149 1 : m.lastLevel = item.index
150 1 : }
151 : // Ongoing series of MERGE records ends with a MERGE record.
152 1 : if keyChanged && m.valueMerger != nil {
153 1 : var closer io.Closer
154 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
155 1 : if m.err == nil && closer != nil {
156 0 : m.err = closer.Close()
157 0 : }
158 1 : m.valueMerger = nil
159 : }
160 1 : itemValue, _, err := item.value.Value(nil)
161 1 : if err != nil {
162 0 : m.err = err
163 0 : return false
164 0 : }
165 1 : if m.valueMerger != nil {
166 1 : // Ongoing series of MERGE records.
167 1 : switch item.key.Kind() {
168 1 : case InternalKeyKindSingleDelete, InternalKeyKindDelete, InternalKeyKindDeleteSized:
169 1 : var closer io.Closer
170 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
171 1 : if m.err == nil && closer != nil {
172 0 : m.err = closer.Close()
173 0 : }
174 1 : m.valueMerger = nil
175 1 : case InternalKeyKindSet, InternalKeyKindSetWithDelete:
176 1 : m.err = m.valueMerger.MergeOlder(itemValue)
177 1 : if m.err == nil {
178 1 : var closer io.Closer
179 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
180 1 : if m.err == nil && closer != nil {
181 0 : m.err = closer.Close()
182 0 : }
183 : }
184 1 : m.valueMerger = nil
185 1 : case InternalKeyKindMerge:
186 1 : m.err = m.valueMerger.MergeOlder(itemValue)
187 0 : default:
188 0 : m.err = errors.Errorf("pebble: invalid internal key kind %s in %s",
189 0 : item.key.Pretty(m.formatKey),
190 0 : l.iter)
191 0 : return false
192 : }
193 1 : } else if item.key.Kind() == InternalKeyKindMerge && m.err == nil {
194 1 : // New series of MERGE records.
195 1 : m.valueMerger, m.err = m.merge(item.key.UserKey, itemValue)
196 1 : }
197 1 : if m.err != nil {
198 0 : m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
199 0 : item.key.Pretty(m.formatKey), l.iter)
200 0 : return false
201 0 : }
202 : // Is this point covered by a tombstone at a lower level? Note that all these
203 : // iterators must be positioned at a key > item.key. So the Largest key bound
204 : // of the sstable containing the tombstone >= item.key. So the upper limit of
205 : // the tombstone cannot be file-bounds-constrained to < item.key. But it is
206 : // possible that item.key < smallest key bound of the sstable, in which case
207 : // this tombstone should be ignored.
208 1 : for level := item.index + 1; level < len(m.levels); level++ {
209 1 : lvl := &m.levels[level]
210 1 : if lvl.rangeDelIter == nil || lvl.tombstone.Empty() {
211 1 : continue
212 : }
213 1 : if (lvl.smallestUserKey == nil || m.heap.cmp(lvl.smallestUserKey, item.key.UserKey) <= 0) &&
214 1 : lvl.tombstone.Contains(m.heap.cmp, item.key.UserKey) {
215 1 : if lvl.tombstone.CoversAt(m.snapshot, item.key.SeqNum()) {
216 0 : m.err = errors.Errorf("tombstone %s in %s deletes key %s in %s",
217 0 : lvl.tombstone.Pretty(m.formatKey), lvl.iter, item.key.Pretty(m.formatKey),
218 0 : l.iter)
219 0 : return false
220 0 : }
221 : }
222 : }
223 : }
224 :
225 : // The iterator for the current level may be closed in the following call to
226 : // Next(). We save its debug string for potential use after it is closed -
227 : // either in this current step() invocation or on the next invocation.
228 1 : m.lastIterMsg = l.iter.String()
229 1 :
230 1 : // Step to the next point.
231 1 : if l.iterKey, l.iterValue = l.iter.Next(); l.iterKey != nil {
232 1 : // Check point keys in an sstable are ordered. Although not required, we check
233 1 : // for memtables as well. A subtle check here is that successive sstables of
234 1 : // L1 and higher levels are ordered. This happens when levelIter moves to the
235 1 : // next sstable in the level, in which case item.key is previous sstable's
236 1 : // last point key.
237 1 : if base.InternalCompare(m.heap.cmp, item.key, *l.iterKey) >= 0 {
238 0 : m.err = errors.Errorf("out of order keys %s >= %s in %s",
239 0 : item.key.Pretty(m.formatKey), l.iterKey.Pretty(m.formatKey), l.iter)
240 0 : return false
241 0 : }
242 1 : item.key.Trailer = l.iterKey.Trailer
243 1 : item.key.UserKey = append(item.key.UserKey[:0], l.iterKey.UserKey...)
244 1 : item.value = l.iterValue
245 1 : if m.heap.len() > 1 {
246 1 : m.heap.fix(0)
247 1 : }
248 1 : } else {
249 1 : m.err = l.iter.Close()
250 1 : l.iter = nil
251 1 : m.heap.pop()
252 1 : }
253 1 : if m.err != nil {
254 0 : return false
255 0 : }
256 1 : if m.heap.len() == 0 {
257 1 : // Last record was a MERGE record.
258 1 : if m.valueMerger != nil {
259 1 : var closer io.Closer
260 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
261 1 : if m.err == nil && closer != nil {
262 0 : m.err = closer.Close()
263 0 : }
264 1 : if m.err != nil {
265 0 : m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
266 0 : item.key.Pretty(m.formatKey), m.lastIterMsg)
267 0 : }
268 1 : m.valueMerger = nil
269 : }
270 1 : return false
271 : }
272 1 : m.positionRangeDels()
273 1 : return true
274 : }
275 :
276 : // Checking that range tombstones are mutually consistent is performed by checkRangeTombstones().
277 : // See the overview comment at the top of the file.
278 : //
279 : // We do this check as follows:
280 : // - For each level that can have untruncated tombstones, compute the atomic compaction
281 : // bounds (getAtomicUnitBounds()) and use them to truncate tombstones.
282 : // - Now that we have a set of truncated tombstones for each level, put them into one
283 : // pool of tombstones along with their level information (addTombstonesFromIter()).
284 : // - Collect the start and end user keys from all these tombstones (collectAllUserKeys()) and use
285 : // them to fragment all the tombstones (fragmentUsingUserKeys()).
286 : // - Sort tombstones by start key and decreasing seqnum (tombstonesByStartKeyAndSeqnum) -- all
287 : // tombstones that have the same start key will have the same end key because they have been
288 : // fragmented.
289 : // - Iterate and check (iterateAndCheckTombstones()).
290 : // Note that this simple approach requires holding all the tombstones across all levels in-memory.
291 : // A more sophisticated incremental approach could be devised, if necessary.
292 :
293 : // A tombstone and the corresponding level it was found in.
294 : type tombstoneWithLevel struct {
295 : keyspan.Span
296 : level int
297 : // The level in LSM. A -1 means it's a memtable.
298 : lsmLevel int
299 : fileNum FileNum
300 : }
301 :
302 : // For sorting tombstoneWithLevels in increasing order of start UserKey and
303 : // for the same start UserKey in decreasing order of seqnum.
304 : type tombstonesByStartKeyAndSeqnum struct {
305 : cmp Compare
306 : buf []tombstoneWithLevel
307 : }
308 :
309 1 : func (v *tombstonesByStartKeyAndSeqnum) Len() int { return len(v.buf) }
310 1 : func (v *tombstonesByStartKeyAndSeqnum) Less(i, j int) bool {
311 1 : less := v.cmp(v.buf[i].Start, v.buf[j].Start)
312 1 : if less == 0 {
313 1 : return v.buf[i].LargestSeqNum() > v.buf[j].LargestSeqNum()
314 1 : }
315 1 : return less < 0
316 : }
317 1 : func (v *tombstonesByStartKeyAndSeqnum) Swap(i, j int) {
318 1 : v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
319 1 : }
320 :
321 : func iterateAndCheckTombstones(
322 : cmp Compare, formatKey base.FormatKey, tombstones []tombstoneWithLevel,
323 1 : ) error {
324 1 : sortBuf := tombstonesByStartKeyAndSeqnum{
325 1 : cmp: cmp,
326 1 : buf: tombstones,
327 1 : }
328 1 : sort.Sort(&sortBuf)
329 1 :
330 1 : // For a sequence of tombstones that share the same start UserKey, we will
331 1 : // encounter them in non-increasing seqnum order and so should encounter them
332 1 : // in non-decreasing level order.
333 1 : lastTombstone := tombstoneWithLevel{}
334 1 : for _, t := range tombstones {
335 1 : if cmp(lastTombstone.Start, t.Start) == 0 && lastTombstone.level > t.level {
336 0 : return errors.Errorf("encountered tombstone %s in %s"+
337 0 : " that has a lower seqnum than the same tombstone in %s",
338 0 : t.Span.Pretty(formatKey), levelOrMemtable(t.lsmLevel, t.fileNum),
339 0 : levelOrMemtable(lastTombstone.lsmLevel, lastTombstone.fileNum))
340 0 : }
341 1 : lastTombstone = t
342 : }
343 1 : return nil
344 : }
345 :
346 : type checkConfig struct {
347 : logger Logger
348 : comparer *Comparer
349 : readState *readState
350 : newIters tableNewIters
351 : seqNum uint64
352 : stats *CheckLevelsStats
353 : merge Merge
354 : formatKey base.FormatKey
355 : }
356 :
357 : // cmp is shorthand for comparer.Compare.
358 1 : func (c *checkConfig) cmp(a, b []byte) int { return c.comparer.Compare(a, b) }
359 :
360 1 : func checkRangeTombstones(c *checkConfig) error {
361 1 : var level int
362 1 : var tombstones []tombstoneWithLevel
363 1 : var err error
364 1 :
365 1 : memtables := c.readState.memtables
366 1 : for i := len(memtables) - 1; i >= 0; i-- {
367 1 : iter := memtables[i].newRangeDelIter(nil)
368 1 : if iter == nil {
369 1 : continue
370 : }
371 1 : if tombstones, err = addTombstonesFromIter(iter, level, -1, 0, tombstones,
372 1 : c.seqNum, c.cmp, c.formatKey, nil); err != nil {
373 0 : return err
374 0 : }
375 1 : level++
376 : }
377 :
378 1 : current := c.readState.current
379 1 : addTombstonesFromLevel := func(files manifest.LevelIterator, lsmLevel int) error {
380 1 : for f := files.First(); f != nil; f = files.Next() {
381 1 : lf := files.Take()
382 1 : atomicUnit, _ := expandToAtomicUnit(c.cmp, lf.Slice(), true /* disableIsCompacting */)
383 1 : lower, upper := manifest.KeyRange(c.cmp, atomicUnit.Iter())
384 1 : iterToClose, iter, err := c.newIters(
385 1 : context.Background(), lf.FileMetadata, &IterOptions{level: manifest.Level(lsmLevel)}, internalIterOpts{})
386 1 : if err != nil {
387 0 : return err
388 0 : }
389 1 : iterToClose.Close()
390 1 : if iter == nil {
391 1 : continue
392 : }
393 1 : truncate := func(t keyspan.Span) keyspan.Span {
394 1 : // Same checks as in keyspan.Truncate.
395 1 : if c.cmp(t.Start, lower.UserKey) < 0 {
396 0 : t.Start = lower.UserKey
397 0 : }
398 1 : if c.cmp(t.End, upper.UserKey) > 0 {
399 0 : t.End = upper.UserKey
400 0 : }
401 1 : if c.cmp(t.Start, t.End) >= 0 {
402 0 : // Remove the keys.
403 0 : t.Keys = t.Keys[:0]
404 0 : }
405 1 : return t
406 : }
407 1 : if tombstones, err = addTombstonesFromIter(iter, level, lsmLevel, f.FileNum,
408 1 : tombstones, c.seqNum, c.cmp, c.formatKey, truncate); err != nil {
409 0 : return err
410 0 : }
411 : }
412 1 : return nil
413 : }
414 : // Now the levels with untruncated tombsones.
415 1 : for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
416 1 : if current.L0SublevelFiles[i].Empty() {
417 0 : continue
418 : }
419 1 : err := addTombstonesFromLevel(current.L0SublevelFiles[i].Iter(), 0)
420 1 : if err != nil {
421 0 : return err
422 0 : }
423 1 : level++
424 : }
425 1 : for i := 1; i < len(current.Levels); i++ {
426 1 : if err := addTombstonesFromLevel(current.Levels[i].Iter(), i); err != nil {
427 0 : return err
428 0 : }
429 1 : level++
430 : }
431 1 : if c.stats != nil {
432 0 : c.stats.NumTombstones = len(tombstones)
433 0 : }
434 : // We now have truncated tombstones.
435 : // Fragment them all.
436 1 : userKeys := collectAllUserKeys(c.cmp, tombstones)
437 1 : tombstones = fragmentUsingUserKeys(c.cmp, tombstones, userKeys)
438 1 : return iterateAndCheckTombstones(c.cmp, c.formatKey, tombstones)
439 : }
440 :
441 0 : func levelOrMemtable(lsmLevel int, fileNum FileNum) string {
442 0 : if lsmLevel == -1 {
443 0 : return "memtable"
444 0 : }
445 0 : return fmt.Sprintf("L%d: fileNum=%s", lsmLevel, fileNum)
446 : }
447 :
448 : func addTombstonesFromIter(
449 : iter keyspan.FragmentIterator,
450 : level int,
451 : lsmLevel int,
452 : fileNum FileNum,
453 : tombstones []tombstoneWithLevel,
454 : seqNum uint64,
455 : cmp Compare,
456 : formatKey base.FormatKey,
457 : truncate func(tombstone keyspan.Span) keyspan.Span,
458 1 : ) (_ []tombstoneWithLevel, err error) {
459 1 : defer func() {
460 1 : err = firstError(err, iter.Close())
461 1 : }()
462 :
463 1 : var prevTombstone keyspan.Span
464 1 : for tomb := iter.First(); tomb != nil; tomb = iter.Next() {
465 1 : t := tomb.Visible(seqNum)
466 1 : if t.Empty() {
467 1 : continue
468 : }
469 1 : t = t.DeepClone()
470 1 : // This is mainly a test for rangeDelV2 formatted blocks which are expected to
471 1 : // be ordered and fragmented on disk. But we anyways check for memtables,
472 1 : // rangeDelV1 as well.
473 1 : if cmp(prevTombstone.End, t.Start) > 0 {
474 0 : return nil, errors.Errorf("unordered or unfragmented range delete tombstones %s, %s in %s",
475 0 : prevTombstone.Pretty(formatKey), t.Pretty(formatKey), levelOrMemtable(lsmLevel, fileNum))
476 0 : }
477 1 : prevTombstone = t
478 1 :
479 1 : // Truncation of a tombstone must happen after checking its ordering,
480 1 : // fragmentation wrt previous tombstone. Since it is possible that after
481 1 : // truncation the tombstone is ordered, fragmented when it originally wasn't.
482 1 : if truncate != nil {
483 1 : t = truncate(t)
484 1 : }
485 1 : if !t.Empty() {
486 1 : tombstones = append(tombstones, tombstoneWithLevel{
487 1 : Span: t,
488 1 : level: level,
489 1 : lsmLevel: lsmLevel,
490 1 : fileNum: fileNum,
491 1 : })
492 1 : }
493 : }
494 1 : return tombstones, nil
495 : }
496 :
497 : type userKeysSort struct {
498 : cmp Compare
499 : buf [][]byte
500 : }
501 :
502 1 : func (v *userKeysSort) Len() int { return len(v.buf) }
503 1 : func (v *userKeysSort) Less(i, j int) bool {
504 1 : return v.cmp(v.buf[i], v.buf[j]) < 0
505 1 : }
506 1 : func (v *userKeysSort) Swap(i, j int) {
507 1 : v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
508 1 : }
509 1 : func collectAllUserKeys(cmp Compare, tombstones []tombstoneWithLevel) [][]byte {
510 1 : keys := make([][]byte, 0, len(tombstones)*2)
511 1 : for _, t := range tombstones {
512 1 : keys = append(keys, t.Start)
513 1 : keys = append(keys, t.End)
514 1 : }
515 1 : sorter := userKeysSort{
516 1 : cmp: cmp,
517 1 : buf: keys,
518 1 : }
519 1 : sort.Sort(&sorter)
520 1 : var last, curr int
521 1 : for last, curr = -1, 0; curr < len(keys); curr++ {
522 1 : if last < 0 || cmp(keys[last], keys[curr]) != 0 {
523 1 : last++
524 1 : keys[last] = keys[curr]
525 1 : }
526 : }
527 1 : keys = keys[:last+1]
528 1 : return keys
529 : }
530 :
531 : func fragmentUsingUserKeys(
532 : cmp Compare, tombstones []tombstoneWithLevel, userKeys [][]byte,
533 1 : ) []tombstoneWithLevel {
534 1 : var buf []tombstoneWithLevel
535 1 : for _, t := range tombstones {
536 1 : // Find the first position with tombstone start < user key
537 1 : i := sort.Search(len(userKeys), func(i int) bool {
538 1 : return cmp(t.Start, userKeys[i]) < 0
539 1 : })
540 1 : for ; i < len(userKeys); i++ {
541 1 : if cmp(userKeys[i], t.End) >= 0 {
542 1 : break
543 : }
544 1 : tPartial := t
545 1 : tPartial.End = userKeys[i]
546 1 : buf = append(buf, tPartial)
547 1 : t.Start = userKeys[i]
548 : }
549 1 : buf = append(buf, t)
550 : }
551 1 : return buf
552 : }
553 :
// CheckLevelsStats provides basic stats on points and tombstones encountered.
type CheckLevelsStats struct {
	// NumPoints is the number of points counted while stepping through all
	// the levels.
	NumPoints int64
	// NumTombstones is the number of range tombstones collected across all
	// the levels, counted before they are fragmented against each other.
	NumTombstones int
}
559 :
560 : // CheckLevels checks:
561 : // - Every entry in the DB is consistent with the level invariant. See the
562 : // comment at the top of the file.
563 : // - Point keys in sstables are ordered.
564 : // - Range delete tombstones in sstables are ordered and fragmented.
565 : // - Successful processing of all MERGE records.
566 1 : func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
567 1 : // Grab and reference the current readState.
568 1 : readState := d.loadReadState()
569 1 : defer readState.unref()
570 1 :
571 1 : // Determine the seqnum to read at after grabbing the read state (current and
572 1 : // memtables) above.
573 1 : seqNum := d.mu.versions.visibleSeqNum.Load()
574 1 :
575 1 : checkConfig := &checkConfig{
576 1 : logger: d.opts.Logger,
577 1 : comparer: d.opts.Comparer,
578 1 : readState: readState,
579 1 : newIters: d.newIters,
580 1 : seqNum: seqNum,
581 1 : stats: stats,
582 1 : merge: d.merge,
583 1 : formatKey: d.opts.Comparer.FormatKey,
584 1 : }
585 1 : return checkLevelsInternal(checkConfig)
586 1 : }
587 :
588 1 : func checkLevelsInternal(c *checkConfig) (err error) {
589 1 : // Phase 1: Use a simpleMergingIter to step through all the points and ensure
590 1 : // that points with the same user key at different levels are not inverted
591 1 : // wrt sequence numbers and the same holds for tombstones that cover points.
592 1 : // To do this, one needs to construct a simpleMergingIter which is similar to
593 1 : // how one constructs a mergingIter.
594 1 :
595 1 : // Add mem tables from newest to oldest.
596 1 : var mlevels []simpleMergingIterLevel
597 1 : defer func() {
598 1 : for i := range mlevels {
599 1 : l := &mlevels[i]
600 1 : if l.iter != nil {
601 1 : err = firstError(err, l.iter.Close())
602 1 : l.iter = nil
603 1 : }
604 1 : if l.rangeDelIter != nil {
605 1 : err = firstError(err, l.rangeDelIter.Close())
606 1 : l.rangeDelIter = nil
607 1 : }
608 : }
609 : }()
610 :
611 1 : memtables := c.readState.memtables
612 1 : for i := len(memtables) - 1; i >= 0; i-- {
613 1 : mem := memtables[i]
614 1 : mlevels = append(mlevels, simpleMergingIterLevel{
615 1 : iter: mem.newIter(nil),
616 1 : rangeDelIter: mem.newRangeDelIter(nil),
617 1 : })
618 1 : }
619 :
620 1 : current := c.readState.current
621 1 : // Determine the final size for mlevels so that there are no more
622 1 : // reallocations. levelIter will hold a pointer to elements in mlevels.
623 1 : start := len(mlevels)
624 1 : for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
625 1 : if current.L0SublevelFiles[sublevel].Empty() {
626 0 : continue
627 : }
628 1 : mlevels = append(mlevels, simpleMergingIterLevel{})
629 : }
630 1 : for level := 1; level < len(current.Levels); level++ {
631 1 : if current.Levels[level].Empty() {
632 1 : continue
633 : }
634 1 : mlevels = append(mlevels, simpleMergingIterLevel{})
635 : }
636 1 : mlevelAlloc := mlevels[start:]
637 1 : // Add L0 files by sublevel.
638 1 : for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
639 1 : if current.L0SublevelFiles[sublevel].Empty() {
640 0 : continue
641 : }
642 1 : manifestIter := current.L0SublevelFiles[sublevel].Iter()
643 1 : iterOpts := IterOptions{logger: c.logger}
644 1 : li := &levelIter{}
645 1 : li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
646 1 : manifest.L0Sublevel(sublevel), internalIterOpts{})
647 1 : li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
648 1 : li.initBoundaryContext(&mlevelAlloc[0].levelIterBoundaryContext)
649 1 : mlevelAlloc[0].iter = li
650 1 : mlevelAlloc = mlevelAlloc[1:]
651 : }
652 1 : for level := 1; level < len(current.Levels); level++ {
653 1 : if current.Levels[level].Empty() {
654 1 : continue
655 : }
656 :
657 1 : iterOpts := IterOptions{logger: c.logger}
658 1 : li := &levelIter{}
659 1 : li.init(context.Background(), iterOpts, c.comparer, c.newIters,
660 1 : current.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
661 1 : li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
662 1 : li.initBoundaryContext(&mlevelAlloc[0].levelIterBoundaryContext)
663 1 : mlevelAlloc[0].iter = li
664 1 : mlevelAlloc = mlevelAlloc[1:]
665 : }
666 :
667 1 : mergingIter := &simpleMergingIter{}
668 1 : mergingIter.init(c.merge, c.cmp, c.seqNum, c.formatKey, mlevels...)
669 1 : for cont := mergingIter.step(); cont; cont = mergingIter.step() {
670 1 : }
671 1 : if err := mergingIter.err; err != nil {
672 0 : return err
673 0 : }
674 1 : if c.stats != nil {
675 0 : c.stats.NumPoints = mergingIter.numPoints
676 0 : }
677 :
678 : // Phase 2: Check that the tombstones are mutually consistent.
679 1 : return checkRangeTombstones(c)
680 : }
681 :
682 : type simpleMergingIterItem struct {
683 : index int
684 : key InternalKey
685 : value base.LazyValue
686 : }
687 :
688 : type simpleMergingIterHeap struct {
689 : cmp Compare
690 : reverse bool
691 : items []simpleMergingIterItem
692 : }
693 :
694 1 : func (h *simpleMergingIterHeap) len() int {
695 1 : return len(h.items)
696 1 : }
697 :
698 1 : func (h *simpleMergingIterHeap) less(i, j int) bool {
699 1 : ikey, jkey := h.items[i].key, h.items[j].key
700 1 : if c := h.cmp(ikey.UserKey, jkey.UserKey); c != 0 {
701 1 : if h.reverse {
702 0 : return c > 0
703 0 : }
704 1 : return c < 0
705 : }
706 1 : if h.reverse {
707 0 : return ikey.Trailer < jkey.Trailer
708 0 : }
709 1 : return ikey.Trailer > jkey.Trailer
710 : }
711 :
712 1 : func (h *simpleMergingIterHeap) swap(i, j int) {
713 1 : h.items[i], h.items[j] = h.items[j], h.items[i]
714 1 : }
715 :
716 : // init, fix, up and down are copied from the go stdlib.
717 1 : func (h *simpleMergingIterHeap) init() {
718 1 : // heapify
719 1 : n := h.len()
720 1 : for i := n/2 - 1; i >= 0; i-- {
721 1 : h.down(i, n)
722 1 : }
723 : }
724 :
725 1 : func (h *simpleMergingIterHeap) fix(i int) {
726 1 : if !h.down(i, h.len()) {
727 1 : h.up(i)
728 1 : }
729 : }
730 :
731 1 : func (h *simpleMergingIterHeap) pop() *simpleMergingIterItem {
732 1 : n := h.len() - 1
733 1 : h.swap(0, n)
734 1 : h.down(0, n)
735 1 : item := &h.items[n]
736 1 : h.items = h.items[:n]
737 1 : return item
738 1 : }
739 :
740 1 : func (h *simpleMergingIterHeap) up(j int) {
741 1 : for {
742 1 : i := (j - 1) / 2 // parent
743 1 : if i == j || !h.less(j, i) {
744 1 : break
745 : }
746 0 : h.swap(i, j)
747 0 : j = i
748 : }
749 : }
750 :
751 1 : func (h *simpleMergingIterHeap) down(i0, n int) bool {
752 1 : i := i0
753 1 : for {
754 1 : j1 := 2*i + 1
755 1 : if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
756 1 : break
757 : }
758 1 : j := j1 // left child
759 1 : if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
760 1 : j = j2 // = 2*i + 2 // right child
761 1 : }
762 1 : if !h.less(j, i) {
763 1 : break
764 : }
765 1 : h.swap(i, j)
766 1 : i = j
767 : }
768 1 : return i > i0
769 : }
|