Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "io"
11 : "sort"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/keyspan"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : )
18 :
19 : // This file implements DB.CheckLevels() which checks that every entry in the
20 : // DB is consistent with respect to the level invariant: any point (or the
21 : // infinite number of points in a range tombstone) has a seqnum such that a
22 : // point with the same UserKey at a lower level has a lower seqnum. This is an
23 : // expensive check since it involves iterating over all the entries in the DB,
24 : // hence only intended for tests or tools.
25 : //
26 : // If we ignore range tombstones, the consistency checking of points can be
27 : // done with a simplified version of mergingIter. simpleMergingIter is that
28 : // simplified version of mergingIter that only needs to step through points
29 : // (analogous to only doing Next()). It can also easily accommodate
30 : // consistency checking of points relative to range tombstones.
31 : // simpleMergingIter does not do any seek optimizations present in mergingIter
32 : // (it minimally needs to seek the range delete iterators to position them at
33 : // or past the current point) since it does not want to miss points for
34 : // purposes of consistency checking.
35 : //
36 : // Mutual consistency of range tombstones is non-trivial to check. One needs
37 : // to detect inversions of the form [a, c)#8 at higher level and [b, c)#10 at
38 : // a lower level. The start key of the former is not contained in the latter
39 : // and we can't use the exclusive end key, c, for a containment check since it
40 : // is the sentinel key. We observe that if these tombstones were fragmented
41 : // wrt each other we would have [a, b)#8 and [b, c)#8 at the higher level and
42 : // [b, c)#10 at the lower level and then it is trivial to compare the two
43 : // [b, c) tombstones. Note that this fragmentation needs to take into account
44 : // that tombstones in a file may be untruncated and need to act within the
45 : // bounds of the file. This checking is performed by checkRangeTombstones()
46 : // and its helper functions.
47 :
48 : // The per-level structure used by simpleMergingIter.
49 : type simpleMergingIterLevel struct {
50 : iter internalIterator
51 : rangeDelIter keyspan.FragmentIterator
52 : levelIterBoundaryContext
53 :
54 : iterKV *base.InternalKV
55 : tombstone *keyspan.Span
56 : }
57 :
58 : type simpleMergingIter struct {
59 : levels []simpleMergingIterLevel
60 : snapshot uint64
61 : heap simpleMergingIterHeap
62 : // The last point's key and level. For validation.
63 : lastKey InternalKey
64 : lastLevel int
65 : lastIterMsg string
66 : // A non-nil valueMerger means MERGE record processing is ongoing.
67 : valueMerger base.ValueMerger
68 : // The first error will cause step() to return false.
69 : err error
70 : numPoints int64
71 : merge Merge
72 : formatKey base.FormatKey
73 : }
74 :
75 : func (m *simpleMergingIter) init(
76 : merge Merge,
77 : cmp Compare,
78 : snapshot uint64,
79 : formatKey base.FormatKey,
80 : levels ...simpleMergingIterLevel,
81 2 : ) {
82 2 : m.levels = levels
83 2 : m.formatKey = formatKey
84 2 : m.merge = merge
85 2 : m.snapshot = snapshot
86 2 : m.lastLevel = -1
87 2 : m.heap.cmp = cmp
88 2 : m.heap.items = make([]simpleMergingIterItem, 0, len(levels))
89 2 : for i := range m.levels {
90 2 : l := &m.levels[i]
91 2 : l.iterKV = l.iter.First()
92 2 : if l.iterKV != nil {
93 2 : item := simpleMergingIterItem{
94 2 : index: i,
95 2 : value: l.iterKV.V,
96 2 : }
97 2 : item.key = l.iterKV.K.Clone()
98 2 : m.heap.items = append(m.heap.items, item)
99 2 : }
100 : }
101 2 : m.heap.init()
102 2 :
103 2 : if m.heap.len() == 0 {
104 2 : return
105 2 : }
106 2 : m.positionRangeDels()
107 : }
108 :
109 : // Positions all the rangedel iterators at or past the current top of the
110 : // heap, using SeekGE().
111 2 : func (m *simpleMergingIter) positionRangeDels() {
112 2 : item := &m.heap.items[0]
113 2 : for i := range m.levels {
114 2 : l := &m.levels[i]
115 2 : if l.rangeDelIter == nil {
116 2 : continue
117 : }
118 2 : t, err := l.rangeDelIter.SeekGE(item.key.UserKey)
119 2 : m.err = firstError(m.err, err)
120 2 : l.tombstone = t
121 : }
122 : }
123 :
124 : // Returns true if not yet done.
125 2 : func (m *simpleMergingIter) step() bool {
126 2 : if m.heap.len() == 0 || m.err != nil {
127 2 : return false
128 2 : }
129 2 : item := &m.heap.items[0]
130 2 : l := &m.levels[item.index]
131 2 : // Sentinels are not relevant for this point checking.
132 2 : if !l.isIgnorableBoundaryKey && !item.key.IsExclusiveSentinel() &&
133 2 : item.key.Visible(m.snapshot, base.InternalKeySeqNumMax) {
134 2 : m.numPoints++
135 2 : keyChanged := m.heap.cmp(item.key.UserKey, m.lastKey.UserKey) != 0
136 2 : if !keyChanged {
137 2 : // At the same user key. We will see them in decreasing seqnum
138 2 : // order so the lastLevel must not be lower.
139 2 : if m.lastLevel > item.index {
140 1 : m.err = errors.Errorf("found InternalKey %s in %s and InternalKey %s in %s",
141 1 : item.key.Pretty(m.formatKey), l.iter, m.lastKey.Pretty(m.formatKey),
142 1 : m.lastIterMsg)
143 1 : return false
144 1 : }
145 2 : m.lastLevel = item.index
146 2 : } else {
147 2 : // The user key has changed.
148 2 : m.lastKey.Trailer = item.key.Trailer
149 2 : m.lastKey.UserKey = append(m.lastKey.UserKey[:0], item.key.UserKey...)
150 2 : m.lastLevel = item.index
151 2 : }
152 : // Ongoing series of MERGE records ends with a MERGE record.
153 2 : if keyChanged && m.valueMerger != nil {
154 2 : var closer io.Closer
155 2 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
156 2 : if m.err == nil && closer != nil {
157 0 : m.err = closer.Close()
158 0 : }
159 2 : m.valueMerger = nil
160 : }
161 2 : itemValue, _, err := item.value.Value(nil)
162 2 : if err != nil {
163 0 : m.err = err
164 0 : return false
165 0 : }
166 2 : if m.valueMerger != nil {
167 2 : // Ongoing series of MERGE records.
168 2 : switch item.key.Kind() {
169 2 : case InternalKeyKindSingleDelete, InternalKeyKindDelete, InternalKeyKindDeleteSized:
170 2 : var closer io.Closer
171 2 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
172 2 : if m.err == nil && closer != nil {
173 1 : m.err = closer.Close()
174 1 : }
175 2 : m.valueMerger = nil
176 2 : case InternalKeyKindSet, InternalKeyKindSetWithDelete:
177 2 : m.err = m.valueMerger.MergeOlder(itemValue)
178 2 : if m.err == nil {
179 2 : var closer io.Closer
180 2 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
181 2 : if m.err == nil && closer != nil {
182 1 : m.err = closer.Close()
183 1 : }
184 : }
185 2 : m.valueMerger = nil
186 2 : case InternalKeyKindMerge:
187 2 : m.err = m.valueMerger.MergeOlder(itemValue)
188 0 : default:
189 0 : m.err = errors.Errorf("pebble: invalid internal key kind %s in %s",
190 0 : item.key.Pretty(m.formatKey),
191 0 : l.iter)
192 0 : return false
193 : }
194 2 : } else if item.key.Kind() == InternalKeyKindMerge && m.err == nil {
195 2 : // New series of MERGE records.
196 2 : m.valueMerger, m.err = m.merge(item.key.UserKey, itemValue)
197 2 : }
198 2 : if m.err != nil {
199 1 : m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
200 1 : item.key.Pretty(m.formatKey), l.iter)
201 1 : return false
202 1 : }
203 : // Is this point covered by a tombstone at a lower level? Note that all these
204 : // iterators must be positioned at a key > item.key.
205 2 : for level := item.index + 1; level < len(m.levels); level++ {
206 2 : lvl := &m.levels[level]
207 2 : if lvl.rangeDelIter == nil || lvl.tombstone.Empty() {
208 2 : continue
209 : }
210 2 : if lvl.tombstone.Contains(m.heap.cmp, item.key.UserKey) && lvl.tombstone.CoversAt(m.snapshot, item.key.SeqNum()) {
211 1 : m.err = errors.Errorf("tombstone %s in %s deletes key %s in %s",
212 1 : lvl.tombstone.Pretty(m.formatKey), lvl.iter, item.key.Pretty(m.formatKey),
213 1 : l.iter)
214 1 : return false
215 1 : }
216 : }
217 : }
218 :
219 : // The iterator for the current level may be closed in the following call to
220 : // Next(). We save its debug string for potential use after it is closed -
221 : // either in this current step() invocation or on the next invocation.
222 2 : m.lastIterMsg = l.iter.String()
223 2 :
224 2 : // Step to the next point.
225 2 : l.iterKV = l.iter.Next()
226 2 : if !l.isIgnorableBoundaryKey {
227 2 : if l.iterKV != nil {
228 2 : // Check point keys in an sstable are ordered. Although not required, we check
229 2 : // for memtables as well. A subtle check here is that successive sstables of
230 2 : // L1 and higher levels are ordered. This happens when levelIter moves to the
231 2 : // next sstable in the level, in which case item.key is previous sstable's
232 2 : // last point key.
233 2 : if base.InternalCompare(m.heap.cmp, item.key, l.iterKV.K) >= 0 {
234 1 : m.err = errors.Errorf("out of order keys %s >= %s in %s",
235 1 : item.key.Pretty(m.formatKey), l.iterKV.K.Pretty(m.formatKey), l.iter)
236 1 : return false
237 1 : }
238 2 : item.key = base.InternalKey{
239 2 : Trailer: l.iterKV.K.Trailer,
240 2 : UserKey: append(item.key.UserKey[:0], l.iterKV.K.UserKey...),
241 2 : }
242 2 : item.value = l.iterKV.V
243 2 : if m.heap.len() > 1 {
244 2 : m.heap.fix(0)
245 2 : }
246 2 : } else {
247 2 : m.err = l.iter.Close()
248 2 : l.iter = nil
249 2 : m.heap.pop()
250 2 : }
251 2 : if m.err != nil {
252 0 : return false
253 0 : }
254 2 : if m.heap.len() == 0 {
255 2 : // Last record was a MERGE record.
256 2 : if m.valueMerger != nil {
257 2 : var closer io.Closer
258 2 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
259 2 : if m.err == nil && closer != nil {
260 0 : m.err = closer.Close()
261 0 : }
262 2 : if m.err != nil {
263 1 : m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
264 1 : item.key.Pretty(m.formatKey), m.lastIterMsg)
265 1 : }
266 2 : m.valueMerger = nil
267 : }
268 2 : return false
269 : }
270 : }
271 2 : m.positionRangeDels()
272 2 : return true
273 : }
274 :
275 : // Checking that range tombstones are mutually consistent is performed by
276 : // checkRangeTombstones(). See the overview comment at the top of the file.
277 : //
278 : // We do this check as follows:
279 : // - Collect the tombstones for each level, put them into one pool of tombstones
280 : // along with their level information (addTombstonesFromIter()).
281 : // - Collect the start and end user keys from all these tombstones
282 : // (collectAllUserKey()) and use them to fragment all the tombstones
283 : // (fragmentUsingUserKey()).
284 : // - Sort tombstones by start key and decreasing seqnum
285 : // (tombstonesByStartKeyAndSeqnum) - all tombstones that have the same start
286 : // key will have the same end key because they have been fragmented.
287 : // - Iterate and check (iterateAndCheckTombstones()).
288 : //
289 : // Note that this simple approach requires holding all the tombstones across all
290 : // levels in-memory. A more sophisticated incremental approach could be devised,
291 : // if necessary.
292 :
293 : // A tombstone and the corresponding level it was found in.
294 : type tombstoneWithLevel struct {
295 : keyspan.Span
296 : level int
297 : // The level in LSM. A -1 means it's a memtable.
298 : lsmLevel int
299 : fileNum FileNum
300 : }
301 :
302 : // For sorting tombstoneWithLevels in increasing order of start UserKey and
303 : // for the same start UserKey in decreasing order of seqnum.
304 : type tombstonesByStartKeyAndSeqnum struct {
305 : cmp Compare
306 : buf []tombstoneWithLevel
307 : }
308 :
309 2 : func (v *tombstonesByStartKeyAndSeqnum) Len() int { return len(v.buf) }
310 2 : func (v *tombstonesByStartKeyAndSeqnum) Less(i, j int) bool {
311 2 : less := v.cmp(v.buf[i].Start, v.buf[j].Start)
312 2 : if less == 0 {
313 2 : return v.buf[i].LargestSeqNum() > v.buf[j].LargestSeqNum()
314 2 : }
315 2 : return less < 0
316 : }
317 2 : func (v *tombstonesByStartKeyAndSeqnum) Swap(i, j int) {
318 2 : v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
319 2 : }
320 :
321 : func iterateAndCheckTombstones(
322 : cmp Compare, formatKey base.FormatKey, tombstones []tombstoneWithLevel,
323 2 : ) error {
324 2 : sortBuf := tombstonesByStartKeyAndSeqnum{
325 2 : cmp: cmp,
326 2 : buf: tombstones,
327 2 : }
328 2 : sort.Sort(&sortBuf)
329 2 :
330 2 : // For a sequence of tombstones that share the same start UserKey, we will
331 2 : // encounter them in non-increasing seqnum order and so should encounter them
332 2 : // in non-decreasing level order.
333 2 : lastTombstone := tombstoneWithLevel{}
334 2 : for _, t := range tombstones {
335 2 : if cmp(lastTombstone.Start, t.Start) == 0 && lastTombstone.level > t.level {
336 1 : return errors.Errorf("encountered tombstone %s in %s"+
337 1 : " that has a lower seqnum than the same tombstone in %s",
338 1 : t.Span.Pretty(formatKey), levelOrMemtable(t.lsmLevel, t.fileNum),
339 1 : levelOrMemtable(lastTombstone.lsmLevel, lastTombstone.fileNum))
340 1 : }
341 2 : lastTombstone = t
342 : }
343 2 : return nil
344 : }
345 :
346 : type checkConfig struct {
347 : logger Logger
348 : comparer *Comparer
349 : readState *readState
350 : newIters tableNewIters
351 : seqNum uint64
352 : stats *CheckLevelsStats
353 : merge Merge
354 : formatKey base.FormatKey
355 : }
356 :
357 : // cmp is shorthand for comparer.Compare.
358 2 : func (c *checkConfig) cmp(a, b []byte) int { return c.comparer.Compare(a, b) }
359 :
360 2 : func checkRangeTombstones(c *checkConfig) error {
361 2 : var level int
362 2 : var tombstones []tombstoneWithLevel
363 2 : var err error
364 2 :
365 2 : memtables := c.readState.memtables
366 2 : for i := len(memtables) - 1; i >= 0; i-- {
367 2 : iter := memtables[i].newRangeDelIter(nil)
368 2 : if iter == nil {
369 2 : continue
370 : }
371 2 : tombstones, err = addTombstonesFromIter(
372 2 : iter, level, -1, 0, tombstones, c.seqNum, c.cmp, c.formatKey,
373 2 : )
374 2 : if err != nil {
375 0 : return err
376 0 : }
377 2 : level++
378 : }
379 :
380 2 : current := c.readState.current
381 2 : addTombstonesFromLevel := func(files manifest.LevelIterator, lsmLevel int) error {
382 2 : for f := files.First(); f != nil; f = files.Next() {
383 2 : lf := files.Take()
384 2 : //lower, upper := manifest.KeyRange(c.cmp, lf.Iter())
385 2 : iterToClose, iter, err := c.newIters.TODO(
386 2 : context.Background(), lf.FileMetadata, &IterOptions{level: manifest.Level(lsmLevel)}, internalIterOpts{})
387 2 : if err != nil {
388 0 : return err
389 0 : }
390 2 : iterToClose.Close()
391 2 : if iter == nil {
392 2 : continue
393 : }
394 2 : if tombstones, err = addTombstonesFromIter(iter, level, lsmLevel, f.FileNum,
395 2 : tombstones, c.seqNum, c.cmp, c.formatKey); err != nil {
396 1 : return err
397 1 : }
398 : }
399 2 : return nil
400 : }
401 : // Now the levels with untruncated tombsones.
402 2 : for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
403 2 : if current.L0SublevelFiles[i].Empty() {
404 0 : continue
405 : }
406 2 : err := addTombstonesFromLevel(current.L0SublevelFiles[i].Iter(), 0)
407 2 : if err != nil {
408 0 : return err
409 0 : }
410 2 : level++
411 : }
412 2 : for i := 1; i < len(current.Levels); i++ {
413 2 : if err := addTombstonesFromLevel(current.Levels[i].Iter(), i); err != nil {
414 1 : return err
415 1 : }
416 2 : level++
417 : }
418 2 : if c.stats != nil {
419 1 : c.stats.NumTombstones = len(tombstones)
420 1 : }
421 : // We now have truncated tombstones.
422 : // Fragment them all.
423 2 : userKeys := collectAllUserKeys(c.cmp, tombstones)
424 2 : tombstones = fragmentUsingUserKeys(c.cmp, tombstones, userKeys)
425 2 : return iterateAndCheckTombstones(c.cmp, c.formatKey, tombstones)
426 : }
427 :
428 1 : func levelOrMemtable(lsmLevel int, fileNum FileNum) string {
429 1 : if lsmLevel == -1 {
430 0 : return "memtable"
431 0 : }
432 1 : return fmt.Sprintf("L%d: fileNum=%s", lsmLevel, fileNum)
433 : }
434 :
435 : func addTombstonesFromIter(
436 : iter keyspan.FragmentIterator,
437 : level int,
438 : lsmLevel int,
439 : fileNum FileNum,
440 : tombstones []tombstoneWithLevel,
441 : seqNum uint64,
442 : cmp Compare,
443 : formatKey base.FormatKey,
444 2 : ) (_ []tombstoneWithLevel, err error) {
445 2 : defer func() {
446 2 : err = firstError(err, iter.Close())
447 2 : }()
448 :
449 2 : var prevTombstone keyspan.Span
450 2 : tomb, err := iter.First()
451 2 : for ; tomb != nil; tomb, err = iter.Next() {
452 2 : t := tomb.Visible(seqNum)
453 2 : if t.Empty() {
454 2 : continue
455 : }
456 2 : t = t.DeepClone()
457 2 : // This is mainly a test for rangeDelV2 formatted blocks which are expected to
458 2 : // be ordered and fragmented on disk. But we anyways check for memtables,
459 2 : // rangeDelV1 as well.
460 2 : if cmp(prevTombstone.End, t.Start) > 0 {
461 1 : return nil, errors.Errorf("unordered or unfragmented range delete tombstones %s, %s in %s",
462 1 : prevTombstone.Pretty(formatKey), t.Pretty(formatKey), levelOrMemtable(lsmLevel, fileNum))
463 1 : }
464 2 : prevTombstone = t
465 2 :
466 2 : if !t.Empty() {
467 2 : tombstones = append(tombstones, tombstoneWithLevel{
468 2 : Span: t,
469 2 : level: level,
470 2 : lsmLevel: lsmLevel,
471 2 : fileNum: fileNum,
472 2 : })
473 2 : }
474 : }
475 2 : if err != nil {
476 0 : return nil, err
477 0 : }
478 2 : return tombstones, nil
479 : }
480 :
481 : type userKeysSort struct {
482 : cmp Compare
483 : buf [][]byte
484 : }
485 :
486 2 : func (v *userKeysSort) Len() int { return len(v.buf) }
487 2 : func (v *userKeysSort) Less(i, j int) bool {
488 2 : return v.cmp(v.buf[i], v.buf[j]) < 0
489 2 : }
490 2 : func (v *userKeysSort) Swap(i, j int) {
491 2 : v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
492 2 : }
493 2 : func collectAllUserKeys(cmp Compare, tombstones []tombstoneWithLevel) [][]byte {
494 2 : keys := make([][]byte, 0, len(tombstones)*2)
495 2 : for _, t := range tombstones {
496 2 : keys = append(keys, t.Start)
497 2 : keys = append(keys, t.End)
498 2 : }
499 2 : sorter := userKeysSort{
500 2 : cmp: cmp,
501 2 : buf: keys,
502 2 : }
503 2 : sort.Sort(&sorter)
504 2 : var last, curr int
505 2 : for last, curr = -1, 0; curr < len(keys); curr++ {
506 2 : if last < 0 || cmp(keys[last], keys[curr]) != 0 {
507 2 : last++
508 2 : keys[last] = keys[curr]
509 2 : }
510 : }
511 2 : keys = keys[:last+1]
512 2 : return keys
513 : }
514 :
515 : func fragmentUsingUserKeys(
516 : cmp Compare, tombstones []tombstoneWithLevel, userKeys [][]byte,
517 2 : ) []tombstoneWithLevel {
518 2 : var buf []tombstoneWithLevel
519 2 : for _, t := range tombstones {
520 2 : // Find the first position with tombstone start < user key
521 2 : i := sort.Search(len(userKeys), func(i int) bool {
522 2 : return cmp(t.Start, userKeys[i]) < 0
523 2 : })
524 2 : for ; i < len(userKeys); i++ {
525 2 : if cmp(userKeys[i], t.End) >= 0 {
526 2 : break
527 : }
528 2 : tPartial := t
529 2 : tPartial.End = userKeys[i]
530 2 : buf = append(buf, tPartial)
531 2 : t.Start = userKeys[i]
532 : }
533 2 : buf = append(buf, t)
534 : }
535 2 : return buf
536 : }
537 :
// CheckLevelsStats provides basic stats on points and tombstones encountered.
type CheckLevelsStats struct {
	// NumPoints is the number of points encountered.
	NumPoints int64
	// NumTombstones is the number of tombstones encountered.
	NumTombstones int
}
543 :
544 : // CheckLevels checks:
545 : // - Every entry in the DB is consistent with the level invariant. See the
546 : // comment at the top of the file.
547 : // - Point keys in sstables are ordered.
548 : // - Range delete tombstones in sstables are ordered and fragmented.
549 : // - Successful processing of all MERGE records.
550 2 : func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
551 2 : // Grab and reference the current readState.
552 2 : readState := d.loadReadState()
553 2 : defer readState.unref()
554 2 :
555 2 : // Determine the seqnum to read at after grabbing the read state (current and
556 2 : // memtables) above.
557 2 : seqNum := d.mu.versions.visibleSeqNum.Load()
558 2 :
559 2 : checkConfig := &checkConfig{
560 2 : logger: d.opts.Logger,
561 2 : comparer: d.opts.Comparer,
562 2 : readState: readState,
563 2 : newIters: d.newIters,
564 2 : seqNum: seqNum,
565 2 : stats: stats,
566 2 : merge: d.merge,
567 2 : formatKey: d.opts.Comparer.FormatKey,
568 2 : }
569 2 : return checkLevelsInternal(checkConfig)
570 2 : }
571 :
572 2 : func checkLevelsInternal(c *checkConfig) (err error) {
573 2 : // Phase 1: Use a simpleMergingIter to step through all the points and ensure
574 2 : // that points with the same user key at different levels are not inverted
575 2 : // wrt sequence numbers and the same holds for tombstones that cover points.
576 2 : // To do this, one needs to construct a simpleMergingIter which is similar to
577 2 : // how one constructs a mergingIter.
578 2 :
579 2 : // Add mem tables from newest to oldest.
580 2 : var mlevels []simpleMergingIterLevel
581 2 : defer func() {
582 2 : for i := range mlevels {
583 2 : l := &mlevels[i]
584 2 : if l.iter != nil {
585 2 : err = firstError(err, l.iter.Close())
586 2 : l.iter = nil
587 2 : }
588 2 : if l.rangeDelIter != nil {
589 2 : err = firstError(err, l.rangeDelIter.Close())
590 2 : l.rangeDelIter = nil
591 2 : }
592 : }
593 : }()
594 :
595 2 : memtables := c.readState.memtables
596 2 : for i := len(memtables) - 1; i >= 0; i-- {
597 2 : mem := memtables[i]
598 2 : mlevels = append(mlevels, simpleMergingIterLevel{
599 2 : iter: mem.newIter(nil),
600 2 : rangeDelIter: mem.newRangeDelIter(nil),
601 2 : })
602 2 : }
603 :
604 2 : current := c.readState.current
605 2 : // Determine the final size for mlevels so that there are no more
606 2 : // reallocations. levelIter will hold a pointer to elements in mlevels.
607 2 : start := len(mlevels)
608 2 : for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
609 2 : if current.L0SublevelFiles[sublevel].Empty() {
610 0 : continue
611 : }
612 2 : mlevels = append(mlevels, simpleMergingIterLevel{})
613 : }
614 2 : for level := 1; level < len(current.Levels); level++ {
615 2 : if current.Levels[level].Empty() {
616 2 : continue
617 : }
618 2 : mlevels = append(mlevels, simpleMergingIterLevel{})
619 : }
620 2 : mlevelAlloc := mlevels[start:]
621 2 : // Add L0 files by sublevel.
622 2 : for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
623 2 : if current.L0SublevelFiles[sublevel].Empty() {
624 0 : continue
625 : }
626 2 : manifestIter := current.L0SublevelFiles[sublevel].Iter()
627 2 : iterOpts := IterOptions{logger: c.logger}
628 2 : li := &levelIter{}
629 2 : li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
630 2 : manifest.L0Sublevel(sublevel), internalIterOpts{})
631 2 : li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
632 2 : li.initBoundaryContext(&mlevelAlloc[0].levelIterBoundaryContext)
633 2 : mlevelAlloc[0].iter = li
634 2 : mlevelAlloc = mlevelAlloc[1:]
635 : }
636 2 : for level := 1; level < len(current.Levels); level++ {
637 2 : if current.Levels[level].Empty() {
638 2 : continue
639 : }
640 :
641 2 : iterOpts := IterOptions{logger: c.logger}
642 2 : li := &levelIter{}
643 2 : li.init(context.Background(), iterOpts, c.comparer, c.newIters,
644 2 : current.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
645 2 : li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
646 2 : li.initBoundaryContext(&mlevelAlloc[0].levelIterBoundaryContext)
647 2 : mlevelAlloc[0].iter = li
648 2 : mlevelAlloc = mlevelAlloc[1:]
649 : }
650 :
651 2 : mergingIter := &simpleMergingIter{}
652 2 : mergingIter.init(c.merge, c.cmp, c.seqNum, c.formatKey, mlevels...)
653 2 : for cont := mergingIter.step(); cont; cont = mergingIter.step() {
654 2 : }
655 2 : if err := mergingIter.err; err != nil {
656 1 : return err
657 1 : }
658 2 : if c.stats != nil {
659 1 : c.stats.NumPoints = mergingIter.numPoints
660 1 : }
661 :
662 : // Phase 2: Check that the tombstones are mutually consistent.
663 2 : return checkRangeTombstones(c)
664 : }
665 :
666 : type simpleMergingIterItem struct {
667 : index int
668 : key InternalKey
669 : value base.LazyValue
670 : }
671 :
672 : type simpleMergingIterHeap struct {
673 : cmp Compare
674 : reverse bool
675 : items []simpleMergingIterItem
676 : }
677 :
678 2 : func (h *simpleMergingIterHeap) len() int {
679 2 : return len(h.items)
680 2 : }
681 :
682 2 : func (h *simpleMergingIterHeap) less(i, j int) bool {
683 2 : ikey, jkey := h.items[i].key, h.items[j].key
684 2 : if c := h.cmp(ikey.UserKey, jkey.UserKey); c != 0 {
685 2 : if h.reverse {
686 0 : return c > 0
687 0 : }
688 2 : return c < 0
689 : }
690 2 : if h.reverse {
691 0 : return ikey.Trailer < jkey.Trailer
692 0 : }
693 2 : return ikey.Trailer > jkey.Trailer
694 : }
695 :
696 2 : func (h *simpleMergingIterHeap) swap(i, j int) {
697 2 : h.items[i], h.items[j] = h.items[j], h.items[i]
698 2 : }
699 :
700 : // init, fix, up and down are copied from the go stdlib.
701 2 : func (h *simpleMergingIterHeap) init() {
702 2 : // heapify
703 2 : n := h.len()
704 2 : for i := n/2 - 1; i >= 0; i-- {
705 2 : h.down(i, n)
706 2 : }
707 : }
708 :
709 2 : func (h *simpleMergingIterHeap) fix(i int) {
710 2 : if !h.down(i, h.len()) {
711 2 : h.up(i)
712 2 : }
713 : }
714 :
715 2 : func (h *simpleMergingIterHeap) pop() *simpleMergingIterItem {
716 2 : n := h.len() - 1
717 2 : h.swap(0, n)
718 2 : h.down(0, n)
719 2 : item := &h.items[n]
720 2 : h.items = h.items[:n]
721 2 : return item
722 2 : }
723 :
724 2 : func (h *simpleMergingIterHeap) up(j int) {
725 2 : for {
726 2 : i := (j - 1) / 2 // parent
727 2 : if i == j || !h.less(j, i) {
728 2 : break
729 : }
730 0 : h.swap(i, j)
731 0 : j = i
732 : }
733 : }
734 :
735 2 : func (h *simpleMergingIterHeap) down(i0, n int) bool {
736 2 : i := i0
737 2 : for {
738 2 : j1 := 2*i + 1
739 2 : if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
740 2 : break
741 : }
742 2 : j := j1 // left child
743 2 : if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
744 2 : j = j2 // = 2*i + 2 // right child
745 2 : }
746 2 : if !h.less(j, i) {
747 2 : break
748 : }
749 2 : h.swap(i, j)
750 2 : i = j
751 : }
752 2 : return i > i0
753 : }
|