Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : stdcmp "cmp"
9 : "context"
10 : "fmt"
11 : "io"
12 : "iter"
13 : "maps"
14 : "slices"
15 : "sort"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/keyspan"
20 : "github.com/cockroachdb/pebble/internal/manifest"
21 : "github.com/cockroachdb/pebble/sstable"
22 : "github.com/cockroachdb/pebble/sstable/blob"
23 : "github.com/cockroachdb/pebble/sstable/block"
24 : "github.com/cockroachdb/pebble/sstable/colblk"
25 : )
26 :
27 : // This file implements DB.CheckLevels() which checks that every entry in the
28 : // DB is consistent with respect to the level invariant: any point (or the
29 : // infinite number of points in a range tombstone) has a seqnum such that a
30 : // point with the same UserKey at a lower level has a lower seqnum. This is an
31 : // expensive check since it involves iterating over all the entries in the DB,
32 : // hence only intended for tests or tools.
33 : //
34 : // If we ignore range tombstones, the consistency checking of points can be
35 : // done with a simplified version of mergingIter. simpleMergingIter is that
36 : // simplified version of mergingIter that only needs to step through points
37 : // (analogous to only doing Next()). It can also easily accommodate
38 : // consistency checking of points relative to range tombstones.
39 : // simpleMergingIter does not do any seek optimizations present in mergingIter
40 : // (it minimally needs to seek the range delete iterators to position them at
41 : // or past the current point) since it does not want to miss points for
42 : // purposes of consistency checking.
43 : //
44 : // Mutual consistency of range tombstones is non-trivial to check. One needs
45 : // to detect inversions of the form [a, c)#8 at higher level and [b, c)#10 at
46 : // a lower level. The start key of the former is not contained in the latter
47 : // and we can't use the exclusive end key, c, for a containment check since it
48 : // is the sentinel key. We observe that if these tombstones were fragmented
49 : // wrt each other we would have [a, b)#8 and [b, c)#8 at the higher level and
50 : // [b, c)#10 at the lower level and then it is trivial to compare the two
51 : // [b, c) tombstones. Note that this fragmentation needs to take into account
52 : // that tombstones in a file may be untruncated and need to act within the
53 : // bounds of the file. This checking is performed by checkRangeTombstones()
54 : // and its helper functions.
55 :
56 : // The per-level structure used by simpleMergingIter.
57 : type simpleMergingIterLevel struct {
58 : iter internalIterator
59 : rangeDelIter keyspan.FragmentIterator
60 :
61 : iterKV *base.InternalKV
62 : tombstone *keyspan.Span
63 : }
64 :
65 1 : func (ml *simpleMergingIterLevel) setRangeDelIter(iter keyspan.FragmentIterator) {
66 1 : ml.tombstone = nil
67 1 : if ml.rangeDelIter != nil {
68 1 : ml.rangeDelIter.Close()
69 1 : }
70 1 : ml.rangeDelIter = iter
71 : }
72 :
73 : type simpleMergingIter struct {
74 : levels []simpleMergingIterLevel
75 : snapshot base.SeqNum
76 : heap simpleMergingIterHeap
77 : // The last point's key and level. For validation.
78 : lastKey InternalKey
79 : lastLevel int
80 : lastIterMsg string
81 : // A non-nil valueMerger means MERGE record processing is ongoing.
82 : valueMerger base.ValueMerger
83 : // The first error will cause step() to return false.
84 : err error
85 : numPoints int64
86 : merge Merge
87 : formatKey base.FormatKey
88 : }
89 :
90 : func (m *simpleMergingIter) init(
91 : merge Merge,
92 : cmp Compare,
93 : snapshot base.SeqNum,
94 : formatKey base.FormatKey,
95 : levels ...simpleMergingIterLevel,
96 1 : ) {
97 1 : m.levels = levels
98 1 : m.formatKey = formatKey
99 1 : m.merge = merge
100 1 : m.snapshot = snapshot
101 1 : m.lastLevel = -1
102 1 : m.heap.cmp = cmp
103 1 : m.heap.items = make([]simpleMergingIterItem, 0, len(levels))
104 1 : for i := range m.levels {
105 1 : l := &m.levels[i]
106 1 : l.iterKV = l.iter.First()
107 1 : if l.iterKV != nil {
108 1 : item := simpleMergingIterItem{
109 1 : index: i,
110 1 : kv: *l.iterKV,
111 1 : }
112 1 : item.kv.K = l.iterKV.K.Clone()
113 1 : m.heap.items = append(m.heap.items, item)
114 1 : }
115 : }
116 1 : m.heap.init()
117 1 :
118 1 : if m.heap.len() == 0 {
119 1 : return
120 1 : }
121 1 : m.positionRangeDels()
122 : }
123 :
124 : // Positions all the rangedel iterators at or past the current top of the
125 : // heap, using SeekGE().
126 1 : func (m *simpleMergingIter) positionRangeDels() {
127 1 : item := &m.heap.items[0]
128 1 : for i := range m.levels {
129 1 : l := &m.levels[i]
130 1 : if l.rangeDelIter == nil {
131 1 : continue
132 : }
133 1 : t, err := l.rangeDelIter.SeekGE(item.kv.K.UserKey)
134 1 : m.err = firstError(m.err, err)
135 1 : l.tombstone = t
136 : }
137 : }
138 :
139 : // Returns true if not yet done.
140 1 : func (m *simpleMergingIter) step() bool {
141 1 : if m.heap.len() == 0 || m.err != nil {
142 1 : return false
143 1 : }
144 1 : item := &m.heap.items[0]
145 1 : l := &m.levels[item.index]
146 1 : // Sentinels are not relevant for this point checking.
147 1 : if !item.kv.K.IsExclusiveSentinel() && item.kv.K.Visible(m.snapshot, base.SeqNumMax) {
148 1 : // This is a visible point key.
149 1 : if !m.handleVisiblePoint(item, l) {
150 0 : return false
151 0 : }
152 : }
153 :
154 : // The iterator for the current level may be closed in the following call to
155 : // Next(). We save its debug string for potential use after it is closed -
156 : // either in this current step() invocation or on the next invocation.
157 1 : m.lastIterMsg = l.iter.String()
158 1 :
159 1 : // Step to the next point.
160 1 : l.iterKV = l.iter.Next()
161 1 : if l.iterKV == nil {
162 1 : m.err = errors.CombineErrors(l.iter.Error(), l.iter.Close())
163 1 : l.iter = nil
164 1 : m.heap.pop()
165 1 : } else {
166 1 : // Check point keys in an sstable are ordered. Although not required, we check
167 1 : // for memtables as well. A subtle check here is that successive sstables of
168 1 : // L1 and higher levels are ordered. This happens when levelIter moves to the
169 1 : // next sstable in the level, in which case item.key is previous sstable's
170 1 : // last point key.
171 1 : if !l.iterKV.K.IsExclusiveSentinel() && base.InternalCompare(m.heap.cmp, item.kv.K, l.iterKV.K) >= 0 {
172 0 : m.err = errors.Errorf("out of order keys %s >= %s in %s",
173 0 : item.kv.K.Pretty(m.formatKey), l.iterKV.K.Pretty(m.formatKey), l.iter)
174 0 : return false
175 0 : }
176 1 : userKeyBuf := item.kv.K.UserKey[:0]
177 1 : item.kv = *l.iterKV
178 1 : item.kv.K.UserKey = append(userKeyBuf, l.iterKV.K.UserKey...)
179 1 : if m.heap.len() > 1 {
180 1 : m.heap.fix(0)
181 1 : }
182 : }
183 1 : if m.err != nil {
184 0 : return false
185 0 : }
186 1 : if m.heap.len() == 0 {
187 1 : // If m.valueMerger != nil, the last record was a MERGE record.
188 1 : if m.valueMerger != nil {
189 1 : var closer io.Closer
190 1 : var err error
191 1 : _, closer, err = m.valueMerger.Finish(true /* includesBase */)
192 1 : if closer != nil {
193 0 : err = errors.CombineErrors(err, closer.Close())
194 0 : }
195 1 : if err != nil {
196 0 : m.err = errors.CombineErrors(m.err,
197 0 : errors.Wrapf(err, "merge processing error on key %s in %s",
198 0 : item.kv.K.Pretty(m.formatKey), m.lastIterMsg))
199 0 : }
200 1 : m.valueMerger = nil
201 : }
202 1 : return false
203 : }
204 1 : m.positionRangeDels()
205 1 : return true
206 : }
207 :
208 : // handleVisiblePoint returns true if validation succeeded and level checking
209 : // can continue.
210 : func (m *simpleMergingIter) handleVisiblePoint(
211 : item *simpleMergingIterItem, l *simpleMergingIterLevel,
212 1 : ) (ok bool) {
213 1 : m.numPoints++
214 1 : keyChanged := m.heap.cmp(item.kv.K.UserKey, m.lastKey.UserKey) != 0
215 1 : if !keyChanged {
216 1 : // At the same user key. We will see them in decreasing seqnum
217 1 : // order so the lastLevel must not be lower.
218 1 : if m.lastLevel > item.index {
219 0 : m.err = errors.Errorf("found InternalKey %s in %s and InternalKey %s in %s",
220 0 : item.kv.K.Pretty(m.formatKey), l.iter, m.lastKey.Pretty(m.formatKey),
221 0 : m.lastIterMsg)
222 0 : return false
223 0 : }
224 1 : m.lastLevel = item.index
225 1 : } else {
226 1 : // The user key has changed.
227 1 : m.lastKey.Trailer = item.kv.K.Trailer
228 1 : m.lastKey.UserKey = append(m.lastKey.UserKey[:0], item.kv.K.UserKey...)
229 1 : m.lastLevel = item.index
230 1 : }
231 : // Ongoing series of MERGE records ends with a MERGE record.
232 1 : if keyChanged && m.valueMerger != nil {
233 1 : var closer io.Closer
234 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
235 1 : if m.err == nil && closer != nil {
236 0 : m.err = closer.Close()
237 0 : }
238 1 : m.valueMerger = nil
239 : }
240 1 : itemValue, _, err := item.kv.Value(nil)
241 1 : if err != nil {
242 0 : m.err = err
243 0 : return false
244 0 : }
245 1 : if m.valueMerger != nil {
246 1 : // Ongoing series of MERGE records.
247 1 : switch item.kv.K.Kind() {
248 1 : case InternalKeyKindSingleDelete, InternalKeyKindDelete, InternalKeyKindDeleteSized:
249 1 : var closer io.Closer
250 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
251 1 : if m.err == nil && closer != nil {
252 0 : m.err = closer.Close()
253 0 : }
254 1 : m.valueMerger = nil
255 1 : case InternalKeyKindSet, InternalKeyKindSetWithDelete:
256 1 : m.err = m.valueMerger.MergeOlder(itemValue)
257 1 : if m.err == nil {
258 1 : var closer io.Closer
259 1 : _, closer, m.err = m.valueMerger.Finish(true /* includesBase */)
260 1 : if m.err == nil && closer != nil {
261 0 : m.err = closer.Close()
262 0 : }
263 : }
264 1 : m.valueMerger = nil
265 1 : case InternalKeyKindMerge:
266 1 : m.err = m.valueMerger.MergeOlder(itemValue)
267 0 : default:
268 0 : m.err = errors.Errorf("pebble: invalid internal key kind %s in %s",
269 0 : item.kv.K.Pretty(m.formatKey),
270 0 : l.iter)
271 0 : return false
272 : }
273 1 : } else if item.kv.K.Kind() == InternalKeyKindMerge && m.err == nil {
274 1 : // New series of MERGE records.
275 1 : m.valueMerger, m.err = m.merge(item.kv.K.UserKey, itemValue)
276 1 : }
277 1 : if m.err != nil {
278 0 : m.err = errors.Wrapf(m.err, "merge processing error on key %s in %s",
279 0 : item.kv.K.Pretty(m.formatKey), l.iter)
280 0 : return false
281 0 : }
282 : // Is this point covered by a tombstone at a lower level? Note that all these
283 : // iterators must be positioned at a key > item.key.
284 1 : for level := item.index + 1; level < len(m.levels); level++ {
285 1 : lvl := &m.levels[level]
286 1 : if lvl.rangeDelIter == nil || lvl.tombstone.Empty() {
287 1 : continue
288 : }
289 1 : if lvl.tombstone.Contains(m.heap.cmp, item.kv.K.UserKey) && lvl.tombstone.CoversAt(m.snapshot, item.kv.K.SeqNum()) {
290 0 : m.err = errors.Errorf("tombstone %s in %s deletes key %s in %s",
291 0 : lvl.tombstone.Pretty(m.formatKey), lvl.iter, item.kv.K.Pretty(m.formatKey),
292 0 : l.iter)
293 0 : return false
294 0 : }
295 : }
296 1 : return true
297 : }
298 :
299 : // Checking that range tombstones are mutually consistent is performed by
300 : // checkRangeTombstones(). See the overview comment at the top of the file.
301 : //
302 : // We do this check as follows:
303 : // - Collect the tombstones for each level, put them into one pool of tombstones
304 : // along with their level information (addTombstonesFromIter()).
305 : // - Collect the start and end user keys from all these tombstones
306 : // (collectAllUserKeys()) and use them to fragment all the tombstones
307 : // (fragmentUsingUserKeys()).
308 : // - Sort tombstones by start key and decreasing seqnum (all tombstones that
309 : // have the same start key will have the same end key because they have been
310 : // fragmented)
311 : // - Iterate and check (iterateAndCheckTombstones()).
312 : //
313 : // Note that this simple approach requires holding all the tombstones across all
314 : // levels in-memory. A more sophisticated incremental approach could be devised,
315 : // if necessary.
316 :
317 : // A tombstone and the corresponding level it was found in.
318 : type tombstoneWithLevel struct {
319 : keyspan.Span
320 : level int
321 : // The level in LSM. A -1 means it's a memtable.
322 : lsmLevel int
323 : tableNum base.TableNum
324 : }
325 :
326 : func iterateAndCheckTombstones(
327 : cmp Compare, formatKey base.FormatKey, tombstones []tombstoneWithLevel,
328 1 : ) error {
329 1 : slices.SortFunc(tombstones, func(a, b tombstoneWithLevel) int {
330 1 : if v := cmp(a.Start, b.Start); v != 0 {
331 1 : return v
332 1 : }
333 1 : return stdcmp.Compare(b.LargestSeqNum(), a.LargestSeqNum())
334 : })
335 :
336 : // For a sequence of tombstones that share the same start UserKey, we will
337 : // encounter them in non-increasing seqnum order and so should encounter them
338 : // in non-decreasing level order.
339 1 : lastTombstone := tombstoneWithLevel{}
340 1 : for _, t := range tombstones {
341 1 : if cmp(lastTombstone.Start, t.Start) == 0 && lastTombstone.level > t.level {
342 0 : return errors.Errorf("encountered tombstone %s in %s"+
343 0 : " that has a lower seqnum than the same tombstone in %s",
344 0 : t.Span.Pretty(formatKey), levelOrMemtable(t.lsmLevel, t.tableNum),
345 0 : levelOrMemtable(lastTombstone.lsmLevel, lastTombstone.tableNum))
346 0 : }
347 1 : lastTombstone = t
348 : }
349 1 : return nil
350 : }
351 :
352 : type checkConfig struct {
353 : logger Logger
354 : comparer *Comparer
355 : readState *readState
356 : newIters tableNewIters
357 : seqNum base.SeqNum
358 : stats *CheckLevelsStats
359 : merge Merge
360 : formatKey base.FormatKey
361 : readEnv block.ReadEnv
362 : // blobValueFetcher is the ValueFetcher to use when retrieving values stored
363 : // externally in blob files.
364 : blobValueFetcher blob.ValueFetcher
365 : fileCache *fileCacheHandle
366 : }
367 :
368 : // cmp is shorthand for comparer.Compare.
369 1 : func (c *checkConfig) cmp(a, b []byte) int { return c.comparer.Compare(a, b) }
370 :
371 1 : func checkRangeTombstones(c *checkConfig) error {
372 1 : var level int
373 1 : var tombstones []tombstoneWithLevel
374 1 : var err error
375 1 :
376 1 : memtables := c.readState.memtables
377 1 : for i := len(memtables) - 1; i >= 0; i-- {
378 1 : iter := memtables[i].newRangeDelIter(nil)
379 1 : if iter == nil {
380 1 : continue
381 : }
382 1 : tombstones, err = addTombstonesFromIter(
383 1 : iter, level, -1, 0, tombstones, c.seqNum, c.cmp, c.formatKey,
384 1 : )
385 1 : iter.Close()
386 1 : if err != nil {
387 0 : return err
388 0 : }
389 1 : level++
390 : }
391 :
392 1 : current := c.readState.current
393 1 : addTombstonesFromLevel := func(files iter.Seq[*manifest.TableMetadata], lsmLevel int) error {
394 1 : for f := range files {
395 1 : iters, err := c.newIters(
396 1 : context.Background(), f, &IterOptions{layer: manifest.Level(lsmLevel)},
397 1 : internalIterOpts{}, iterRangeDeletions)
398 1 : if err != nil {
399 0 : return err
400 0 : }
401 1 : tombstones, err = addTombstonesFromIter(iters.RangeDeletion(), level, lsmLevel, f.TableNum,
402 1 : tombstones, c.seqNum, c.cmp, c.formatKey)
403 1 : _ = iters.CloseAll()
404 1 :
405 1 : if err != nil {
406 0 : return err
407 0 : }
408 : }
409 1 : return nil
410 : }
411 : // Now the levels with untruncated tombsones.
412 1 : for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
413 1 : if current.L0SublevelFiles[i].Empty() {
414 0 : continue
415 : }
416 1 : err := addTombstonesFromLevel(current.L0SublevelFiles[i].All(), 0)
417 1 : if err != nil {
418 0 : return err
419 0 : }
420 1 : level++
421 : }
422 1 : for i := 1; i < len(current.Levels); i++ {
423 1 : if err := addTombstonesFromLevel(current.Levels[i].All(), i); err != nil {
424 0 : return err
425 0 : }
426 1 : level++
427 : }
428 1 : if c.stats != nil {
429 0 : c.stats.NumTombstones = len(tombstones)
430 0 : }
431 : // We now have truncated tombstones.
432 : // Fragment them all.
433 1 : userKeys := collectAllUserKeys(c.cmp, tombstones)
434 1 : tombstones = fragmentUsingUserKeys(c.cmp, tombstones, userKeys)
435 1 : return iterateAndCheckTombstones(c.cmp, c.formatKey, tombstones)
436 : }
437 :
438 0 : func levelOrMemtable(lsmLevel int, tableNum base.TableNum) string {
439 0 : if lsmLevel == -1 {
440 0 : return "memtable"
441 0 : }
442 0 : return fmt.Sprintf("L%d: fileNum=%s", lsmLevel, tableNum)
443 : }
444 :
445 : func addTombstonesFromIter(
446 : iter keyspan.FragmentIterator,
447 : level int,
448 : lsmLevel int,
449 : tableNum base.TableNum,
450 : tombstones []tombstoneWithLevel,
451 : seqNum base.SeqNum,
452 : cmp Compare,
453 : formatKey base.FormatKey,
454 1 : ) (_ []tombstoneWithLevel, err error) {
455 1 : var prevTombstone keyspan.Span
456 1 : tomb, err := iter.First()
457 1 : for ; tomb != nil; tomb, err = iter.Next() {
458 1 : t := tomb.Visible(seqNum)
459 1 : if t.Empty() {
460 1 : continue
461 : }
462 1 : t = t.Clone()
463 1 : // This is mainly a test for rangeDelV2 formatted blocks which are expected to
464 1 : // be ordered and fragmented on disk. But we anyways check for memtables,
465 1 : // rangeDelV1 as well.
466 1 : if cmp(prevTombstone.End, t.Start) > 0 {
467 0 : return nil, errors.Errorf("unordered or unfragmented range delete tombstones %s, %s in %s",
468 0 : prevTombstone.Pretty(formatKey), t.Pretty(formatKey), levelOrMemtable(lsmLevel, tableNum))
469 0 : }
470 1 : prevTombstone = t
471 1 :
472 1 : if !t.Empty() {
473 1 : tombstones = append(tombstones, tombstoneWithLevel{
474 1 : Span: t,
475 1 : level: level,
476 1 : lsmLevel: lsmLevel,
477 1 : tableNum: tableNum,
478 1 : })
479 1 : }
480 : }
481 1 : if err != nil {
482 0 : return nil, err
483 0 : }
484 1 : return tombstones, nil
485 : }
486 :
487 1 : func collectAllUserKeys(cmp Compare, tombstones []tombstoneWithLevel) [][]byte {
488 1 : keys := make([][]byte, 0, len(tombstones)*2)
489 1 : for _, t := range tombstones {
490 1 : keys = append(keys, t.Start, t.End)
491 1 : }
492 1 : slices.SortFunc(keys, cmp)
493 1 : return slices.CompactFunc(keys, func(a, b []byte) bool {
494 1 : return cmp(a, b) == 0
495 1 : })
496 : }
497 :
498 : func fragmentUsingUserKeys(
499 : cmp Compare, tombstones []tombstoneWithLevel, userKeys [][]byte,
500 1 : ) []tombstoneWithLevel {
501 1 : var buf []tombstoneWithLevel
502 1 : for _, t := range tombstones {
503 1 : // Find the first position with tombstone start < user key
504 1 : i := sort.Search(len(userKeys), func(i int) bool {
505 1 : return cmp(t.Start, userKeys[i]) < 0
506 1 : })
507 1 : for ; i < len(userKeys); i++ {
508 1 : if cmp(userKeys[i], t.End) >= 0 {
509 1 : break
510 : }
511 1 : tPartial := t
512 1 : tPartial.End = userKeys[i]
513 1 : buf = append(buf, tPartial)
514 1 : t.Start = userKeys[i]
515 : }
516 1 : buf = append(buf, t)
517 : }
518 1 : return buf
519 : }
520 :
// CheckLevelsStats reports basic counts of the points and tombstones that
// CheckLevels encountered.
type CheckLevelsStats struct {
	// NumPoints is the number of visible point keys that were validated.
	NumPoints int64
	// NumTombstones is the number of visible range tombstones gathered,
	// counted before they are fragmented against each other.
	NumTombstones int
}
526 :
527 : // CheckLevels checks:
528 : // - Every entry in the DB is consistent with the level invariant. See the
529 : // comment at the top of the file.
530 : // - Point keys in sstables are ordered.
531 : // - Range delete tombstones in sstables are ordered and fragmented.
532 : // - Successful processing of all MERGE records.
533 : // - Each sstable's blob reference liveness block is valid.
534 1 : func (d *DB) CheckLevels(stats *CheckLevelsStats) error {
535 1 : // Grab and reference the current readState.
536 1 : readState := d.loadReadState()
537 1 : defer readState.unref()
538 1 :
539 1 : // Determine the seqnum to read at after grabbing the read state (current and
540 1 : // memtables) above.
541 1 : seqNum := d.mu.versions.visibleSeqNum.Load()
542 1 :
543 1 : checkConfig := &checkConfig{
544 1 : logger: d.opts.Logger,
545 1 : comparer: d.opts.Comparer,
546 1 : readState: readState,
547 1 : newIters: d.newIters,
548 1 : seqNum: seqNum,
549 1 : stats: stats,
550 1 : merge: d.merge,
551 1 : formatKey: d.opts.Comparer.FormatKey,
552 1 : readEnv: block.ReadEnv{
553 1 : // TODO(jackson): Add categorized stats.
554 1 : },
555 1 : fileCache: d.fileCache,
556 1 : }
557 1 : checkConfig.blobValueFetcher.Init(&readState.current.BlobFiles, checkConfig.fileCache, checkConfig.readEnv)
558 1 : defer func() { _ = checkConfig.blobValueFetcher.Close() }()
559 1 : return checkLevelsInternal(checkConfig)
560 : }
561 :
562 1 : func checkLevelsInternal(c *checkConfig) (err error) {
563 1 : internalOpts := internalIterOpts{
564 1 : readEnv: sstable.ReadEnv{Block: c.readEnv},
565 1 : blobValueFetcher: &c.blobValueFetcher,
566 1 : }
567 1 :
568 1 : // Phase 1: Use a simpleMergingIter to step through all the points and ensure
569 1 : // that points with the same user key at different levels are not inverted
570 1 : // wrt sequence numbers and the same holds for tombstones that cover points.
571 1 : // To do this, one needs to construct a simpleMergingIter which is similar to
572 1 : // how one constructs a mergingIter.
573 1 :
574 1 : // Add mem tables from newest to oldest.
575 1 : var mlevels []simpleMergingIterLevel
576 1 : defer func() {
577 1 : for i := range mlevels {
578 1 : l := &mlevels[i]
579 1 : if l.iter != nil {
580 1 : err = firstError(err, l.iter.Close())
581 1 : l.iter = nil
582 1 : }
583 1 : if l.rangeDelIter != nil {
584 1 : l.rangeDelIter.Close()
585 1 : l.rangeDelIter = nil
586 1 : }
587 : }
588 : }()
589 :
590 1 : memtables := c.readState.memtables
591 1 : for i := len(memtables) - 1; i >= 0; i-- {
592 1 : mem := memtables[i]
593 1 : mlevels = append(mlevels, simpleMergingIterLevel{
594 1 : iter: mem.newIter(nil),
595 1 : rangeDelIter: mem.newRangeDelIter(nil),
596 1 : })
597 1 : }
598 :
599 1 : current := c.readState.current
600 1 : // Determine the final size for mlevels so that there are no more
601 1 : // reallocations. levelIter will hold a pointer to elements in mlevels.
602 1 : start := len(mlevels)
603 1 : for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
604 1 : if current.L0SublevelFiles[sublevel].Empty() {
605 0 : continue
606 : }
607 1 : mlevels = append(mlevels, simpleMergingIterLevel{})
608 : }
609 1 : for level := 1; level < len(current.Levels); level++ {
610 1 : if current.Levels[level].Empty() {
611 1 : continue
612 : }
613 1 : mlevels = append(mlevels, simpleMergingIterLevel{})
614 : }
615 1 : mlevelAlloc := mlevels[start:]
616 1 : var allTables []*manifest.TableMetadata
617 1 :
618 1 : // Add L0 files by sublevel.
619 1 : for sublevel := len(current.L0SublevelFiles) - 1; sublevel >= 0; sublevel-- {
620 1 : if current.L0SublevelFiles[sublevel].Empty() {
621 0 : continue
622 : }
623 1 : manifestIter := current.L0SublevelFiles[sublevel].Iter()
624 1 : iterOpts := IterOptions{logger: c.logger}
625 1 : li := &levelIter{}
626 1 : li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
627 1 : manifest.L0Sublevel(sublevel), internalOpts)
628 1 : li.initRangeDel(&mlevelAlloc[0])
629 1 : mlevelAlloc[0].iter = li
630 1 : mlevelAlloc = mlevelAlloc[1:]
631 1 : for f := range current.L0SublevelFiles[sublevel].All() {
632 1 : allTables = append(allTables, f)
633 1 : }
634 : }
635 1 : for level := 1; level < len(current.Levels); level++ {
636 1 : if current.Levels[level].Empty() {
637 1 : continue
638 : }
639 1 : iterOpts := IterOptions{logger: c.logger}
640 1 : li := &levelIter{}
641 1 : li.init(context.Background(), iterOpts, c.comparer, c.newIters,
642 1 : current.Levels[level].Iter(), manifest.Level(level), internalOpts)
643 1 : li.initRangeDel(&mlevelAlloc[0])
644 1 : mlevelAlloc[0].iter = li
645 1 : mlevelAlloc = mlevelAlloc[1:]
646 1 : for f := range current.Levels[level].All() {
647 1 : allTables = append(allTables, f)
648 1 : }
649 : }
650 :
651 1 : mergingIter := &simpleMergingIter{}
652 1 : mergingIter.init(c.merge, c.cmp, c.seqNum, c.formatKey, mlevels...)
653 1 : for cont := mergingIter.step(); cont; cont = mergingIter.step() {
654 1 : }
655 1 : if err := mergingIter.err; err != nil {
656 0 : return err
657 0 : }
658 1 : if c.stats != nil {
659 0 : c.stats.NumPoints = mergingIter.numPoints
660 0 : }
661 :
662 : // Phase 2: Check that the tombstones are mutually consistent.
663 1 : if err := checkRangeTombstones(c); err != nil {
664 0 : return err
665 0 : }
666 :
667 : // Phase 3: Validate blob value liveness block for all tables in the LSM.
668 : // TODO(annie): This is a very expensive operation. We should try to reduce
669 : // the amount of work performed. One possibility is to have the caller
670 : // pass in a prng seed and use that to choose which tables to validate.
671 1 : if err := validateBlobValueLiveness(allTables, c.fileCache, c.readEnv, &c.blobValueFetcher); err != nil {
672 0 : return err
673 0 : }
674 :
675 1 : return nil
676 : }
677 :
// valuesInfo accumulates the blob values an sstable references within one
// blob block.
type valuesInfo struct {
	// valueIDs lists the referenced value IDs, in the order they were found.
	valueIDs []int
	// totalSize is the sum of the lengths of the referenced values.
	totalSize int
}
682 :
683 : // gatherBlobHandles gathers all the blob handles in an sstable, returning a
684 : // slice of maps; indexing into the slice at `i` is equivalent to retrieving
685 : // each blob.BlockID's referenced blob.BlockValueID for the `i`th blob reference.
686 : func gatherBlobHandles(
687 : ctx context.Context,
688 : r *sstable.Reader,
689 : blobRefs manifest.BlobReferences,
690 : valueFetcher base.ValueFetcher,
691 1 : ) ([]map[blob.BlockID]valuesInfo, error) {
692 1 : iter, err := r.NewPointIter(ctx, sstable.IterOptions{
693 1 : BlobContext: sstable.TableBlobContext{
694 1 : ValueFetcher: valueFetcher,
695 1 : References: &blobRefs,
696 1 : },
697 1 : })
698 1 : if err != nil {
699 0 : return nil, err
700 0 : }
701 1 : defer func() { _ = iter.Close() }()
702 :
703 1 : referenced := make([]map[blob.BlockID]valuesInfo, len(blobRefs))
704 1 : for i := range referenced {
705 1 : referenced[i] = make(map[blob.BlockID]valuesInfo)
706 1 : }
707 1 : for kv := iter.First(); kv != nil; kv = iter.Next() {
708 1 : if kv.V.IsBlobValueHandle() {
709 1 : lv := kv.V.LazyValue()
710 1 : handleSuffix := blob.DecodeHandleSuffix(lv.ValueOrHandle)
711 1 : refID, ok := blobRefs.IDByBlobFileID(lv.Fetcher.BlobFileID)
712 1 : if !ok {
713 0 : return nil, errors.Errorf("blob file ID %d not found in blob references", lv.Fetcher.BlobFileID)
714 0 : }
715 1 : blockID := handleSuffix.BlockID
716 1 : valueID := int(handleSuffix.ValueID)
717 1 : vi := referenced[refID][blockID]
718 1 : vi.valueIDs = append(vi.valueIDs, valueID)
719 1 : vi.totalSize += lv.Len()
720 1 : referenced[refID][blockID] = vi
721 : }
722 : }
723 1 : return referenced, nil
724 : }
725 :
726 : func performValidationForSSTable(
727 : decoder colblk.ReferenceLivenessBlockDecoder,
728 : tableNum base.TableNum,
729 : referenced []map[blob.BlockID]valuesInfo,
730 1 : ) error {
731 1 : if len(referenced) != decoder.BlockDecoder().Rows() {
732 0 : return errors.Errorf("mismatch in number of references in blob value "+
733 0 : "liveness block: expected=%d found=%d", len(referenced),
734 0 : decoder.BlockDecoder().Rows())
735 0 : }
736 1 : for refID, blockValues := range referenced {
737 1 : bitmapEncodings := slices.Clone(decoder.LivenessAtReference(refID))
738 1 : for _, blockEnc := range sstable.DecodeBlobRefLivenessEncoding(bitmapEncodings) {
739 1 : blockID := blockEnc.BlockID
740 1 : vi, ok := blockValues[blockID]
741 1 : if !ok {
742 0 : return errors.Errorf("dangling refID=%d blockID=%d in blob value "+
743 0 : "liveness encoding for sstable %d", refID, blockID, tableNum)
744 0 : }
745 1 : encodedVals := slices.Collect(sstable.IterSetBitsInRunLengthBitmap(blockEnc.Bitmap))
746 1 : if !slices.Equal(vi.valueIDs, encodedVals) {
747 0 : return errors.Errorf("bitmap mismatch for refID=%d blockID=%d: "+
748 0 : "expected=%v encoded=%v for sstable %d", refID, blockID, vi.valueIDs,
749 0 : encodedVals, tableNum)
750 0 : }
751 1 : if vi.totalSize != blockEnc.ValuesSize {
752 0 : return errors.Errorf("value size mismatch for refID=%d blockID=%d: "+
753 0 : "expected=%d encoded=%d for sstable %d", refID, blockID, vi.totalSize,
754 0 : blockEnc.ValuesSize, tableNum)
755 0 : }
756 : // Remove the processed blockID from the map so that later,
757 : // we can check if we processed everything. This is to
758 : // ensure that we do not have any missing references in the
759 : // blob reference liveness block for any of the references
760 : // in the sstable.
761 1 : delete(blockValues, blockID)
762 : }
763 1 : if len(blockValues) > 0 {
764 0 : return errors.Errorf("refID=%d blockIDs=%v referenced by sstable %d "+
765 0 : "is/are not present in blob reference liveness block", refID,
766 0 : slices.Collect(maps.Keys(blockValues)), tableNum)
767 0 : }
768 : }
769 1 : return nil
770 : }
771 :
772 : // validateBlobValueLiveness iterates through each table,
773 : // gathering all the blob handles, and then compares the values encoded in the
774 : // blob reference liveness block to the values referenced by the blob handles.
775 : func validateBlobValueLiveness(
776 : tables []*manifest.TableMetadata,
777 : fc *fileCacheHandle,
778 : readEnv block.ReadEnv,
779 : valueFetcher base.ValueFetcher,
780 1 : ) error {
781 1 : ctx := context.TODO()
782 1 : var decoder colblk.ReferenceLivenessBlockDecoder
783 1 : for _, t := range tables {
784 1 : if len(t.BlobReferences) == 0 {
785 1 : continue
786 : }
787 1 : if err := fc.withReader(ctx, readEnv, t, func(r *sstable.Reader, readEnv sstable.ReadEnv) error {
788 1 : // For this sstable, gather all the blob handles -- tracking
789 1 : // each blob.ReferenceID + blob.BlockID's referenced
790 1 : // blob.BlockValueIDs.
791 1 : referenced, err := gatherBlobHandles(ctx, r, t.BlobReferences, valueFetcher)
792 1 : if err != nil {
793 0 : return err
794 0 : }
795 1 : h, err := r.ReadBlobRefIndexBlock(ctx, readEnv.Block)
796 1 : if err != nil {
797 0 : return err
798 0 : }
799 1 : defer h.Release()
800 1 : decoder.Init(h.BlockData())
801 1 : return performValidationForSSTable(decoder, t.TableNum, referenced)
802 0 : }); err != nil {
803 0 : return err
804 0 : }
805 : }
806 1 : return nil
807 : }
808 :
809 : type simpleMergingIterItem struct {
810 : index int
811 : kv base.InternalKV
812 : }
813 :
814 : type simpleMergingIterHeap struct {
815 : cmp Compare
816 : reverse bool
817 : items []simpleMergingIterItem
818 : }
819 :
820 1 : func (h *simpleMergingIterHeap) len() int {
821 1 : return len(h.items)
822 1 : }
823 :
824 1 : func (h *simpleMergingIterHeap) less(i, j int) bool {
825 1 : ikey, jkey := h.items[i].kv.K, h.items[j].kv.K
826 1 : if c := h.cmp(ikey.UserKey, jkey.UserKey); c != 0 {
827 1 : if h.reverse {
828 0 : return c > 0
829 0 : }
830 1 : return c < 0
831 : }
832 1 : if h.reverse {
833 0 : return ikey.Trailer < jkey.Trailer
834 0 : }
835 1 : return ikey.Trailer > jkey.Trailer
836 : }
837 :
838 1 : func (h *simpleMergingIterHeap) swap(i, j int) {
839 1 : h.items[i], h.items[j] = h.items[j], h.items[i]
840 1 : }
841 :
842 : // init, fix, up and down are copied from the go stdlib.
843 1 : func (h *simpleMergingIterHeap) init() {
844 1 : // heapify
845 1 : n := h.len()
846 1 : for i := n/2 - 1; i >= 0; i-- {
847 1 : h.down(i, n)
848 1 : }
849 : }
850 :
851 1 : func (h *simpleMergingIterHeap) fix(i int) {
852 1 : if !h.down(i, h.len()) {
853 1 : h.up(i)
854 1 : }
855 : }
856 :
857 1 : func (h *simpleMergingIterHeap) pop() *simpleMergingIterItem {
858 1 : n := h.len() - 1
859 1 : h.swap(0, n)
860 1 : h.down(0, n)
861 1 : item := &h.items[n]
862 1 : h.items = h.items[:n]
863 1 : return item
864 1 : }
865 :
866 1 : func (h *simpleMergingIterHeap) up(j int) {
867 1 : for {
868 1 : i := (j - 1) / 2 // parent
869 1 : if i == j || !h.less(j, i) {
870 1 : break
871 : }
872 0 : h.swap(i, j)
873 0 : j = i
874 : }
875 : }
876 :
877 1 : func (h *simpleMergingIterHeap) down(i0, n int) bool {
878 1 : i := i0
879 1 : for {
880 1 : j1 := 2*i + 1
881 1 : if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
882 1 : break
883 : }
884 1 : j := j1 // left child
885 1 : if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
886 1 : j = j2 // = 2*i + 2 // right child
887 1 : }
888 1 : if !h.less(j, i) {
889 1 : break
890 : }
891 1 : h.swap(i, j)
892 1 : i = j
893 : }
894 1 : return i > i0
895 : }
|