Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "io"
11 : "sort"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/keyspan"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : )
18 :
19 : // This file implements DB.CheckLevels() which checks that every entry in the
20 : // DB is consistent with respect to the level invariant: any point (or the
21 : // infinite number of points in a range tombstone) has a seqnum such that a
22 : // point with the same UserKey at a lower level has a lower seqnum. This is an
23 : // expensive check since it involves iterating over all the entries in the DB,
24 : // hence only intended for tests or tools.
25 : //
26 : // If we ignore range tombstones, the consistency checking of points can be
27 : // done with a simplified version of mergingIter. simpleMergingIter is that
28 : // simplified version of mergingIter that only needs to step through points
29 : // (analogous to only doing Next()). It can also easily accommodate
30 : // consistency checking of points relative to range tombstones.
31 : // simpleMergingIter does not do any seek optimizations present in mergingIter
32 : // (it minimally needs to seek the range delete iterators to position them at
33 : // or past the current point) since it does not want to miss points for
34 : // purposes of consistency checking.
35 : //
36 : // Mutual consistency of range tombstones is non-trivial to check. One needs
37 : // to detect inversions of the form [a, c)#8 at higher level and [b, c)#10 at
38 : // a lower level. The start key of the former is not contained in the latter
39 : // and we can't use the exclusive end key, c, for a containment check since it
40 : // is the sentinel key. We observe that if these tombstones were fragmented
41 : // wrt each other we would have [a, b)#8 and [b, c)#8 at the higher level and
42 : // [b, c)#10 at the lower level and then it is is trivial to compare the two
43 : // [b, c) tombstones. Note that this fragmentation needs to take into account
44 : // that tombstones in a file may be untruncated and need to act within the
45 : // bounds of the file. This checking is performed by checkRangeTombstones()
46 : // and its helper functions.
47 :
48 : // The per-level structure used by simpleMergingIter.
49 : type simpleMergingIterLevel struct {
50 : iter internalIterator
51 : rangeDelIter keyspan.FragmentIterator
52 : levelIterBoundaryContext
53 :
54 : iterKey *InternalKey
55 : iterValue base.LazyValue
56 : tombstone *keyspan.Span
57 : }
58 :
59 : type simpleMergingIter struct {
60 : levels []simpleMergingIterLevel
61 : snapshot uint64
62 : heap simpleMergingIterHeap
63 : // The last point's key and level. For validation.
64 : lastKey InternalKey
65 : lastLevel int
66 : lastIterMsg string
67 : // A non-nil valueMerger means MERGE record processing is ongoing.
68 : valueMerger base.ValueMerger
69 : // The first error will cause step() to return false.
70 : err error
71 : numPoints int64
72 : merge Merge
73 : formatKey base.FormatKey
74 : }
75 :
76 : func (m *simpleMergingIter) init(
77 : merge Merge,
78 : cmp Compare,
79 : snapshot uint64,
80 : formatKey base.FormatKey,
81 : levels ...simpleMergingIterLevel,
82 1 : ) {
83 1 : m.levels = levels
84 1 : m.formatKey = formatKey
85 1 : m.merge = merge
86 1 : m.snapshot = snapshot
87 1 : m.lastLevel = -1
88 1 : m.heap.cmp = cmp
89 1 : m.heap.items = make([]simpleMergingIterItem, 0, len(levels))
90 1 : for i := range m.levels {
91 1 : l := &m.levels[i]
92 1 : l.iterKey, l.iterValue = l.iter.First()
93 1 : if l.iterKey != nil {
94 1 : item := simpleMergingIterItem{
95 1 : index: i,
96 1 : value: l.iterValue,
97 1 : }
98 1 : item.key.Trailer = l.iterKey.Trailer
99 1 : item.key.UserKey = append(item.key.UserKey[:0], l.iterKey.UserKey...)
100 1 : m.heap.items = append(m.heap.items, item)
101 1 : }
102 : }
103 1 : m.heap.init()
104 1 :
105 1 : if m.heap.len() == 0 {
106 1 : return
107 1 : }
108 1 : m.positionRangeDels()
109 : }
110 :
111 : // Positions all the rangedel iterators at or past the current top of the
112 : // heap, using SeekGE().
113 1 : func (m *simpleMergingIter) positionRangeDels() {
114 1 : item := &m.heap.items[0]
115 1 : for i := range m.levels {
116 1 : l := &m.levels[i]
117 1 : if l.rangeDelIter == nil {
118 1 : continue
119 : }
120 1 : t, err := l.rangeDelIter.SeekGE(item.key.UserKey)
121 1 : m.err = firstError(m.err, err)
122 1 : l.tombstone = t
123 : }
124 : }
125 :
126 : // Returns true if not yet done.
127 1 : func (m *simpleMergingIter) step() bool {
128 1 : if m.heap.len() == 0 || m.err != nil {
129 1 : return false
130 1 : }
131 1 : item := &m.heap.items[0]
132 1 : l := &m.levels[item.index]
133 1 : // Sentinels are not relevant for this point checking.
134 1 : if !item.key.IsExclusiveSentinel() && item.key.Visible(m.snapshot, base.InternalKeySeqNumMax) {
135 1 : m.numPoints++
136 1 : keyChanged := m.heap.cmp(item.key.UserKey, m.lastKey.UserKey) != 0
137 1 : if !keyChanged {
138 1 : // At the same user key. We will see them in decreasing seqnum
139 1 : // order so the lastLevel must not be lower.
140 1 : if m.lastLevel > item.index {
141 1 : m.err = errors.Errorf("found InternalKey %s in %s and InternalKey %s in %s",
142 1 : item.key.Pretty(m.formatKey), l.iter, m.lastKey.Pretty(m.formatKey),
143 1 : m.lastIterMsg)
144 1 : return false
145 1 : }
146 1 : m.lastLevel = item.index
147 1 : } else {
148 1 : // The user key has changed.
149 1 : m.lastKey.Trailer = item.key.Trailer
150 1 : m.lastKey.UserKey = append(m.lastKey.UserKey[:0], item.key.UserKey...)
151 1 : m.lastLevel = item.index
152 1 : }
153 : // Ongoing series of MERGE records ends with a MERGE record.
154 1 : if keyChanged && m.valueMerger != nil {
155 1 : var closer io.Closer
156 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
157 1 : if m.err == nil && closer != nil {
158 0 : m.err = closer.Close()
159 0 : }
160 1 : m.valueMerger = nil
161 : }
162 1 : itemValue, _, err := item.value.Value(nil)
163 1 : if err != nil {
164 0 : m.err = err
165 0 : return false
166 0 : }
167 1 : if m.valueMerger != nil {
168 1 : // Ongoing series of MERGE records.
169 1 : switch item.key.Kind() {
170 1 : case InternalKeyKindSingleDelete, InternalKeyKindDelete, InternalKeyKindDeleteSized:
171 1 : var closer io.Closer
172 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
173 1 : if m.err == nil && closer != nil {
174 1 : m.err = closer.Close()
175 1 : }
176 1 : m.valueMerger = nil
177 1 : case InternalKeyKindSet, InternalKeyKindSetWithDelete:
178 1 : m.err = m.valueMerger.MergeOlder(itemValue)
179 1 : if m.err == nil {
180 1 : var closer io.Closer
181 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
182 1 : if m.err == nil && closer != nil {
183 1 : m.err = closer.Close()
184 1 : }
185 : }
186 1 : m.valueMerger = nil
187 1 : case InternalKeyKindMerge:
188 1 : m.err = m.valueMerger.MergeOlder(itemValue)
189 0 : default:
190 0 : m.err = errors.Errorf("pebble: invalid internal key kind %s in %s",
191 0 : item.key.Pretty(m.formatKey),
192 0 : l.iter)
193 0 : return false
194 : }
195 1 : } else if item.key.Kind() == InternalKeyKindMerge && m.err == nil {
196 1 : // New series of MERGE records.
197 1 : m.valueMerger, m.err = m.merge(item.key.UserKey, itemValue)
198 1 : }
199 1 : if m.err != nil {
200 1 : m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
201 1 : item.key.Pretty(m.formatKey), l.iter)
202 1 : return false
203 1 : }
204 : // Is this point covered by a tombstone at a lower level? Note that all these
205 : // iterators must be positioned at a key > item.key.
206 1 : for level := item.index + 1; level < len(m.levels); level++ {
207 1 : lvl := &m.levels[level]
208 1 : if lvl.rangeDelIter == nil || lvl.tombstone.Empty() {
209 1 : continue
210 : }
211 1 : if lvl.tombstone.Contains(m.heap.cmp, item.key.UserKey) && lvl.tombstone.CoversAt(m.snapshot, item.key.SeqNum()) {
212 1 : m.err = errors.Errorf("tombstone %s in %s deletes key %s in %s",
213 1 : lvl.tombstone.Pretty(m.formatKey), lvl.iter, item.key.Pretty(m.formatKey),
214 1 : l.iter)
215 1 : return false
216 1 : }
217 : }
218 : }
219 :
220 : // The iterator for the current level may be closed in the following call to
221 : // Next(). We save its debug string for potential use after it is closed -
222 : // either in this current step() invocation or on the next invocation.
223 1 : m.lastIterMsg = l.iter.String()
224 1 :
225 1 : // Step to the next point.
226 1 : if l.iterKey, l.iterValue = l.iter.Next(); l.iterKey != nil {
227 1 : // Check point keys in an sstable are ordered. Although not required, we check
228 1 : // for memtables as well. A subtle check here is that successive sstables of
229 1 : // L1 and higher levels are ordered. This happens when levelIter moves to the
230 1 : // next sstable in the level, in which case item.key is previous sstable's
231 1 : // last point key.
232 1 : if base.InternalCompare(m.heap.cmp, item.key, *l.iterKey) >= 0 {
233 1 : m.err = errors.Errorf("out of order keys %s >= %s in %s",
234 1 : item.key.Pretty(m.formatKey), l.iterKey.Pretty(m.formatKey), l.iter)
235 1 : return false
236 1 : }
237 1 : item.key.Trailer = l.iterKey.Trailer
238 1 : item.key.UserKey = append(item.key.UserKey[:0], l.iterKey.UserKey...)
239 1 : item.value = l.iterValue
240 1 : if m.heap.len() > 1 {
241 1 : m.heap.fix(0)
242 1 : }
243 1 : } else {
244 1 : m.err = l.iter.Close()
245 1 : l.iter = nil
246 1 : m.heap.pop()
247 1 : }
248 1 : if m.err != nil {
249 0 : return false
250 0 : }
251 1 : if m.heap.len() == 0 {
252 1 : // Last record was a MERGE record.
253 1 : if m.valueMerger != nil {
254 1 : var closer io.Closer
255 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
256 1 : if m.err == nil && closer != nil {
257 0 : m.err = closer.Close()
258 0 : }
259 1 : if m.err != nil {
260 1 : m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
261 1 : item.key.Pretty(m.formatKey), m.lastIterMsg)
262 1 : }
263 1 : m.valueMerger = nil
264 : }
265 1 : return false
266 : }
267 1 : m.positionRangeDels()
268 1 : return true
269 : }
270 :
271 : // Checking that range tombstones are mutually consistent is performed by
272 : // checkRangeTombstones(). See the overview comment at the top of the file.
273 : //
274 : // We do this check as follows:
275 : // - Collect the tombstones for each level, put them into one pool of tombstones
276 : // along with their level information (addTombstonesFromIter()).
277 : // - Collect the start and end user keys from all these tombstones
278 : // (collectAllUserKey()) and use them to fragment all the tombstones
279 : // (fragmentUsingUserKey()).
280 : // - Sort tombstones by start key and decreasing seqnum
281 : // (tombstonesByStartKeyAndSeqnum) - all tombstones that have the same start
282 : // key will have the same end key because they have been fragmented.
283 : // - Iterate and check (iterateAndCheckTombstones()).
284 : //
285 : // Note that this simple approach requires holding all the tombstones across all
286 : // levels in-memory. A more sophisticated incremental approach could be devised,
287 : // if necessary.
288 :
289 : // A tombstone and the corresponding level it was found in.
290 : type tombstoneWithLevel struct {
291 : keyspan.Span
292 : level int
293 : // The level in LSM. A -1 means it's a memtable.
294 : lsmLevel int
295 : fileNum FileNum
296 : }
297 :
298 : // For sorting tombstoneWithLevels in increasing order of start UserKey and
299 : // for the same start UserKey in decreasing order of seqnum.
300 : type tombstonesByStartKeyAndSeqnum struct {
301 : cmp Compare
302 : buf []tombstoneWithLevel
303 : }
304 :
305 1 : func (v *tombstonesByStartKeyAndSeqnum) Len() int { return len(v.buf) }
306 1 : func (v *tombstonesByStartKeyAndSeqnum) Less(i, j int) bool {
307 1 : less := v.cmp(v.buf[i].Start, v.buf[j].Start)
308 1 : if less == 0 {
309 1 : return v.buf[i].LargestSeqNum() > v.buf[j].LargestSeqNum()
310 1 : }
311 1 : return less < 0
312 : }
313 1 : func (v *tombstonesByStartKeyAndSeqnum) Swap(i, j int) {
314 1 : v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
315 1 : }
316 :
317 : func iterateAndCheckTombstones(
318 : cmp Compare, formatKey base.FormatKey, tombstones []tombstoneWithLevel,
319 1 : ) error {
320 1 : sortBuf := tombstonesByStartKeyAndSeqnum{
321 1 : cmp: cmp,
322 1 : buf: tombstones,
323 1 : }
324 1 : sort.Sort(&sortBuf)
325 1 :
326 1 : // For a sequence of tombstones that share the same start UserKey, we will
327 1 : // encounter them in non-increasing seqnum order and so should encounter them
328 1 : // in non-decreasing level order.
329 1 : lastTombstone := tombstoneWithLevel{}
330 1 : for _, t := range tombstones {
331 1 : if cmp(lastTombstone.Start, t.Start) == 0 && lastTombstone.level > t.level {
332 1 : return errors.Errorf("encountered tombstone %s in %s"+
333 1 : " that has a lower seqnum than the same tombstone in %s",
334 1 : t.Span.Pretty(formatKey), levelOrMemtable(t.lsmLevel, t.fileNum),
335 1 : levelOrMemtable(lastTombstone.lsmLevel, lastTombstone.fileNum))
336 1 : }
337 1 : lastTombstone = t
338 : }
339 1 : return nil
340 : }
341 :
342 : type checkConfig struct {
343 : logger Logger
344 : comparer *Comparer
345 : readState *readState
346 : newIters tableNewIters
347 : seqNum uint64
348 : stats *CheckLevelsStats
349 : merge Merge
350 : formatKey base.FormatKey
351 : }
352 :
353 : // cmp is shorthand for comparer.Compare.
354 1 : func (c *checkConfig) cmp(a, b []byte) int { return c.comparer.Compare(a, b) }
355 :
356 1 : func checkRangeTombstones(c *checkConfig) error {
357 1 : var level int
358 1 : var tombstones []tombstoneWithLevel
359 1 : var err error
360 1 :
361 1 : memtables := c.readState.memtables
362 1 : for i := len(memtables) - 1; i >= 0; i-- {
363 1 : iter := memtables[i].newRangeDelIter(nil)
364 1 : if iter == nil {
365 1 : continue
366 : }
367 1 : tombstones, err = addTombstonesFromIter(
368 1 : iter, level, -1, 0, tombstones, c.seqNum, c.cmp, c.formatKey,
369 1 : )
370 1 : if err != nil {
371 0 : return err
372 0 : }
373 1 : level++
374 : }
375 :
376 1 : current := c.readState.current
377 1 : addTombstonesFromLevel := func(files manifest.LevelIterator, lsmLevel int) error {
378 1 : for f := files.First(); f != nil; f = files.Next() {
379 1 : lf := files.Take()
380 1 : //lower, upper := manifest.KeyRange(c.cmp, lf.Iter())
381 1 : iterToClose, iter, err := c.newIters.TODO(
382 1 : context.Background(), lf.FileMetadata, &IterOptions{level: manifest.Level(lsmLevel)}, internalIterOpts{})
383 1 : if err != nil {
384 0 : return err
385 0 : }
386 1 : iterToClose.Close()
387 1 : if iter == nil {
388 1 : continue
389 : }
390 1 : if tombstones, err = addTombstonesFromIter(iter, level, lsmLevel, f.FileNum,
391 1 : tombstones, c.seqNum, c.cmp, c.formatKey); err != nil {
392 1 : return err
393 1 : }
394 : }
395 1 : return nil
396 : }
397 : // Now the levels with untruncated tombsones.
398 1 : for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
399 1 : if current.L0SublevelFiles[i].Empty() {
400 0 : continue
401 : }
402 1 : err := addTombstonesFromLevel(current.L0SublevelFiles[i].Iter(), 0)
403 1 : if err != nil {
404 0 : return err
405 0 : }
406 1 : level++
407 : }
408 1 : for i := 1; i < len(current.Levels); i++ {
409 1 : if err := addTombstonesFromLevel(current.Levels[i].Iter(), i); err != nil {
410 1 : return err
411 1 : }
412 1 : level++
413 : }
414 1 : if c.stats != nil {
415 1 : c.stats.NumTombstones = len(tombstones)
416 1 : }
417 : // We now have truncated tombstones.
418 : // Fragment them all.
419 1 : userKeys := collectAllUserKeys(c.cmp, tombstones)
420 1 : tombstones = fragmentUsingUserKeys(c.cmp, tombstones, userKeys)
421 1 : return iterateAndCheckTombstones(c.cmp, c.formatKey, tombstones)
422 : }
423 :
424 1 : func levelOrMemtable(lsmLevel int, fileNum FileNum) string {
425 1 : if lsmLevel == -1 {
426 0 : return "memtable"
427 0 : }
428 1 : return fmt.Sprintf("L%d: fileNum=%s", lsmLevel, fileNum)
429 : }
430 :
431 : func addTombstonesFromIter(
432 : iter keyspan.FragmentIterator,
433 : level int,
434 : lsmLevel int,
435 : fileNum FileNum,
436 : tombstones []tombstoneWithLevel,
437 : seqNum uint64,
438 : cmp Compare,
439 : formatKey base.FormatKey,
440 1 : ) (_ []tombstoneWithLevel, err error) {
441 1 : defer func() {
442 1 : err = firstError(err, iter.Close())
443 1 : }()
444 :
445 1 : var prevTombstone keyspan.Span
446 1 : tomb, err := iter.First()
447 1 : for ; tomb != nil; tomb, err = iter.Next() {
448 1 : t := tomb.Visible(seqNum)
449 1 : if t.Empty() {
450 1 : continue
451 : }
452 1 : t = t.DeepClone()
453 1 : // This is mainly a test for rangeDelV2 formatted blocks which are expected to
454 1 : // be ordered and fragmented on disk. But we anyways check for memtables,
455 1 : // rangeDelV1 as well.
456 1 : if cmp(prevTombstone.End, t.Start) > 0 {
457 1 : return nil, errors.Errorf("unordered or unfragmented range delete tombstones %s, %s in %s",
458 1 : prevTombstone.Pretty(formatKey), t.Pretty(formatKey), levelOrMemtable(lsmLevel, fileNum))
459 1 : }
460 1 : prevTombstone = t
461 1 :
462 1 : if !t.Empty() {
463 1 : tombstones = append(tombstones, tombstoneWithLevel{
464 1 : Span: t,
465 1 : level: level,
466 1 : lsmLevel: lsmLevel,
467 1 : fileNum: fileNum,
468 1 : })
469 1 : }
470 : }
471 1 : if err != nil {
472 0 : return nil, err
473 0 : }
474 1 : return tombstones, nil
475 : }
476 :
477 : type userKeysSort struct {
478 : cmp Compare
479 : buf [][]byte
480 : }
481 :
482 1 : func (v *userKeysSort) Len() int { return len(v.buf) }
483 1 : func (v *userKeysSort) Less(i, j int) bool {
484 1 : return v.cmp(v.buf[i], v.buf[j]) < 0
485 1 : }
486 1 : func (v *userKeysSort) Swap(i, j int) {
487 1 : v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
488 1 : }
489 1 : func collectAllUserKeys(cmp Compare, tombstones []tombstoneWithLevel) [][]byte {
490 1 : keys := make([][]byte, 0, len(tombstones)*2)
491 1 : for _, t := range tombstones {
492 1 : keys = append(keys, t.Start)
493 1 : keys = append(keys, t.End)
494 1 : }
495 1 : sorter := userKeysSort{
496 1 : cmp: cmp,
497 1 : buf: keys,
498 1 : }
499 1 : sort.Sort(&sorter)
500 1 : var last, curr int
501 1 : for last, curr = -1, 0; curr < len(keys); curr++ {
502 1 : if last < 0 || cmp(keys[last], keys[curr]) != 0 {
503 1 : last++
504 1 : keys[last] = keys[curr]
505 1 : }
506 : }
507 1 : keys = keys[:last+1]
508 1 : return keys
509 : }
510 :
511 : func fragmentUsingUserKeys(
512 : cmp Compare, tombstones []tombstoneWithLevel, userKeys [][]byte,
513 1 : ) []tombstoneWithLevel {
514 1 : var buf []tombstoneWithLevel
515 1 : for _, t := range tombstones {
516 1 : // Find the first position with tombstone start < user key
517 1 : i := sort.Search(len(userKeys), func(i int) bool {
518 1 : return cmp(t.Start, userKeys[i]) < 0
519 1 : })
520 1 : for ; i < len(userKeys); i++ {
521 1 : if cmp(userKeys[i], t.End) >= 0 {
522 1 : break
523 : }
524 1 : tPartial := t
525 1 : tPartial.End = userKeys[i]
526 1 : buf = append(buf, tPartial)
527 1 : t.Start = userKeys[i]
528 : }
529 1 : buf = append(buf, t)
530 : }
531 1 : return buf
532 : }
533 :
534 : // CheckLevelsStats provides basic stats on points and tombstones encountered.
535 : type CheckLevelsStats struct {
536 : NumPoints int64
537 : NumTombstones int
538 : }
539 :
540 : // CheckLevels checks:
541 : // - Every entry in the DB is consistent with the level invariant. See the
542 : // comment at the top of the file.
543 : // - Point keys in sstables are ordered.
544 : // - Range delete tombstones in sstables are ordered and fragmented.
545 : // - Successful processing of all MERGE records.
546 1 : func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
547 1 : // Grab and reference the current readState.
548 1 : readState := d.loadReadState()
549 1 : defer readState.unref()
550 1 :
551 1 : // Determine the seqnum to read at after grabbing the read state (current and
552 1 : // memtables) above.
553 1 : seqNum := d.mu.versions.visibleSeqNum.Load()
554 1 :
555 1 : checkConfig := &checkConfig{
556 1 : logger: d.opts.Logger,
557 1 : comparer: d.opts.Comparer,
558 1 : readState: readState,
559 1 : newIters: d.newIters,
560 1 : seqNum: seqNum,
561 1 : stats: stats,
562 1 : merge: d.merge,
563 1 : formatKey: d.opts.Comparer.FormatKey,
564 1 : }
565 1 : return checkLevelsInternal(checkConfig)
566 1 : }
567 :
568 1 : func checkLevelsInternal(c *checkConfig) (err error) {
569 1 : // Phase 1: Use a simpleMergingIter to step through all the points and ensure
570 1 : // that points with the same user key at different levels are not inverted
571 1 : // wrt sequence numbers and the same holds for tombstones that cover points.
572 1 : // To do this, one needs to construct a simpleMergingIter which is similar to
573 1 : // how one constructs a mergingIter.
574 1 :
575 1 : // Add mem tables from newest to oldest.
576 1 : var mlevels []simpleMergingIterLevel
577 1 : defer func() {
578 1 : for i := range mlevels {
579 1 : l := &mlevels[i]
580 1 : if l.iter != nil {
581 1 : err = firstError(err, l.iter.Close())
582 1 : l.iter = nil
583 1 : }
584 1 : if l.rangeDelIter != nil {
585 1 : err = firstError(err, l.rangeDelIter.Close())
586 1 : l.rangeDelIter = nil
587 1 : }
588 : }
589 : }()
590 :
591 1 : memtables := c.readState.memtables
592 1 : for i := len(memtables) - 1; i >= 0; i-- {
593 1 : mem := memtables[i]
594 1 : mlevels = append(mlevels, simpleMergingIterLevel{
595 1 : iter: mem.newIter(nil),
596 1 : rangeDelIter: mem.newRangeDelIter(nil),
597 1 : })
598 1 : }
599 :
600 1 : current := c.readState.current
601 1 : // Determine the final size for mlevels so that there are no more
602 1 : // reallocations. levelIter will hold a pointer to elements in mlevels.
603 1 : start := len(mlevels)
604 1 : for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
605 1 : if current.L0SublevelFiles[sublevel].Empty() {
606 0 : continue
607 : }
608 1 : mlevels = append(mlevels, simpleMergingIterLevel{})
609 : }
610 1 : for level := 1; level < len(current.Levels); level++ {
611 1 : if current.Levels[level].Empty() {
612 1 : continue
613 : }
614 1 : mlevels = append(mlevels, simpleMergingIterLevel{})
615 : }
616 1 : mlevelAlloc := mlevels[start:]
617 1 : // Add L0 files by sublevel.
618 1 : for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
619 1 : if current.L0SublevelFiles[sublevel].Empty() {
620 0 : continue
621 : }
622 1 : manifestIter := current.L0SublevelFiles[sublevel].Iter()
623 1 : iterOpts := IterOptions{logger: c.logger}
624 1 : li := &levelIter{}
625 1 : li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
626 1 : manifest.L0Sublevel(sublevel), internalIterOpts{})
627 1 : li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
628 1 : li.initBoundaryContext(&mlevelAlloc[0].levelIterBoundaryContext)
629 1 : mlevelAlloc[0].iter = li
630 1 : mlevelAlloc = mlevelAlloc[1:]
631 : }
632 1 : for level := 1; level < len(current.Levels); level++ {
633 1 : if current.Levels[level].Empty() {
634 1 : continue
635 : }
636 :
637 1 : iterOpts := IterOptions{logger: c.logger}
638 1 : li := &levelIter{}
639 1 : li.init(context.Background(), iterOpts, c.comparer, c.newIters,
640 1 : current.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
641 1 : li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
642 1 : li.initBoundaryContext(&mlevelAlloc[0].levelIterBoundaryContext)
643 1 : mlevelAlloc[0].iter = li
644 1 : mlevelAlloc = mlevelAlloc[1:]
645 : }
646 :
647 1 : mergingIter := &simpleMergingIter{}
648 1 : mergingIter.init(c.merge, c.cmp, c.seqNum, c.formatKey, mlevels...)
649 1 : for cont := mergingIter.step(); cont; cont = mergingIter.step() {
650 1 : }
651 1 : if err := mergingIter.err; err != nil {
652 1 : return err
653 1 : }
654 1 : if c.stats != nil {
655 1 : c.stats.NumPoints = mergingIter.numPoints
656 1 : }
657 :
658 : // Phase 2: Check that the tombstones are mutually consistent.
659 1 : return checkRangeTombstones(c)
660 : }
661 :
662 : type simpleMergingIterItem struct {
663 : index int
664 : key InternalKey
665 : value base.LazyValue
666 : }
667 :
668 : type simpleMergingIterHeap struct {
669 : cmp Compare
670 : reverse bool
671 : items []simpleMergingIterItem
672 : }
673 :
674 1 : func (h *simpleMergingIterHeap) len() int {
675 1 : return len(h.items)
676 1 : }
677 :
678 1 : func (h *simpleMergingIterHeap) less(i, j int) bool {
679 1 : ikey, jkey := h.items[i].key, h.items[j].key
680 1 : if c := h.cmp(ikey.UserKey, jkey.UserKey); c != 0 {
681 1 : if h.reverse {
682 0 : return c > 0
683 0 : }
684 1 : return c < 0
685 : }
686 1 : if h.reverse {
687 0 : return ikey.Trailer < jkey.Trailer
688 0 : }
689 1 : return ikey.Trailer > jkey.Trailer
690 : }
691 :
692 1 : func (h *simpleMergingIterHeap) swap(i, j int) {
693 1 : h.items[i], h.items[j] = h.items[j], h.items[i]
694 1 : }
695 :
696 : // init, fix, up and down are copied from the go stdlib.
697 1 : func (h *simpleMergingIterHeap) init() {
698 1 : // heapify
699 1 : n := h.len()
700 1 : for i := n/2 - 1; i >= 0; i-- {
701 1 : h.down(i, n)
702 1 : }
703 : }
704 :
705 1 : func (h *simpleMergingIterHeap) fix(i int) {
706 1 : if !h.down(i, h.len()) {
707 1 : h.up(i)
708 1 : }
709 : }
710 :
711 1 : func (h *simpleMergingIterHeap) pop() *simpleMergingIterItem {
712 1 : n := h.len() - 1
713 1 : h.swap(0, n)
714 1 : h.down(0, n)
715 1 : item := &h.items[n]
716 1 : h.items = h.items[:n]
717 1 : return item
718 1 : }
719 :
720 1 : func (h *simpleMergingIterHeap) up(j int) {
721 1 : for {
722 1 : i := (j - 1) / 2 // parent
723 1 : if i == j || !h.less(j, i) {
724 1 : break
725 : }
726 0 : h.swap(i, j)
727 0 : j = i
728 : }
729 : }
730 :
731 1 : func (h *simpleMergingIterHeap) down(i0, n int) bool {
732 1 : i := i0
733 1 : for {
734 1 : j1 := 2*i + 1
735 1 : if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
736 1 : break
737 : }
738 1 : j := j1 // left child
739 1 : if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
740 1 : j = j2 // = 2*i + 2 // right child
741 1 : }
742 1 : if !h.less(j, i) {
743 1 : break
744 : }
745 1 : h.swap(i, j)
746 1 : i = j
747 : }
748 1 : return i > i0
749 : }
|