Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : stdcmp "cmp"
9 : "context"
10 : "fmt"
11 : "io"
12 : "iter"
13 : "slices"
14 : "sort"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/keyspan"
19 : "github.com/cockroachdb/pebble/internal/manifest"
20 : "github.com/cockroachdb/pebble/sstable"
21 : "github.com/cockroachdb/pebble/sstable/blob"
22 : "github.com/cockroachdb/pebble/sstable/block"
23 : )
24 :
25 : // This file implements DB.CheckLevels() which checks that every entry in the
26 : // DB is consistent with respect to the level invariant: any point (or the
27 : // infinite number of points in a range tombstone) has a seqnum such that a
28 : // point with the same UserKey at a lower level has a lower seqnum. This is an
29 : // expensive check since it involves iterating over all the entries in the DB,
30 : // hence only intended for tests or tools.
31 : //
32 : // If we ignore range tombstones, the consistency checking of points can be
33 : // done with a simplified version of mergingIter. simpleMergingIter is that
34 : // simplified version of mergingIter that only needs to step through points
35 : // (analogous to only doing Next()). It can also easily accommodate
36 : // consistency checking of points relative to range tombstones.
37 : // simpleMergingIter does not do any seek optimizations present in mergingIter
38 : // (it minimally needs to seek the range delete iterators to position them at
39 : // or past the current point) since it does not want to miss points for
40 : // purposes of consistency checking.
41 : //
42 : // Mutual consistency of range tombstones is non-trivial to check. One needs
43 : // to detect inversions of the form [a, c)#8 at higher level and [b, c)#10 at
44 : // a lower level. The start key of the former is not contained in the latter
45 : // and we can't use the exclusive end key, c, for a containment check since it
46 : // is the sentinel key. We observe that if these tombstones were fragmented
47 : // wrt each other we would have [a, b)#8 and [b, c)#8 at the higher level and
// [b, c)#10 at the lower level and then it is trivial to compare the two
49 : // [b, c) tombstones. Note that this fragmentation needs to take into account
50 : // that tombstones in a file may be untruncated and need to act within the
51 : // bounds of the file. This checking is performed by checkRangeTombstones()
52 : // and its helper functions.
53 :
// The per-level structure used by simpleMergingIter. One instance exists per
// memtable, per non-empty L0 sublevel and per non-empty L1+ level (see
// checkLevelsInternal).
type simpleMergingIterLevel struct {
	// iter is the point iterator for this level. step() closes it and sets it
	// to nil once it is exhausted.
	iter internalIterator
	// rangeDelIter is the range deletion iterator for this level; it may be
	// nil. It is replaced through setRangeDelIter.
	rangeDelIter keyspan.FragmentIterator

	// iterKV is the most recent result of iter.First()/iter.Next().
	iterKV *base.InternalKV
	// tombstone is the span most recently returned by rangeDelIter.SeekGE in
	// positionRangeDels. It is reset to nil whenever rangeDelIter is replaced.
	tombstone *keyspan.Span
}
62 :
63 1 : func (ml *simpleMergingIterLevel) setRangeDelIter(iter keyspan.FragmentIterator) {
64 1 : ml.tombstone = nil
65 1 : if ml.rangeDelIter != nil {
66 1 : ml.rangeDelIter.Close()
67 1 : }
68 1 : ml.rangeDelIter = iter
69 : }
70 :
// simpleMergingIter is a forward-only merging iterator over the point keys of
// all levels, used to validate the level invariant. It is driven by calling
// init() once and then step() until it returns false; the outcome is read
// from err and numPoints.
type simpleMergingIter struct {
	// levels holds the per-level iterators, ordered from newest to oldest.
	levels []simpleMergingIterLevel
	// snapshot is the sequence number used for visibility checks.
	snapshot base.SeqNum
	// heap orders the current key of every non-exhausted level.
	heap simpleMergingIterHeap
	// The last point's key and level. For validation.
	lastKey   InternalKey
	lastLevel int
	// lastIterMsg is the debug string of the iterator that produced the last
	// stepped point; it is captured before that iterator may be closed.
	lastIterMsg string
	// A non-nil valueMerger means MERGE record processing is ongoing.
	valueMerger base.ValueMerger
	// The first error will cause step() to return false.
	err error
	// numPoints counts the visible point keys handed to handleVisiblePoint.
	numPoints int64
	// merge creates the ValueMerger for a new series of MERGE records.
	merge Merge
	// formatKey is used to pretty-print keys in error messages.
	formatKey base.FormatKey
}
87 :
88 : func (m *simpleMergingIter) init(
89 : merge Merge,
90 : cmp Compare,
91 : snapshot base.SeqNum,
92 : formatKey base.FormatKey,
93 : levels ...simpleMergingIterLevel,
94 1 : ) {
95 1 : m.levels = levels
96 1 : m.formatKey = formatKey
97 1 : m.merge = merge
98 1 : m.snapshot = snapshot
99 1 : m.lastLevel = -1
100 1 : m.heap.cmp = cmp
101 1 : m.heap.items = make([]simpleMergingIterItem, 0, len(levels))
102 1 : for i := range m.levels {
103 1 : l := &m.levels[i]
104 1 : l.iterKV = l.iter.First()
105 1 : if l.iterKV != nil {
106 1 : item := simpleMergingIterItem{
107 1 : index: i,
108 1 : kv: *l.iterKV,
109 1 : }
110 1 : item.kv.K = l.iterKV.K.Clone()
111 1 : m.heap.items = append(m.heap.items, item)
112 1 : }
113 : }
114 1 : m.heap.init()
115 1 :
116 1 : if m.heap.len() == 0 {
117 1 : return
118 1 : }
119 1 : m.positionRangeDels()
120 : }
121 :
122 : // Positions all the rangedel iterators at or past the current top of the
123 : // heap, using SeekGE().
124 1 : func (m *simpleMergingIter) positionRangeDels() {
125 1 : item := &m.heap.items[0]
126 1 : for i := range m.levels {
127 1 : l := &m.levels[i]
128 1 : if l.rangeDelIter == nil {
129 1 : continue
130 : }
131 1 : t, err := l.rangeDelIter.SeekGE(item.kv.K.UserKey)
132 1 : m.err = firstError(m.err, err)
133 1 : l.tombstone = t
134 : }
135 : }
136 :
// step validates the key at the top of the heap and then advances the level
// that produced it. Returns true if not yet done.
//
// It returns false when the heap is empty, when an error has been recorded in
// m.err, or when validation of the current point fails. Callers inspect m.err
// after step returns false to distinguish success from failure.
func (m *simpleMergingIter) step() bool {
	if m.heap.len() == 0 || m.err != nil {
		return false
	}
	// item points into the heap's backing array; it is updated in place below
	// when the level has another key.
	item := &m.heap.items[0]
	l := &m.levels[item.index]
	// Sentinels are not relevant for this point checking.
	if !item.kv.K.IsExclusiveSentinel() && item.kv.K.Visible(m.snapshot, base.SeqNumMax) {
		// This is a visible point key.
		if !m.handleVisiblePoint(item, l) {
			return false
		}
	}

	// The iterator for the current level may be closed in the following call to
	// Next(). We save its debug string for potential use after it is closed -
	// either in this current step() invocation or on the next invocation.
	m.lastIterMsg = l.iter.String()

	// Step to the next point.
	l.iterKV = l.iter.Next()
	if l.iterKV == nil {
		// The level is exhausted (or failed): record any error, release the
		// iterator and drop the level from the heap.
		m.err = errors.CombineErrors(l.iter.Error(), l.iter.Close())
		l.iter = nil
		m.heap.pop()
	} else {
		// Check point keys in an sstable are ordered. Although not required, we check
		// for memtables as well. A subtle check here is that successive sstables of
		// L1 and higher levels are ordered. This happens when levelIter moves to the
		// next sstable in the level, in which case item.kv.K is previous sstable's
		// last point key.
		if !l.iterKV.K.IsExclusiveSentinel() && base.InternalCompare(m.heap.cmp, item.kv.K, l.iterKV.K) >= 0 {
			m.err = errors.Errorf("out of order keys %s >= %s in %s",
				item.kv.K.Pretty(m.formatKey), l.iterKV.K.Pretty(m.formatKey), l.iter)
			return false
		}
		// Reuse the heap item's existing user key buffer for the copy of the
		// new key.
		userKeyBuf := item.kv.K.UserKey[:0]
		item.kv = *l.iterKV
		item.kv.K.UserKey = append(userKeyBuf, l.iterKV.K.UserKey...)
		if m.heap.len() > 1 {
			m.heap.fix(0)
		}
	}
	if m.err != nil {
		return false
	}
	if m.heap.len() == 0 {
		// If m.valueMerger != nil, the last record was a MERGE record.
		if m.valueMerger != nil {
			var closer io.Closer
			var err error
			_, closer, err = m.valueMerger.Finish(true /* includesBase */)
			if closer != nil {
				err = errors.CombineErrors(err, closer.Close())
			}
			if err != nil {
				m.err = errors.CombineErrors(m.err,
					errors.Wrapf(err, "merge processing error on key %s in %s",
						item.kv.K.Pretty(m.formatKey), m.lastIterMsg))
			}
			m.valueMerger = nil
		}
		return false
	}
	// Re-position the range deletion iterators relative to the new heap top.
	m.positionRangeDels()
	return true
}
205 :
206 : // handleVisiblePoint returns true if validation succeeded and level checking
207 : // can continue.
208 : func (m *simpleMergingIter) handleVisiblePoint(
209 : item *simpleMergingIterItem, l *simpleMergingIterLevel,
210 1 : ) (ok bool) {
211 1 : m.numPoints++
212 1 : keyChanged := m.heap.cmp(item.kv.K.UserKey, m.lastKey.UserKey) != 0
213 1 : if !keyChanged {
214 1 : // At the same user key. We will see them in decreasing seqnum
215 1 : // order so the lastLevel must not be lower.
216 1 : if m.lastLevel > item.index {
217 0 : m.err = errors.Errorf("found InternalKey %s in %s and InternalKey %s in %s",
218 0 : item.kv.K.Pretty(m.formatKey), l.iter, m.lastKey.Pretty(m.formatKey),
219 0 : m.lastIterMsg)
220 0 : return false
221 0 : }
222 1 : m.lastLevel = item.index
223 1 : } else {
224 1 : // The user key has changed.
225 1 : m.lastKey.Trailer = item.kv.K.Trailer
226 1 : m.lastKey.UserKey = append(m.lastKey.UserKey[:0], item.kv.K.UserKey...)
227 1 : m.lastLevel = item.index
228 1 : }
229 : // Ongoing series of MERGE records ends with a MERGE record.
230 1 : if keyChanged && m.valueMerger != nil {
231 1 : var closer io.Closer
232 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
233 1 : if m.err == nil && closer != nil {
234 0 : m.err = closer.Close()
235 0 : }
236 1 : m.valueMerger = nil
237 : }
238 1 : itemValue, _, err := item.kv.Value(nil)
239 1 : if err != nil {
240 0 : m.err = err
241 0 : return false
242 0 : }
243 1 : if m.valueMerger != nil {
244 1 : // Ongoing series of MERGE records.
245 1 : switch item.kv.K.Kind() {
246 1 : case InternalKeyKindSingleDelete, InternalKeyKindDelete, InternalKeyKindDeleteSized:
247 1 : var closer io.Closer
248 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
249 1 : if m.err == nil && closer != nil {
250 0 : m.err = closer.Close()
251 0 : }
252 1 : m.valueMerger = nil
253 1 : case InternalKeyKindSet, InternalKeyKindSetWithDelete:
254 1 : m.err = m.valueMerger.MergeOlder(itemValue)
255 1 : if m.err == nil {
256 1 : var closer io.Closer
257 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
258 1 : if m.err == nil && closer != nil {
259 0 : m.err = closer.Close()
260 0 : }
261 : }
262 1 : m.valueMerger = nil
263 1 : case InternalKeyKindMerge:
264 1 : m.err = m.valueMerger.MergeOlder(itemValue)
265 0 : default:
266 0 : m.err = errors.Errorf("pebble: invalid internal key kind %s in %s",
267 0 : item.kv.K.Pretty(m.formatKey),
268 0 : l.iter)
269 0 : return false
270 : }
271 1 : } else if item.kv.K.Kind() == InternalKeyKindMerge && m.err == nil {
272 1 : // New series of MERGE records.
273 1 : m.valueMerger, m.err = m.merge(item.kv.K.UserKey, itemValue)
274 1 : }
275 1 : if m.err != nil {
276 0 : m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
277 0 : item.kv.K.Pretty(m.formatKey), l.iter)
278 0 : return false
279 0 : }
280 : // Is this point covered by a tombstone at a lower level? Note that all these
281 : // iterators must be positioned at a key > item.key.
282 1 : for level := item.index + 1; level < len(m.levels); level++ {
283 1 : lvl := &m.levels[level]
284 1 : if lvl.rangeDelIter == nil || lvl.tombstone.Empty() {
285 1 : continue
286 : }
287 1 : if lvl.tombstone.Contains(m.heap.cmp, item.kv.K.UserKey) && lvl.tombstone.CoversAt(m.snapshot, item.kv.K.SeqNum()) {
288 0 : m.err = errors.Errorf("tombstone %s in %s deletes key %s in %s",
289 0 : lvl.tombstone.Pretty(m.formatKey), lvl.iter, item.kv.K.Pretty(m.formatKey),
290 0 : l.iter)
291 0 : return false
292 0 : }
293 : }
294 1 : return true
295 : }
296 :
297 : // Checking that range tombstones are mutually consistent is performed by
298 : // checkRangeTombstones(). See the overview comment at the top of the file.
299 : //
300 : // We do this check as follows:
301 : // - Collect the tombstones for each level, put them into one pool of tombstones
302 : // along with their level information (addTombstonesFromIter()).
303 : // - Collect the start and end user keys from all these tombstones
304 : // (collectAllUserKey()) and use them to fragment all the tombstones
305 : // (fragmentUsingUserKey()).
306 : // - Sort tombstones by start key and decreasing seqnum (all tombstones that
307 : // have the same start key will have the same end key because they have been
308 : // fragmented)
309 : // - Iterate and check (iterateAndCheckTombstones()).
310 : //
311 : // Note that this simple approach requires holding all the tombstones across all
312 : // levels in-memory. A more sophisticated incremental approach could be devised,
313 : // if necessary.
314 :
// A tombstone and the corresponding level it was found in.
type tombstoneWithLevel struct {
	keyspan.Span
	// level is the ordinal position of the source in newest-to-oldest order,
	// as assigned by checkRangeTombstones: memtables first, then L0
	// sublevels, then L1 and below. It is used only for ordering comparisons.
	level int
	// The level in LSM. A -1 means it's a memtable.
	lsmLevel int
	// fileNum identifies the sstable the tombstone came from; memtable
	// tombstones are recorded with 0.
	fileNum base.FileNum
}
323 :
324 : func iterateAndCheckTombstones(
325 : cmp Compare, formatKey base.FormatKey, tombstones []tombstoneWithLevel,
326 1 : ) error {
327 1 : slices.SortFunc(tombstones, func(a, b tombstoneWithLevel) int {
328 1 : if v := cmp(a.Start, b.Start); v != 0 {
329 1 : return v
330 1 : }
331 1 : return stdcmp.Compare(b.LargestSeqNum(), a.LargestSeqNum())
332 : })
333 :
334 : // For a sequence of tombstones that share the same start UserKey, we will
335 : // encounter them in non-increasing seqnum order and so should encounter them
336 : // in non-decreasing level order.
337 1 : lastTombstone := tombstoneWithLevel{}
338 1 : for _, t := range tombstones {
339 1 : if cmp(lastTombstone.Start, t.Start) == 0 && lastTombstone.level > t.level {
340 0 : return errors.Errorf("encountered tombstone %s in %s"+
341 0 : " that has a lower seqnum than the same tombstone in %s",
342 0 : t.Span.Pretty(formatKey), levelOrMemtable(t.lsmLevel, t.fileNum),
343 0 : levelOrMemtable(lastTombstone.lsmLevel, lastTombstone.fileNum))
344 0 : }
345 1 : lastTombstone = t
346 : }
347 1 : return nil
348 : }
349 :
// checkConfig bundles the inputs needed by checkLevelsInternal and
// checkRangeTombstones.
type checkConfig struct {
	// logger is passed to the level iterators through IterOptions.
	logger Logger
	// comparer supplies the key comparison used throughout the check.
	comparer *Comparer
	// readState provides the memtables and the current version to check.
	readState *readState
	// newIters opens point and range deletion iterators over a table.
	newIters tableNewIters
	// seqNum is the sequence number at which keys and tombstones are read.
	seqNum base.SeqNum
	// stats, if non-nil, receives the counts gathered during the check.
	stats *CheckLevelsStats
	// merge is the merge operator used to process MERGE records.
	merge Merge
	// formatKey pretty-prints user keys in error messages.
	formatKey base.FormatKey
	// readEnv is the block read environment used for sstable and blob reads.
	readEnv block.ReadEnv
	// blobValueFetcher is the ValueFetcher to use when retrieving values stored
	// externally in blob files.
	blobValueFetcher blob.ValueFetcher
}
364 :
365 : // cmp is shorthand for comparer.Compare.
366 1 : func (c *checkConfig) cmp(a, b []byte) int { return c.comparer.Compare(a, b) }
367 :
368 1 : func checkRangeTombstones(c *checkConfig) error {
369 1 : var level int
370 1 : var tombstones []tombstoneWithLevel
371 1 : var err error
372 1 :
373 1 : memtables := c.readState.memtables
374 1 : for i := len(memtables) - 1; i >= 0; i-- {
375 1 : iter := memtables[i].newRangeDelIter(nil)
376 1 : if iter == nil {
377 1 : continue
378 : }
379 1 : tombstones, err = addTombstonesFromIter(
380 1 : iter, level, -1, 0, tombstones, c.seqNum, c.cmp, c.formatKey,
381 1 : )
382 1 : iter.Close()
383 1 : if err != nil {
384 0 : return err
385 0 : }
386 1 : level++
387 : }
388 :
389 1 : current := c.readState.current
390 1 : addTombstonesFromLevel := func(files iter.Seq[*manifest.TableMetadata], lsmLevel int) error {
391 1 : for f := range files {
392 1 : iters, err := c.newIters(
393 1 : context.Background(), f, &IterOptions{layer: manifest.Level(lsmLevel)},
394 1 : internalIterOpts{}, iterRangeDeletions)
395 1 : if err != nil {
396 0 : return err
397 0 : }
398 1 : tombstones, err = addTombstonesFromIter(iters.RangeDeletion(), level, lsmLevel, f.FileNum,
399 1 : tombstones, c.seqNum, c.cmp, c.formatKey)
400 1 : _ = iters.CloseAll()
401 1 :
402 1 : if err != nil {
403 0 : return err
404 0 : }
405 : }
406 1 : return nil
407 : }
408 : // Now the levels with untruncated tombsones.
409 1 : for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
410 1 : if current.L0SublevelFiles[i].Empty() {
411 0 : continue
412 : }
413 1 : err := addTombstonesFromLevel(current.L0SublevelFiles[i].All(), 0)
414 1 : if err != nil {
415 0 : return err
416 0 : }
417 1 : level++
418 : }
419 1 : for i := 1; i < len(current.Levels); i++ {
420 1 : if err := addTombstonesFromLevel(current.Levels[i].All(), i); err != nil {
421 0 : return err
422 0 : }
423 1 : level++
424 : }
425 1 : if c.stats != nil {
426 1 : c.stats.NumTombstones = len(tombstones)
427 1 : }
428 : // We now have truncated tombstones.
429 : // Fragment them all.
430 1 : userKeys := collectAllUserKeys(c.cmp, tombstones)
431 1 : tombstones = fragmentUsingUserKeys(c.cmp, tombstones, userKeys)
432 1 : return iterateAndCheckTombstones(c.cmp, c.formatKey, tombstones)
433 : }
434 :
435 0 : func levelOrMemtable(lsmLevel int, fileNum base.FileNum) string {
436 0 : if lsmLevel == -1 {
437 0 : return "memtable"
438 0 : }
439 0 : return fmt.Sprintf("L%d: fileNum=%s", lsmLevel, fileNum)
440 : }
441 :
442 : func addTombstonesFromIter(
443 : iter keyspan.FragmentIterator,
444 : level int,
445 : lsmLevel int,
446 : fileNum base.FileNum,
447 : tombstones []tombstoneWithLevel,
448 : seqNum base.SeqNum,
449 : cmp Compare,
450 : formatKey base.FormatKey,
451 1 : ) (_ []tombstoneWithLevel, err error) {
452 1 : var prevTombstone keyspan.Span
453 1 : tomb, err := iter.First()
454 1 : for ; tomb != nil; tomb, err = iter.Next() {
455 1 : t := tomb.Visible(seqNum)
456 1 : if t.Empty() {
457 1 : continue
458 : }
459 1 : t = t.Clone()
460 1 : // This is mainly a test for rangeDelV2 formatted blocks which are expected to
461 1 : // be ordered and fragmented on disk. But we anyways check for memtables,
462 1 : // rangeDelV1 as well.
463 1 : if cmp(prevTombstone.End, t.Start) > 0 {
464 0 : return nil, errors.Errorf("unordered or unfragmented range delete tombstones %s, %s in %s",
465 0 : prevTombstone.Pretty(formatKey), t.Pretty(formatKey), levelOrMemtable(lsmLevel, fileNum))
466 0 : }
467 1 : prevTombstone = t
468 1 :
469 1 : if !t.Empty() {
470 1 : tombstones = append(tombstones, tombstoneWithLevel{
471 1 : Span: t,
472 1 : level: level,
473 1 : lsmLevel: lsmLevel,
474 1 : fileNum: fileNum,
475 1 : })
476 1 : }
477 : }
478 1 : if err != nil {
479 0 : return nil, err
480 0 : }
481 1 : return tombstones, nil
482 : }
483 :
484 1 : func collectAllUserKeys(cmp Compare, tombstones []tombstoneWithLevel) [][]byte {
485 1 : keys := make([][]byte, 0, len(tombstones)*2)
486 1 : for _, t := range tombstones {
487 1 : keys = append(keys, t.Start, t.End)
488 1 : }
489 1 : slices.SortFunc(keys, cmp)
490 1 : return slices.CompactFunc(keys, func(a, b []byte) bool {
491 1 : return cmp(a, b) == 0
492 1 : })
493 : }
494 :
495 : func fragmentUsingUserKeys(
496 : cmp Compare, tombstones []tombstoneWithLevel, userKeys [][]byte,
497 1 : ) []tombstoneWithLevel {
498 1 : var buf []tombstoneWithLevel
499 1 : for _, t := range tombstones {
500 1 : // Find the first position with tombstone start < user key
501 1 : i := sort.Search(len(userKeys), func(i int) bool {
502 1 : return cmp(t.Start, userKeys[i]) < 0
503 1 : })
504 1 : for ; i < len(userKeys); i++ {
505 1 : if cmp(userKeys[i], t.End) >= 0 {
506 1 : break
507 : }
508 1 : tPartial := t
509 1 : tPartial.End = userKeys[i]
510 1 : buf = append(buf, tPartial)
511 1 : t.Start = userKeys[i]
512 : }
513 1 : buf = append(buf, t)
514 : }
515 1 : return buf
516 : }
517 :
// CheckLevelsStats provides basic stats on points and tombstones encountered.
type CheckLevelsStats struct {
	// NumPoints is the number of visible point keys examined.
	NumPoints int64
	// NumTombstones is the number of visible range tombstones collected,
	// counted before they are fragmented against each other.
	NumTombstones int
}
523 :
524 : // CheckLevels checks:
525 : // - Every entry in the DB is consistent with the level invariant. See the
526 : // comment at the top of the file.
527 : // - Point keys in sstables are ordered.
528 : // - Range delete tombstones in sstables are ordered and fragmented.
529 : // - Successful processing of all MERGE records.
530 1 : func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
531 1 : // Grab and reference the current readState.
532 1 : readState := d.loadReadState()
533 1 : defer readState.unref()
534 1 :
535 1 : // Determine the seqnum to read at after grabbing the read state (current and
536 1 : // memtables) above.
537 1 : seqNum := d.mu.versions.visibleSeqNum.Load()
538 1 :
539 1 : checkConfig := &checkConfig{
540 1 : logger: d.opts.Logger,
541 1 : comparer: d.opts.Comparer,
542 1 : readState: readState,
543 1 : newIters: d.newIters,
544 1 : seqNum: seqNum,
545 1 : stats: stats,
546 1 : merge: d.merge,
547 1 : formatKey: d.opts.Comparer.FormatKey,
548 1 : readEnv: block.ReadEnv{
549 1 : // TODO(jackson): Add categorized stats.
550 1 : },
551 1 : }
552 1 : checkConfig.blobValueFetcher.Init(d.fileCache, checkConfig.readEnv)
553 1 : defer func() { _ = checkConfig.blobValueFetcher.Close() }()
554 1 : return checkLevelsInternal(checkConfig)
555 : }
556 :
557 1 : func checkLevelsInternal(c *checkConfig) (err error) {
558 1 : internalOpts := internalIterOpts{
559 1 : readEnv: sstable.ReadEnv{Block: c.readEnv},
560 1 : blobValueFetcher: &c.blobValueFetcher,
561 1 : }
562 1 :
563 1 : // Phase 1: Use a simpleMergingIter to step through all the points and ensure
564 1 : // that points with the same user key at different levels are not inverted
565 1 : // wrt sequence numbers and the same holds for tombstones that cover points.
566 1 : // To do this, one needs to construct a simpleMergingIter which is similar to
567 1 : // how one constructs a mergingIter.
568 1 :
569 1 : // Add mem tables from newest to oldest.
570 1 : var mlevels []simpleMergingIterLevel
571 1 : defer func() {
572 1 : for i := range mlevels {
573 1 : l := &mlevels[i]
574 1 : if l.iter != nil {
575 1 : err = firstError(err, l.iter.Close())
576 1 : l.iter = nil
577 1 : }
578 1 : if l.rangeDelIter != nil {
579 1 : l.rangeDelIter.Close()
580 1 : l.rangeDelIter = nil
581 1 : }
582 : }
583 : }()
584 :
585 1 : memtables := c.readState.memtables
586 1 : for i := len(memtables) - 1; i >= 0; i-- {
587 1 : mem := memtables[i]
588 1 : mlevels = append(mlevels, simpleMergingIterLevel{
589 1 : iter: mem.newIter(nil),
590 1 : rangeDelIter: mem.newRangeDelIter(nil),
591 1 : })
592 1 : }
593 :
594 1 : current := c.readState.current
595 1 : // Determine the final size for mlevels so that there are no more
596 1 : // reallocations. levelIter will hold a pointer to elements in mlevels.
597 1 : start := len(mlevels)
598 1 : for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
599 1 : if current.L0SublevelFiles[sublevel].Empty() {
600 0 : continue
601 : }
602 1 : mlevels = append(mlevels, simpleMergingIterLevel{})
603 : }
604 1 : for level := 1; level < len(current.Levels); level++ {
605 1 : if current.Levels[level].Empty() {
606 1 : continue
607 : }
608 1 : mlevels = append(mlevels, simpleMergingIterLevel{})
609 : }
610 1 : mlevelAlloc := mlevels[start:]
611 1 : // Add L0 files by sublevel.
612 1 : for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
613 1 : if current.L0SublevelFiles[sublevel].Empty() {
614 0 : continue
615 : }
616 1 : manifestIter := current.L0SublevelFiles[sublevel].Iter()
617 1 : iterOpts := IterOptions{logger: c.logger}
618 1 : li := &levelIter{}
619 1 : li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
620 1 : manifest.L0Sublevel(sublevel), internalOpts)
621 1 : li.initRangeDel(&mlevelAlloc[0])
622 1 : mlevelAlloc[0].iter = li
623 1 : mlevelAlloc = mlevelAlloc[1:]
624 : }
625 1 : for level := 1; level < len(current.Levels); level++ {
626 1 : if current.Levels[level].Empty() {
627 1 : continue
628 : }
629 :
630 1 : iterOpts := IterOptions{logger: c.logger}
631 1 : li := &levelIter{}
632 1 : li.init(context.Background(), iterOpts, c.comparer, c.newIters,
633 1 : current.Levels[level].Iter(), manifest.Level(level), internalOpts)
634 1 : li.initRangeDel(&mlevelAlloc[0])
635 1 : mlevelAlloc[0].iter = li
636 1 : mlevelAlloc = mlevelAlloc[1:]
637 : }
638 :
639 1 : mergingIter := &simpleMergingIter{}
640 1 : mergingIter.init(c.merge, c.cmp, c.seqNum, c.formatKey, mlevels...)
641 1 : for cont := mergingIter.step(); cont; cont = mergingIter.step() {
642 1 : }
643 1 : if err := mergingIter.err; err != nil {
644 0 : return err
645 0 : }
646 1 : if c.stats != nil {
647 1 : c.stats.NumPoints = mergingIter.numPoints
648 1 : }
649 :
650 : // Phase 2: Check that the tombstones are mutually consistent.
651 1 : return checkRangeTombstones(c)
652 : }
653 :
// simpleMergingIterItem is an entry in simpleMergingIterHeap: the current key
// of one level.
type simpleMergingIterItem struct {
	// index is the position of the originating level in
	// simpleMergingIter.levels.
	index int
	// kv is the level's current key-value pair. Its user key is a private
	// copy, not memory owned by the level's iterator.
	kv base.InternalKV
}
658 :
// simpleMergingIterHeap is a binary heap of simpleMergingIterItems ordered by
// user key and then by trailer, so that for equal user keys the larger
// trailer sorts first.
type simpleMergingIterHeap struct {
	// cmp compares user keys.
	cmp Compare
	// reverse inverts the ordering implemented by less. Nothing in the code
	// visible here sets it.
	reverse bool
	// items is the heap storage; items[0] is the top of the heap.
	items []simpleMergingIterItem
}
664 :
665 1 : func (h *simpleMergingIterHeap) len() int {
666 1 : return len(h.items)
667 1 : }
668 :
669 1 : func (h *simpleMergingIterHeap) less(i, j int) bool {
670 1 : ikey, jkey := h.items[i].kv.K, h.items[j].kv.K
671 1 : if c := h.cmp(ikey.UserKey, jkey.UserKey); c != 0 {
672 1 : if h.reverse {
673 0 : return c > 0
674 0 : }
675 1 : return c < 0
676 : }
677 1 : if h.reverse {
678 0 : return ikey.Trailer < jkey.Trailer
679 0 : }
680 1 : return ikey.Trailer > jkey.Trailer
681 : }
682 :
683 1 : func (h *simpleMergingIterHeap) swap(i, j int) {
684 1 : h.items[i], h.items[j] = h.items[j], h.items[i]
685 1 : }
686 :
687 : // init, fix, up and down are copied from the go stdlib.
688 1 : func (h *simpleMergingIterHeap) init() {
689 1 : // heapify
690 1 : n := h.len()
691 1 : for i := n/2 - 1; i >= 0; i-- {
692 1 : h.down(i, n)
693 1 : }
694 : }
695 :
696 1 : func (h *simpleMergingIterHeap) fix(i int) {
697 1 : if !h.down(i, h.len()) {
698 1 : h.up(i)
699 1 : }
700 : }
701 :
702 1 : func (h *simpleMergingIterHeap) pop() *simpleMergingIterItem {
703 1 : n := h.len() - 1
704 1 : h.swap(0, n)
705 1 : h.down(0, n)
706 1 : item := &h.items[n]
707 1 : h.items = h.items[:n]
708 1 : return item
709 1 : }
710 :
711 1 : func (h *simpleMergingIterHeap) up(j int) {
712 1 : for {
713 1 : i := (j - 1) / 2 // parent
714 1 : if i == j || !h.less(j, i) {
715 1 : break
716 : }
717 0 : h.swap(i, j)
718 0 : j = i
719 : }
720 : }
721 :
722 1 : func (h *simpleMergingIterHeap) down(i0, n int) bool {
723 1 : i := i0
724 1 : for {
725 1 : j1 := 2*i + 1
726 1 : if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
727 1 : break
728 : }
729 1 : j := j1 // left child
730 1 : if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
731 1 : j = j2 // = 2*i + 2 // right child
732 1 : }
733 1 : if !h.less(j, i) {
734 1 : break
735 : }
736 1 : h.swap(i, j)
737 1 : i = j
738 : }
739 1 : return i > i0
740 : }
|