Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "runtime/debug"
12 : "unsafe"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/internal/keyspan"
18 : "github.com/cockroachdb/pebble/internal/treeprinter"
19 : )
20 :
21 : type mergingIterLevel struct {
22 : index int
23 : iter internalIterator
24 : // rangeDelIter is set to the range-deletion iterator for the level. When
25 : // configured with a levelIter, this pointer changes as sstable boundaries
26 : // are crossed. See levelIter.initRangeDel and the Range Deletions comment
27 : // below.
28 : rangeDelIter keyspan.FragmentIterator
29 : // rangeDelIterGeneration is incremented whenever rangeDelIter changes.
30 : rangeDelIterGeneration int
31 : // iterKV caches the current key-value pair iter points to.
32 : iterKV *base.InternalKV
33 : // levelIter is non-nil if this level's iter is ultimately backed by a
34 : // *levelIter. The handle in iter may have wrapped the levelIter with
35 : // intermediary internalIterator implementations.
36 : levelIter *levelIter
37 :
38 : // tombstone caches the tombstone rangeDelIter is currently pointed at. If
39 : // tombstone is nil, there are no further tombstones within the
40 : // current sstable in the current iterator direction. The cached tombstone is
41 : // only valid for the levels in the range [0,heap[0].index]. This avoids
42 : // positioning tombstones at lower levels which cannot possibly shadow the
43 : // current key.
44 : tombstone *keyspan.Span
45 : }
46 :
47 : // Assert that *mergingIterLevel implements rangeDelIterSetter.
48 : var _ rangeDelIterSetter = (*mergingIterLevel)(nil)
49 :
50 2 : func (ml *mergingIterLevel) setRangeDelIter(iter keyspan.FragmentIterator) {
51 2 : ml.tombstone = nil
52 2 : if ml.rangeDelIter != nil {
53 2 : ml.rangeDelIter.Close()
54 2 : }
55 2 : ml.rangeDelIter = iter
56 2 : ml.rangeDelIterGeneration++
57 : }
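// exampleReloadRangeDel is an illustrative sketch (unused; the names here are
// hypothetical) of how a level's range-del iterator is swapped when the level
// moves to a new sstable. The generation bump is what later lets
// nextEntry/prevEntry detect the swap and re-seek the cached tombstone.
func exampleReloadRangeDel(ml *mergingIterLevel, newIter keyspan.FragmentIterator) {
	gen := ml.rangeDelIterGeneration
	ml.setRangeDelIter(newIter) // closes the old iterator, clears ml.tombstone
	_ = gen                     // ml.rangeDelIterGeneration is now gen+1
}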
58 :
59 : // mergingIter provides a merged view of multiple iterators from different
60 : // levels of the LSM.
61 : //
62 : // The core of a mergingIter is a heap of internalIterators (see
63 : // mergingIterHeap). The heap can operate as either a min-heap, used during
64 : // forward iteration (First, SeekGE, Next) or a max-heap, used during reverse
65 : // iteration (Last, SeekLT, Prev). The heap is initialized in calls to First,
66 : // Last, SeekGE, and SeekLT. A call to Next or Prev takes the current top
67 : // element on the heap, advances its iterator, and then "fixes" the heap
68 : // property. When one of the child iterators is exhausted during Next/Prev
69 : // iteration, it is removed from the heap.
70 : //
71 : // # Range Deletions
72 : //
73 : // A mergingIter can optionally be configured with a slice of range deletion
74 : // iterators. The range deletion iterator slice must exactly parallel the point
75 : // iterators and the range deletion iterator must correspond to the same level
76 : // in the LSM as the point iterator. Note that each memtable and each table in
77 : // L0 is a different "level" from the mergingIter perspective. So level 0 below
78 : // does not correspond to L0 in the LSM.
79 : //
80 : // A range deletion iterator iterates over fragmented range tombstones. Range
81 : // tombstones are fragmented by splitting them at any overlapping points. This
82 : // fragmentation guarantees that within an sstable tombstones will either be
83 : // distinct or will have identical start and end user keys. While range
84 : // tombstones are fragmented within an sstable, the start and end keys are not truncated
85 : // to sstable boundaries. This is necessary because the tombstone end key is
86 : // exclusive and does not have a sequence number. Consider an sstable
87 : // containing the range tombstone [a,c)#9 and the key "b#8". The tombstone must
88 : // delete "b#8", yet older versions of "b" might spill over to the next
89 : // sstable. So the boundary key for this sstable must be "b#8". Adjusting the
90 : // end key of tombstones to be optionally inclusive, or giving them a
91 : // sequence number, would be possible solutions, but both have serious
92 : // issues: tombstones have exclusive end keys because an inclusive deletion
93 : // end can always be converted to an exclusive one while the reverse
94 : // transformation is not possible, and the semantics of a sequence number
95 : // for the end key of a range tombstone are murky. The approach taken here
96 : // instead performs an implicit truncation of the tombstone to the sstable
97 : // boundaries.
98 : //
99 : // During initialization of a mergingIter, the range deletion iterators for
100 : // batches, memtables, and L0 tables are populated up front. Note that Batches
101 : // and memtables index unfragmented tombstones. Batch.newRangeDelIter() and
102 : // memTable.newRangeDelIter() fragment and cache the tombstones on demand. The
103 : // L1-L6 range deletion iterators are populated by levelIter. When configured
104 : // to load range deletion iterators, whenever a levelIter loads a table it
105 : // loads both the point iterator and the range deletion
106 : // iterator. levelIter.rangeDelIter is configured to point to the right entry
107 : // in mergingIter.levels. The effect of this setup is that
108 : // mergingIter.levels[i].rangeDelIter always contains the fragmented range
109 : // tombstone for the current table in level i that the levelIter has open.
110 : //
111 : // Another crucial mechanism of levelIter is that it materializes fake point
112 : // entries for the table boundaries if the boundary is a range deletion
113 : // key. Consider a table that contains only a range tombstone [a-e)#10. The
114 : // sstable boundaries for this table will be a#10,15 and
115 : // e#72057594037927935,15. During forward iteration levelIter will return
116 : // e#72057594037927935,15 as a key. During reverse iteration levelIter will
117 : // return a#10,15 as a key. These sentinel keys act as bookends to point
118 : // iteration and allow mergingIter to keep a table and its associated range
119 : // tombstones loaded as long as there are keys at lower levels that are within
120 : // the bounds of the table.
121 : //
122 : // The final piece to the range deletion puzzle is the LSM invariant that for a
123 : // given key K newer versions of K can only exist earlier in the level, or at
124 : // higher levels of the tree. For example, if K#4 exists in L3, K#5 can only
125 : // exist earlier in L3 or in L0, L1, L2, or a memtable. Get very explicitly
126 : // uses this invariant to find the value for a key by walking the LSM level by
127 : // level. For range deletions, this invariant means that a range deletion at
128 : // level N will necessarily shadow any keys within its bounds in level Y where
129 : // Y > N. One wrinkle is that this shadowing only applies to keys that also
130 : // lie within the sstable bounds, but we get that guarantee due to the
131 : // way the range deletion iterator and point iterator are bound together by a
132 : // levelIter.
133 : //
134 : // Tying the above all together, we get a picture where each level (index in
135 : // mergingIter.levels) is composed of both point operations (pX) and range
136 : // deletions (rX). The range deletions for level X shadow both the point
137 : // operations and range deletions for level Y where Y > X allowing mergingIter
138 : // to skip processing entries in that shadow. For example, consider the
139 : // scenario:
140 : //
141 : // r0: a---e
142 : // r1: d---h
143 : // r2: g---k
144 : // r3: j---n
145 : // r4: m---q
146 : //
147 : // This is showing 5 levels of range deletions. Consider what happens upon
148 : // SeekGE("b"). We first seek the point iterator for level 0 (the point values
149 : // are not shown above) and we then seek the range deletion iterator. That
150 : // returns the tombstone [a,e). This tombstone tells us that all keys in the
151 : // range [a,e) in lower levels are deleted so we can skip them. So we can
152 : // adjust the seek key to "e", the tombstone end key. For level 1 we seek to
153 : // "e" and find the range tombstone [d,h) and similar logic holds. By the time
154 : // we get to level 4 we're seeking to "n".
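// Summarizing the cascade, the per-level seek keys in this example are:
//
//	r0: seek "b" -> tombstone [a,e) -> next seek key "e"
//	r1: seek "e" -> tombstone [d,h) -> next seek key "h"
//	r2: seek "h" -> tombstone [g,k) -> next seek key "k"
//	r3: seek "k" -> tombstone [j,n) -> next seek key "n"
//	r4: seek "n"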
155 : //
156 : // One consequence of not truncating tombstone end keys to sstable boundaries
157 : // is the seeking process described above cannot always seek to the tombstone
158 : // end key in the older level. For example, imagine in the above example r3 is
159 : // a partitioned level (i.e., L1+ in our LSM), and the sstable containing [j,
160 : // n) has "k" as its upper boundary. In this situation, compactions involving
161 : // keys at or after "k" can output those keys to r4+, even if they're newer
162 : // than our tombstone [j, n). So instead of seeking to "n" in r4 we can only
163 : // seek to "k". To achieve this, the instance variable `largestUserKey`
164 : // maintains the upper bounds of the current sstables in the partitioned
165 : // levels. In this example, `levels[3].largestUserKey` holds "k", telling us to
166 : // limit the seek triggered by a tombstone in r3 to "k".
167 : //
168 : // During actual iteration levels can contain both point operations and range
169 : // deletions. Within a level, when a range deletion contains a point operation
170 : // the sequence numbers must be checked to determine if the point operation is
171 : // newer or older than the range deletion tombstone. The mergingIter maintains
172 : // the invariant that the range deletion iterators for all levels newer than
173 : // the current iteration key (L < m.heap.items[0].index) are positioned at the
174 : // next (or previous during reverse iteration) range deletion tombstone. We
175 : // know those levels don't contain a range deletion tombstone that covers the
176 : // current key because if they did the current key would be deleted. The range
177 : // deletion iterator for the current key's level is positioned at a range
178 : // tombstone covering or past the current key. The position of all other
179 : // range deletion iterators is unspecified. Whenever a key from those levels
180 : // becomes the current key, their range deletion iterators need to be
181 : // positioned. This lazy positioning avoids seeking the range deletion
182 : // iterators for keys that are never considered. (A similar bit of lazy
183 : // evaluation can be done for the point iterators, but is still TBD).
184 : //
185 : // For a full example, consider the following setup:
186 : //
187 : // p0: o
188 : // r0: m---q
189 : //
190 : // p1: n p
191 : // r1: g---k
192 : //
193 : // p2: b d i
194 : // r2: a---e q----v
195 : //
196 : // p3: e
197 : // r3:
198 : //
199 : // If we start iterating from the beginning, the first key we encounter is "b"
200 : // in p2. When the mergingIter is pointing at a valid entry, the range deletion
201 : // iterators for all of the levels < m.heap.items[0].index are positioned at
202 : // the next range tombstone past the current key. So r0 will point at [m,q) and
203 : // r1 at [g,k). When the key "b" is encountered, we check to see if the current
204 : // tombstone for r0 or r1 contains it, and whether the tombstone for r2, [a,e),
205 : // contains and is newer than "b".
206 : //
207 : // Advancing the iterator finds the next key at "d". This is in the same level
208 : // as the previous key "b" so we don't have to reposition any of the range
209 : // deletion iterators, but merely check whether "d" is now contained by any of
210 : // the range tombstones at higher levels or has stepped past the range
211 : // tombstone in its own level or higher levels. In this case, there is nothing to be done.
212 : //
213 : // Advancing the iterator again finds "e". Since "e" comes from p3, we have to
214 : // position the r3 range deletion iterator, which is empty. "e" is past the r2
215 : // tombstone of [a,e) so we need to advance the r2 range deletion iterator to
216 : // [q,v).
217 : //
218 : // The next key is "i". Because this key is in p2, a level above "e", we don't
219 : // have to reposition any range deletion iterators and instead see that "i" is
220 : // covered by the range tombstone [g,k). The iterator is immediately advanced
221 : // to "n" which is covered by the range tombstone [m,q) causing the iterator to
222 : // advance to "o" which is visible.
223 : //
224 : // # Error handling
225 : //
226 : // Any iterator operation may fail. The InternalIterator contract dictates that
227 : // an iterator must return a nil internal key when an error occurs, and a
228 : // subsequent call to Error() should return the error value. The exported
229 : // merging iterator positioning methods must adhere to this contract by setting
230 : // m.err to hold any error encountered by the individual level iterators and
231 : // returning a nil internal key. Some internal helpers (eg,
232 : // find[Next|Prev]Entry) also adhere to this contract, setting m.err directly).
233 : // Other internal functions return an explicit error return value and DO NOT set
234 : // m.err, relying on the caller to set m.err appropriately.
235 : //
236 : // TODO(jackson): Update the InternalIterator interface to return explicit error
237 : // return values (and an *InternalKV pointer).
238 : //
239 : // TODO(peter,rangedel): For testing, advance the iterator through various
240 : // scenarios and have each step display the current state (i.e. the current
241 : // heap and range-del iterator positioning).
242 : type mergingIter struct {
243 : logger Logger
244 : split Split
245 : dir int
246 : snapshot base.SeqNum
247 : batchSnapshot base.SeqNum
248 : levels []mergingIterLevel
249 : heap mergingIterHeap
250 : err error
251 : prefix []byte
252 : lower []byte
253 : upper []byte
254 : stats *InternalIteratorStats
255 : seekKeyBuf []byte
256 :
257 : // levelsPositioned, if non-nil, is a slice of the same length as levels.
258 : // It's used by NextPrefix to record which levels have already been
259 : // repositioned. It's created lazily by the first call to NextPrefix.
260 : levelsPositioned []bool
261 :
262 : combinedIterState *combinedIterState
263 :
264 : // Used in some tests to disable the random disabling of seek optimizations.
265 : forceEnableSeekOpt bool
266 : }
267 :
268 : // mergingIter implements the base.InternalIterator interface.
269 : var _ base.InternalIterator = (*mergingIter)(nil)
270 :
271 : // newMergingIter returns an iterator that merges its input. Walking the
272 : // resultant iterator will return all key/value pairs of all input iterators
273 : // in strictly increasing key order, as defined by cmp. It is permissible to
274 : // pass a nil split parameter if the caller is never going to call
275 : // SeekPrefixGE.
276 : //
277 : // The input's key ranges may overlap, but there are assumed to be no duplicate
278 : // keys: if iters[i] contains a key k then iters[j] will not contain that key k.
279 : //
280 : // None of the iters may be nil.
281 : func newMergingIter(
282 : logger Logger,
283 : stats *base.InternalIteratorStats,
284 : cmp Compare,
285 : split Split,
286 : iters ...internalIterator,
287 2 : ) *mergingIter {
288 2 : m := &mergingIter{}
289 2 : levels := make([]mergingIterLevel, len(iters))
290 2 : for i := range levels {
291 2 : levels[i].iter = iters[i]
292 2 : }
293 2 : m.init(&IterOptions{logger: logger}, stats, cmp, split, levels...)
294 2 : return m
295 : }
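// A minimal usage sketch, assuming iterA and iterB are existing
// internalIterator implementations (e.g. memtable or sstable iterators):
//
//	var stats base.InternalIteratorStats
//	m := newMergingIter(base.DefaultLogger, &stats, base.DefaultComparer.Compare,
//		base.DefaultComparer.Split, iterA, iterB)
//	for kv := m.First(); kv != nil; kv = m.Next() {
//		// Keys surface in increasing internal-key order across both inputs,
//		// with range deletions applied.
//	}
//	err := firstError(m.Error(), m.Close())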
296 :
297 : func (m *mergingIter) init(
298 : opts *IterOptions,
299 : stats *base.InternalIteratorStats,
300 : cmp Compare,
301 : split Split,
302 : levels ...mergingIterLevel,
303 2 : ) {
304 2 : m.err = nil // clear cached iteration error
305 2 : m.logger = opts.getLogger()
306 2 : if opts != nil {
307 2 : m.lower = opts.LowerBound
308 2 : m.upper = opts.UpperBound
309 2 : }
310 2 : m.snapshot = base.SeqNumMax
311 2 : m.batchSnapshot = base.SeqNumMax
312 2 : m.levels = levels
313 2 : m.heap.cmp = cmp
314 2 : m.split = split
315 2 : m.stats = stats
316 2 : if cap(m.heap.items) < len(levels) {
317 2 : m.heap.items = make([]*mergingIterLevel, 0, len(levels))
318 2 : } else {
319 2 : m.heap.items = m.heap.items[:0]
320 2 : }
321 2 : for l := range m.levels {
322 2 : m.levels[l].index = l
323 2 : }
324 : }
325 :
326 2 : func (m *mergingIter) initHeap() {
327 2 : m.heap.items = m.heap.items[:0]
328 2 : for i := range m.levels {
329 2 : if l := &m.levels[i]; l.iterKV != nil {
330 2 : m.heap.items = append(m.heap.items, l)
331 2 : }
332 : }
333 2 : m.heap.init()
334 : }
335 :
336 2 : func (m *mergingIter) initMinHeap() error {
337 2 : m.dir = 1
338 2 : m.heap.reverse = false
339 2 : m.initHeap()
340 2 : return m.initMinRangeDelIters(-1)
341 2 : }
342 :
343 : // The level of the previous top element was oldTopLevel. Note that all range delete
344 : // iterators < oldTopLevel are positioned past the key of the previous top element and
345 : // the range delete iterator == oldTopLevel is positioned at or past the key of the
346 : // previous top element. We need to position the range delete iterators from oldTopLevel + 1
347 : // to the level of the current top element.
348 2 : func (m *mergingIter) initMinRangeDelIters(oldTopLevel int) error {
349 2 : if m.heap.len() == 0 {
350 2 : return nil
351 2 : }
352 :
353 : // Position the range-del iterators at levels <= m.heap.items[0].index.
354 2 : item := m.heap.items[0]
355 2 : for level := oldTopLevel + 1; level <= item.index; level++ {
356 2 : l := &m.levels[level]
357 2 : if l.rangeDelIter == nil {
358 2 : continue
359 : }
360 2 : var err error
361 2 : l.tombstone, err = l.rangeDelIter.SeekGE(item.iterKV.K.UserKey)
362 2 : if err != nil {
363 0 : return err
364 0 : }
365 : }
366 2 : return nil
367 : }
368 :
369 2 : func (m *mergingIter) initMaxHeap() error {
370 2 : m.dir = -1
371 2 : m.heap.reverse = true
372 2 : m.initHeap()
373 2 : return m.initMaxRangeDelIters(-1)
374 2 : }
375 :
376 : // The level of the previous top element was oldTopLevel. Note that all range delete
377 : // iterators < oldTopLevel are positioned before the key of the previous top element and
378 : // the range delete iterator == oldTopLevel is positioned at or before the key of the
379 : // previous top element. We need to position the range delete iterators from oldTopLevel + 1
380 : // to the level of the current top element.
381 2 : func (m *mergingIter) initMaxRangeDelIters(oldTopLevel int) error {
382 2 : if m.heap.len() == 0 {
383 2 : return nil
384 2 : }
385 : // Position the range-del iterators at levels <= m.heap.items[0].index.
386 2 : item := m.heap.items[0]
387 2 : for level := oldTopLevel + 1; level <= item.index; level++ {
388 2 : l := &m.levels[level]
389 2 : if l.rangeDelIter == nil {
390 2 : continue
391 : }
392 2 : tomb, err := keyspan.SeekLE(m.heap.cmp, l.rangeDelIter, item.iterKV.K.UserKey)
393 2 : if err != nil {
394 0 : return err
395 0 : }
396 2 : l.tombstone = tomb
397 : }
398 2 : return nil
399 : }
400 :
401 2 : func (m *mergingIter) switchToMinHeap() error {
402 2 : if m.heap.len() == 0 {
403 2 : if m.lower != nil {
404 1 : m.SeekGE(m.lower, base.SeekGEFlagsNone)
405 2 : } else {
406 2 : m.First()
407 2 : }
408 2 : return m.err
409 : }
410 :
411 : // We're switching from using a max heap to a min heap. We need to advance
412 : // any iterator positioned at or before the current key. Consider the
413 : // scenario where we have 2 iterators being merged (user-key:seq-num):
414 : //
415 : // i1: *a:2 b:2
416 : // i2: a:1 b:1
417 : //
418 : // The current key is a:2 and i2 is pointed at a:1. When we switch to forward
419 : // iteration, we want to return a key that is greater than a:2.
420 :
421 2 : key := m.heap.items[0].iterKV.K
422 2 : cur := m.heap.items[0]
423 2 :
424 2 : for i := range m.levels {
425 2 : l := &m.levels[i]
426 2 : if l == cur {
427 2 : continue
428 : }
429 2 : for l.iterKV = l.iter.Next(); l.iterKV != nil; l.iterKV = l.iter.Next() {
430 2 : if base.InternalCompare(m.heap.cmp, key, l.iterKV.K) < 0 {
431 2 : // key < iter-key
432 2 : break
433 : }
434 : // key >= iter-key
435 : }
436 2 : if l.iterKV == nil {
437 2 : if err := l.iter.Error(); err != nil {
438 1 : return err
439 1 : }
440 : }
441 : }
442 :
443 : // Special handling for the current iterator because we were using its key
444 : // above.
445 2 : cur.iterKV = cur.iter.Next()
446 2 : if cur.iterKV == nil {
447 2 : if err := cur.iter.Error(); err != nil {
448 1 : return err
449 1 : }
450 : }
451 2 : return m.initMinHeap()
452 : }
453 :
454 2 : func (m *mergingIter) switchToMaxHeap() error {
455 2 : if m.heap.len() == 0 {
456 2 : if m.upper != nil {
457 1 : m.SeekLT(m.upper, base.SeekLTFlagsNone)
458 2 : } else {
459 2 : m.Last()
460 2 : }
461 2 : return m.err
462 : }
463 :
464 : // We're switching from using a min heap to a max heap. We need to back up
465 : // any iterator positioned at or after the current key. Consider the
466 : // scenario where we have 2 iterators being merged (user-key:seq-num):
467 : //
468 : // i1: a:2 *b:2
469 : // i2: a:1 b:1
470 : //
471 : // The current key is b:2 and i2 is pointing at b:1. When we switch to
472 : // reverse iteration, we want to return a key that is less than b:2.
473 2 : key := m.heap.items[0].iterKV.K
474 2 : cur := m.heap.items[0]
475 2 :
476 2 : for i := range m.levels {
477 2 : l := &m.levels[i]
478 2 : if l == cur {
479 2 : continue
480 : }
481 :
482 2 : for l.iterKV = l.iter.Prev(); l.iterKV != nil; l.iterKV = l.iter.Prev() {
483 2 : if base.InternalCompare(m.heap.cmp, key, l.iterKV.K) > 0 {
484 2 : // key > iter-key
485 2 : break
486 : }
487 : // key <= iter-key
488 : }
489 2 : if l.iterKV == nil {
490 2 : if err := l.iter.Error(); err != nil {
491 0 : return err
492 0 : }
493 : }
494 : }
495 :
496 : // Special handling for the current iterator because we were using its key
497 : // above.
498 2 : cur.iterKV = cur.iter.Prev()
499 2 : if cur.iterKV == nil {
500 2 : if err := cur.iter.Error(); err != nil {
501 1 : return err
502 1 : }
503 : }
504 2 : return m.initMaxHeap()
505 : }
506 :
507 : // nextEntry unconditionally steps to the next entry. l is the current top
508 : // item in the heap; a non-nil succKey advances the level via NextPrefix(succKey).
509 2 : func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
510 2 : // INVARIANT: If in prefix iteration mode, l.iterKV.K must have a prefix equal
511 2 : // to m.prefix. This invariant is important for ensuring TrySeekUsingNext
512 2 : // optimizations behave correctly.
513 2 : //
514 2 : // During prefix iteration, the iterator does not have a full view of the
515 2 : // LSM. Some level iterators may omit keys that are known to fall outside
516 2 : // the seek prefix (eg, due to sstable bloom filter exclusion). It's
517 2 : // important that in such cases we don't position any iterators beyond
518 2 : // m.prefix, because doing so may interfere with future seeks.
519 2 : //
520 2 : // Let prefixes P1 < P2 < P3. Imagine a SeekPrefixGE to prefix P1, followed
521 2 : // by a SeekPrefixGE to prefix P2. Imagine there exist live keys at prefix
522 2 : // P2, but they're not visible to the SeekPrefixGE(P1) (because of
523 2 : // bloom-filter exclusion or a range tombstone that deletes prefix P1 but
524 2 : // not P2). If the SeekPrefixGE(P1) is allowed to move any level iterators
525 2 : // to P3, the SeekPrefixGE(P2, TrySeekUsingNext=true) may mistakenly think
526 2 : // the level contains no point keys or range tombstones within the prefix
527 2 : // P2. Care is taken to avoid ever advancing the iterator beyond the current
528 2 : // prefix. If nextEntry is ever invoked while we're already beyond the
529 2 : // current prefix, we're violating the invariant.
530 2 : if invariants.Enabled && m.prefix != nil {
531 2 : if p := m.split.Prefix(l.iterKV.K.UserKey); !bytes.Equal(m.prefix, p) {
532 0 : m.logger.Fatalf("mergingIter: prefix violation: nexting beyond prefix %q; existing heap root %q\n%s",
533 0 : m.prefix, l.iterKV, debug.Stack())
534 0 : }
535 : }
536 :
537 2 : oldTopLevel := l.index
538 2 : oldRangeDelIterGeneration := l.rangeDelIterGeneration
539 2 :
540 2 : if succKey == nil {
541 2 : l.iterKV = l.iter.Next()
542 2 : } else {
543 2 : l.iterKV = l.iter.NextPrefix(succKey)
544 2 : }
545 :
546 2 : if l.iterKV == nil {
547 2 : if err := l.iter.Error(); err != nil {
548 1 : return err
549 1 : }
550 2 : m.heap.pop()
551 2 : } else {
552 2 : if m.prefix != nil && !bytes.Equal(m.prefix, m.split.Prefix(l.iterKV.K.UserKey)) {
553 2 : // In prefix iteration mode, clear iterKV for keys without a matching
554 2 : // prefix and remove the iterated level from the heap.
555 2 : l.iterKV = nil
556 2 : m.heap.pop()
557 2 : } else if m.heap.len() > 1 {
558 2 : m.heap.fix(0)
559 2 : }
560 2 : if l.rangeDelIterGeneration != oldRangeDelIterGeneration {
561 2 : // The rangeDelIter changed which indicates that the l.iter moved to the
562 2 : // next sstable. We have to update the tombstone for oldTopLevel as well.
563 2 : oldTopLevel--
564 2 : }
565 : }
566 :
567 : // The cached tombstones are only valid for the levels
568 : // [0,oldTopLevel]. Update the cached tombstones for any levels in the range
569 : // [oldTopLevel+1,heap[0].index].
570 2 : return m.initMinRangeDelIters(oldTopLevel)
571 : }
572 :
573 : // isNextEntryDeleted starts from the current entry (as the next entry) and if
574 : // it is deleted, moves the iterators forward as needed and returns true, else
575 : // it returns false. item is the top item in the heap. If any of the required
576 : // iterator operations error, the error is returned without updating m.err.
577 : //
578 : // During prefix iteration mode, isNextEntryDeleted will exhaust the iterator
579 : // by clearing the heap if the deleted key(s) extend beyond the iteration
580 : // prefix.
581 2 : func (m *mergingIter) isNextEntryDeleted(item *mergingIterLevel) (bool, error) {
582 2 : // Look for a range deletion tombstone containing item.iterKV at higher
583 2 : // levels (level < item.index). If we find such a range tombstone we know
584 2 : // it deletes the key in the current level. Also look for a range
585 2 : // deletion at the current level (level == item.index). If we find such a
586 2 : // range deletion we need to check whether it is newer than the current
587 2 : // entry.
588 2 : for level := 0; level <= item.index; level++ {
589 2 : l := &m.levels[level]
590 2 : if l.rangeDelIter == nil || l.tombstone == nil {
591 2 : // If l.tombstone is nil, there are no further tombstones
592 2 : // in the current sstable in the current (forward) iteration
593 2 : // direction.
594 2 : continue
595 : }
596 2 : if m.heap.cmp(l.tombstone.End, item.iterKV.K.UserKey) <= 0 {
597 2 : // The current key is at or past the tombstone end key.
598 2 : //
599 2 : // NB: for the case that this l.rangeDelIter is provided by a levelIter we know that
600 2 : // the levelIter must be positioned at a key >= item.iterKV. So it is sufficient to seek the
601 2 : // current l.rangeDelIter (since any range del iterators that will be provided by the
602 2 : // levelIter in the future cannot contain item.iterKV). Also, it is possible that we
603 2 : // will encounter parts of the range delete that should be ignored -- we handle that
604 2 : // below.
605 2 : var err error
606 2 : l.tombstone, err = l.rangeDelIter.SeekGE(item.iterKV.K.UserKey)
607 2 : if err != nil {
608 1 : return false, err
609 1 : }
610 : }
611 2 : if l.tombstone == nil {
612 2 : continue
613 : }
614 :
615 2 : if l.tombstone.VisibleAt(m.snapshot) && m.heap.cmp(l.tombstone.Start, item.iterKV.K.UserKey) <= 0 {
616 2 : if level < item.index {
617 2 : // We could also do m.seekGE(..., level + 1). The levels from
618 2 : // [level + 1, item.index) are already after item.iterKV so seeking them may be
619 2 : // wasteful.
620 2 :
621 2 : // We can seek up to tombstone.End.
622 2 : //
623 2 : // Progress argument: Since this file is at a higher level than item.iterKV we know
624 2 : // that the iterator in this file must be positioned within its bounds and at a key
625 2 : // X > item.iterKV (otherwise it would be the min of the heap). It is not
626 2 : // possible for X.UserKey == item.iterKV.UserKey, since it is incompatible with
627 2 : // X > item.iterKV (a lower version cannot be in a higher sstable), so it must be that
628 2 : // X.UserKey > item.iterKV.UserKey. Which means l.largestUserKey > item.key.UserKey.
629 2 : // We also know that l.tombstone.End > item.iterKV.UserKey. So the min of these,
630 2 : // seekKey, computed below, is > item.iterKV.UserKey, so the call to seekGE() will
631 2 : // make forward progress.
632 2 : m.seekKeyBuf = append(m.seekKeyBuf[:0], l.tombstone.End...)
633 2 : seekKey := m.seekKeyBuf
634 2 : // This seek is not directly due to a SeekGE call, so we don't know
635 2 : // enough about the underlying iterator positions, and so we keep the
636 2 : // try-seek-using-next optimization disabled. Additionally, if we're in
637 2 : // prefix-seek mode and a re-seek would have moved us past the original
638 2 : // prefix, we can remove all merging iter levels below the rangedel
639 2 : // tombstone's level and return immediately instead of re-seeking. This
640 2 : // is correct since those levels cannot provide a key that matches the
641 2 : // prefix, and is also visible. Additionally, this is important to make
642 2 : // subsequent `TrySeekUsingNext` work correctly, as a re-seek on a
643 2 : // different prefix could have resulted in this iterator skipping visible
644 2 : // keys at prefixes in between m.prefix and seekKey, that are currently
645 2 : // not in the heap due to a bloom filter mismatch.
646 2 : //
647 2 : // Additionally, we set the relative-seek flag. This is
648 2 : // important when iterating with lazy combined iteration. If
649 2 : // there's a range key between this level's current file and the
650 2 : // file the seek will land on, we need to detect it in order to
651 2 : // trigger construction of the combined iterator.
652 2 : if m.prefix != nil {
653 2 : if !bytes.Equal(m.prefix, m.split.Prefix(seekKey)) {
654 2 : for i := item.index; i < len(m.levels); i++ {
655 2 : // Remove this level from the heap. Setting iterKV
656 2 : // to nil should be sufficient for initMinHeap to
657 2 : // not re-initialize the heap with them in it. Other
658 2 : // fields in mergingIterLevel can remain as-is; the
659 2 : // iter/rangeDelIter needs to stay intact for future
660 2 : // trySeekUsingNexts to work, the level iter
661 2 : // boundary context is owned by the levelIter which
662 2 : // is not being repositioned, and any tombstones in
663 2 : // these levels will be irrelevant for us anyway.
664 2 : m.levels[i].iterKV = nil
665 2 : }
666 : // TODO(bilal): Consider a more efficient way of removing levels from
667 : // the heap without reinitializing all of it. This would likely
668 : // necessitate tracking the heap positions of each mergingIterHeap
669 : // item in the mergingIterLevel, and then swapping that item in the
670 : // heap with the last-positioned heap item, and shrinking the heap by
671 : // one.
672 2 : if err := m.initMinHeap(); err != nil {
673 0 : return false, err
674 0 : }
675 2 : return true, nil
676 : }
677 : }
678 2 : if err := m.seekGE(seekKey, item.index, base.SeekGEFlagsNone.EnableRelativeSeek()); err != nil {
679 0 : return false, err
680 0 : }
681 2 : return true, nil
682 : }
683 2 : if l.tombstone.CoversAt(m.snapshot, item.iterKV.SeqNum()) {
684 2 : if err := m.nextEntry(item, nil /* succKey */); err != nil {
685 0 : return false, err
686 0 : }
687 2 : return true, nil
688 : }
689 : }
690 : }
691 2 : return false, nil
692 : }
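// exampleTombstoneShadows is an illustrative (unused) helper distilling the
// test applied above: a tombstone deletes a point key only if the tombstone is
// visible at the read snapshot, its [Start, End) span contains the key's user
// key, and it covers the key's sequence number.
func exampleTombstoneShadows(
	cmp base.Compare, t *keyspan.Span, kv *base.InternalKV, snapshot base.SeqNum,
) bool {
	return t != nil && t.VisibleAt(snapshot) &&
		cmp(t.Start, kv.K.UserKey) <= 0 && cmp(kv.K.UserKey, t.End) < 0 &&
		t.CoversAt(snapshot, kv.SeqNum())
}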
693 :
694 : // Starting from the current entry, finds the first (next) entry that can be returned.
695 : //
696 : // If an error occurs, m.err is updated to hold the error and findNextEntry
697 : // returns a nil internal key.
698 2 : func (m *mergingIter) findNextEntry() *base.InternalKV {
699 2 : for m.heap.len() > 0 && m.err == nil {
700 2 : item := m.heap.items[0]
701 2 :
702 2 : // The levelIter internal iterator will interleave exclusive sentinel
703 2 : // keys to keep files open until their range deletions are no longer
704 2 : // necessary. Sometimes these are interleaved with the user key of a
705 2 : // file's largest key, in which case they may simply be stepped over to
706 2 : // move to the next file in the forward direction. Other times they're
707 2 : // interleaved at the user key of the user-iteration boundary, if that
708 2 : // falls within the bounds of a file. In the latter case, there are no
709 2 : // more keys < m.upper, and we can stop iterating.
710 2 : //
711 2 : // We perform a key comparison to differentiate between these two cases.
712 2 : // This key comparison is considered okay because it only happens for
713 2 : // sentinel keys. It may be eliminated after #2863.
714 2 : if m.levels[item.index].iterKV.K.IsExclusiveSentinel() {
715 2 : if m.upper != nil && m.heap.cmp(m.levels[item.index].iterKV.K.UserKey, m.upper) >= 0 {
716 2 : break
717 : }
718 : // This key is the largest boundary of a file and can be skipped now
719 : // that the file's range deletions are no longer relevant.
720 2 : m.err = m.nextEntry(item, nil /* succKey */)
721 2 : if m.err != nil {
722 0 : return nil
723 0 : }
724 2 : continue
725 : }
726 :
727 2 : m.addItemStats(item)
728 2 :
729 2 : // Check if the heap root key is deleted by a range tombstone in a
730 2 : // higher level. If it is, isNextEntryDeleted will advance the iterator
731 2 : // to a later key (through seeking or nexting).
732 2 : isDeleted, err := m.isNextEntryDeleted(item)
733 2 : if err != nil {
734 1 : m.err = err
735 1 : return nil
736 2 : } else if isDeleted {
737 2 : m.stats.PointsCoveredByRangeTombstones++
738 2 : continue
739 : }
740 :
741 : // Check if the key is visible at the iterator sequence numbers.
742 2 : if !item.iterKV.Visible(m.snapshot, m.batchSnapshot) {
743 2 : m.err = m.nextEntry(item, nil /* succKey */)
744 2 : if m.err != nil {
745 0 : return nil
746 0 : }
747 2 : continue
748 : }
749 :
750 : // The heap root is visible and not deleted by any range tombstones.
751 : // Return it.
752 2 : return item.iterKV
753 : }
754 2 : return nil
755 : }
756 :
757 : // prevEntry unconditionally steps to the prev entry. l is the current top item in the heap.
758 2 : func (m *mergingIter) prevEntry(l *mergingIterLevel) error {
759 2 : oldTopLevel := l.index
760 2 : oldRangeDelIterGeneration := l.rangeDelIterGeneration
761 2 : if l.iterKV = l.iter.Prev(); l.iterKV != nil {
762 2 : if m.heap.len() > 1 {
763 2 : m.heap.fix(0)
764 2 : }
765 2 : if l.rangeDelIterGeneration != oldRangeDelIterGeneration && l.rangeDelIter != nil {
766 2 : // The rangeDelIter changed which indicates that the l.iter moved to the
767 2 : // previous sstable. We have to update the tombstone for oldTopLevel as
768 2 : // well.
769 2 : oldTopLevel--
770 2 : }
771 2 : } else {
772 2 : if err := l.iter.Error(); err != nil {
773 1 : return err
774 1 : }
775 2 : m.heap.pop()
776 : }
777 :
778 : // The cached tombstones are only valid for the levels
779 : // [0,oldTopLevel]. Updated the cached tombstones for any levels in the range
780 : // [oldTopLevel+1,heap[0].index].
781 2 : return m.initMaxRangeDelIters(oldTopLevel)
782 : }
783 :
784 : // isPrevEntryDeleted starts from the current entry (as the prev entry) and,
785 : // if it is deleted, moves the iterators backward as needed and returns true;
786 : // else it returns false. item is the top item in the heap.
787 2 : func (m *mergingIter) isPrevEntryDeleted(item *mergingIterLevel) (bool, error) {
788 2 : // Look for a range deletion tombstone containing item.iterKV at higher
789 2 : // levels (level < item.index). If we find such a range tombstone we know
790 2 : // it deletes the key in the current level. Also look for a range
791 2 : // deletion at the current level (level == item.index). If we find such a
792 2 : // range deletion we need to check whether it is newer than the current
793 2 : // entry.
794 2 : for level := 0; level <= item.index; level++ {
795 2 : l := &m.levels[level]
796 2 : if l.rangeDelIter == nil || l.tombstone == nil {
797 2 : // If l.tombstone is nil, there are no further tombstones
798 2 : // in the current sstable in the current (reverse) iteration
799 2 : // direction.
800 2 : continue
801 : }
802 2 : if m.heap.cmp(item.iterKV.K.UserKey, l.tombstone.Start) < 0 {
803 2 : // The current key is before the tombstone start key.
804 2 : //
805 2 : // NB: for the case that this l.rangeDelIter is provided by a levelIter we know that
806 2 : // the levelIter must be positioned at a key < item.iterKV. So it is sufficient to seek the
807 2 : // current l.rangeDelIter (since any range del iterators that will be provided by the
808 2 : // levelIter in the future cannot contain item.iterKV). Also, it is possible that we
809 2 : // will encounter parts of the range delete that should be ignored -- we handle that
810 2 : // below.
811 2 :
812 2 : tomb, err := keyspan.SeekLE(m.heap.cmp, l.rangeDelIter, item.iterKV.K.UserKey)
813 2 : if err != nil {
814 0 : return false, err
815 0 : }
816 2 : l.tombstone = tomb
817 : }
818 2 : if l.tombstone == nil {
819 2 : continue
820 : }
821 2 : if l.tombstone.VisibleAt(m.snapshot) && m.heap.cmp(l.tombstone.End, item.iterKV.K.UserKey) > 0 {
822 2 : if level < item.index {
823 2 : // We could also do m.seekLT(..., level + 1). The levels from
824 2 : // [level + 1, item.index) are already before item.iterKV so seeking them may be
825 2 : // wasteful.
826 2 :
827 2 : // We can seek up to tombstone.Start.
828 2 : //
829 2 : // Progress argument: We know that the iterator in this file is positioned within
830 2 : // its bounds and at a key X < item.iterKV (otherwise it would be the max of the heap).
831 2 : // So smallestUserKey <= item.iterKV.UserKey and we already know that
832 2 : // l.tombstone.Start.UserKey <= item.iterKV.UserKey. So the seekKey computed below
833 2 : // is <= item.iterKV.UserKey, and since we do a seekLT() we will make backwards
834 2 : // progress.
835 2 : m.seekKeyBuf = append(m.seekKeyBuf[:0], l.tombstone.Start...)
836 2 : seekKey := m.seekKeyBuf
837 2 : // We set the relative-seek flag. This is important when
838 2 : // iterating with lazy combined iteration. If there's a range
839 2 : // key between this level's current file and the file the seek
840 2 : // will land on, we need to detect it in order to trigger
841 2 : // construction of the combined iterator.
842 2 : if err := m.seekLT(seekKey, item.index, base.SeekLTFlagsNone.EnableRelativeSeek()); err != nil {
843 0 : return false, err
844 0 : }
845 2 : return true, nil
846 : }
847 2 : if l.tombstone.CoversAt(m.snapshot, item.iterKV.SeqNum()) {
848 2 : if err := m.prevEntry(item); err != nil {
849 0 : return false, err
850 0 : }
851 2 : return true, nil
852 : }
853 : }
854 : }
855 2 : return false, nil
856 : }
857 :
858 : // Starting from the current entry, finds the first (prev) entry that can be returned.
859 : //
860 : // If an error occurs, m.err is updated to hold the error and findPrevEntry
861 : // returns a nil internal key.
862 2 : func (m *mergingIter) findPrevEntry() *base.InternalKV {
863 2 : for m.heap.len() > 0 && m.err == nil {
864 2 : item := m.heap.items[0]
865 2 :
866 2 : // The levelIter internal iterator will interleave exclusive sentinel
867 2 : // keys to keep files open until their range deletions are no longer
868 2 : // necessary. Sometimes these are interleaved with the user key of a
869 2 : // file's smallest key, in which case they may simply be stepped over to
870 2 : // move to the next file in the backward direction. Other times they're
871 2 : // interleaved at the user key of the user-iteration boundary, if that
872 2 : // falls within the bounds of a file. In the latter case, there are no
873 2 : // more keys ≥ m.lower, and we can stop iterating.
874 2 : //
875 2 : // We perform a key comparison to differentiate between these two cases.
876 2 : // This key comparison is considered okay because it only happens for
877 2 : // sentinel keys. It may be eliminated after #2863.
878 2 : if m.levels[item.index].iterKV.K.IsExclusiveSentinel() {
879 2 : if m.lower != nil && m.heap.cmp(m.levels[item.index].iterKV.K.UserKey, m.lower) <= 0 {
880 2 : break
881 : }
882 : // This key is the smallest boundary of a file and can be skipped
883 : // now that the file's range deletions are no longer relevant.
884 2 : m.err = m.prevEntry(item)
885 2 : if m.err != nil {
886 0 : return nil
887 0 : }
888 2 : continue
889 : }
890 :
891 2 : m.addItemStats(item)
892 2 : if isDeleted, err := m.isPrevEntryDeleted(item); err != nil {
893 0 : m.err = err
894 0 : return nil
895 2 : } else if isDeleted {
896 2 : m.stats.PointsCoveredByRangeTombstones++
897 2 : continue
898 : }
899 2 : if item.iterKV.Visible(m.snapshot, m.batchSnapshot) {
900 2 : return item.iterKV
901 2 : }
902 2 : m.err = m.prevEntry(item)
903 : }
904 2 : return nil
905 : }
906 :
907 : // Seeks levels >= level to >= key. Additionally uses range tombstones to extend the seeks.
908 : //
909 : // If an error occurs, seekGE returns the error without setting m.err.
910 2 : func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) error {
911 2 : // When seeking, we can use tombstones to adjust the key we seek to on each
912 2 : // level. Consider the series of range tombstones:
913 2 : //
914 2 : // 1: a---e
915 2 : // 2: d---h
916 2 : // 3: g---k
917 2 : // 4: j---n
918 2 : // 5: m---q
919 2 : //
920 2 : // If we SeekGE("b") we also find the tombstone that "b" resides within in
921 2 : // the first level, which is [a,e). Regardless of whether this tombstone deletes
922 2 : // "b" in that level, we know it deletes "b" in all lower levels, so we
923 2 : // adjust the search key in the next level to the tombstone end key "e". We
924 2 : // then SeekGE("e") in the second level and find the corresponding tombstone
925 2 : // [d,h). This process continues and we end up seeking for "h" in the 3rd
926 2 : // level, "k" in the 4th level and "n" in the last level.
927 2 : //
928 2 : // TODO(peter,rangedel): In addition to the above we can delay seeking a
929 2 : // level (and any lower levels) when the current iterator position is
930 2 : // contained within a range tombstone at a higher level.
931 2 :
932 2 : // Deterministically disable the TrySeekUsingNext optimizations sometimes in
933 2 : // invariant builds to encourage the metamorphic tests to surface bugs. Note
934 2 : // that we cannot disable the optimization within individual levels. It must
935 2 : // be disabled for all levels or none. If one lower-level iterator performs
936 2 : // a fresh seek whereas another takes advantage of its current iterator
937 2 : // position, the heap can become inconsistent. Consider the following
938 2 : // example:
939 2 : //
940 2 : // L5: [ [b-c) ] [ d ]*
941 2 : // L6: [ b ] [e]*
942 2 : //
943 2 : // Imagine a SeekGE(a). The [b-c) range tombstone deletes the L6 point key
944 2 : // 'b', resulting in the iterator positioned at d with the heap:
945 2 : //
946 2 : // {L5: d, L6: e}
947 2 : //
948 2 : // A subsequent SeekGE(b) is seeking to a larger key, so the caller may set
949 2 : // TrySeekUsingNext()=true. If the L5 iterator used the TrySeekUsingNext
950 2 : // optimization but the L6 iterator did not, the iterator would have the
951 2 : // heap:
952 2 : //
953 2 : // {L6: b, L5: d}
954 2 : //
955 2 : // Because the L5 iterator has already advanced to the next sstable, the
956 2 : // merging iterator cannot observe the [b-c) range tombstone and will
957 2 : // mistakenly return L6's deleted point key 'b'.
958 2 : if testingDisableSeekOpt(key, uintptr(unsafe.Pointer(m))) && !m.forceEnableSeekOpt {
959 2 : flags = flags.DisableTrySeekUsingNext()
960 2 : }
961 :
962 2 : for ; level < len(m.levels); level++ {
963 2 : if invariants.Enabled && m.lower != nil && m.heap.cmp(key, m.lower) < 0 {
964 0 : m.logger.Fatalf("mergingIter: lower bound violation: %s < %s\n%s", key, m.lower, debug.Stack())
965 0 : }
966 :
967 2 : l := &m.levels[level]
968 2 : if m.prefix != nil {
969 2 : l.iterKV = l.iter.SeekPrefixGE(m.prefix, key, flags)
970 2 : if l.iterKV != nil {
971 2 : if !bytes.Equal(m.prefix, m.split.Prefix(l.iterKV.K.UserKey)) {
972 2 : // Prevent keys without a matching prefix from being added to the heap by
973 2 : // setting iterKV to nil before calling initMinHeap.
974 2 : l.iterKV = nil
975 2 : }
976 : }
977 2 : } else {
978 2 : l.iterKV = l.iter.SeekGE(key, flags)
979 2 : }
980 2 : if l.iterKV == nil {
981 2 : if err := l.iter.Error(); err != nil {
982 1 : return err
983 1 : }
984 : }
985 :
986 : // If this level contains overlapping range tombstones, alter the seek
987 : // key accordingly. Caveat: If we're performing lazy-combined iteration,
988 : // we cannot alter the seek key: Range tombstones don't delete range
989 : // keys, and there might exist live range keys within the range
990 : // tombstone's span that need to be observed to trigger a switch to
991 : // combined iteration.
992 2 : if rangeDelIter := l.rangeDelIter; rangeDelIter != nil &&
993 2 : (m.combinedIterState == nil || m.combinedIterState.initialized) {
994 2 : // The level has a range-del iterator. Find the tombstone containing
995 2 : // the search key.
996 2 : var err error
997 2 : l.tombstone, err = rangeDelIter.SeekGE(key)
998 2 : if err != nil {
999 0 : return err
1000 0 : }
1001 2 : if l.tombstone != nil && l.tombstone.VisibleAt(m.snapshot) && m.heap.cmp(l.tombstone.Start, key) <= 0 {
1002 2 : // Based on the containment condition tombstone.End > key, so
1003 2 : // the assignment to key results in a monotonically
1004 2 : // non-decreasing key across iterations of this loop.
1005 2 : //
1006 2 : // The adjustment of key here can only move it to a larger key.
1007 2 : // Since the caller of seekGE guaranteed that the original key
1008 2 : // was greater than or equal to m.lower, the new key will
1009 2 : // continue to be greater than or equal to m.lower.
1010 2 : key = l.tombstone.End
1011 2 : }
1012 : }
1013 : }
1014 2 : return m.initMinHeap()
1015 : }
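// Distilled, and ignoring prefix mode and lazy-combined iteration, the
// cascading-seek loop above behaves like this sketch (tombstoneContaining is a
// hypothetical helper returning the level's tombstone containing key, if any):
//
//	for ; level < len(m.levels); level++ {
//		// position the level's point iterator at the first key >= key ...
//		if t := tombstoneContaining(level, key); t != nil && t.VisibleAt(m.snapshot) {
//			key = t.End // lower levels cannot contain live keys < t.End
//		}
//	}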
1016 :
1017 0 : func (m *mergingIter) String() string {
1018 0 : return "merging"
1019 0 : }
1020 :
1021 : // SeekGE implements base.InternalIterator.SeekGE. Note that SeekGE only checks
1022 : // the upper bound. It is up to the caller to ensure that key is greater than
1023 : // or equal to the lower bound.
1024 2 : func (m *mergingIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
1025 2 : m.prefix = nil
1026 2 : m.err = m.seekGE(key, 0 /* start level */, flags)
1027 2 : if m.err != nil {
1028 1 : return nil
1029 1 : }
1030 2 : return m.findNextEntry()
1031 : }
1032 :
1033 : // SeekPrefixGE implements base.InternalIterator.SeekPrefixGE.
1034 2 : func (m *mergingIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
1035 2 : return m.SeekPrefixGEStrict(prefix, key, flags)
1036 2 : }
1037 :
1038 : // SeekPrefixGEStrict implements topLevelIterator.SeekPrefixGEStrict. Note that
1039 : // SeekPrefixGEStrict explicitly checks that the key has a matching prefix.
1040 : func (m *mergingIter) SeekPrefixGEStrict(
1041 : prefix, key []byte, flags base.SeekGEFlags,
1042 2 : ) *base.InternalKV {
1043 2 : m.prefix = prefix
1044 2 : m.err = m.seekGE(key, 0 /* start level */, flags)
1045 2 : if m.err != nil {
1046 1 : return nil
1047 1 : }
1048 :
1049 2 : iterKV := m.findNextEntry()
1050 2 : if invariants.Enabled && iterKV != nil {
1051 2 : if !bytes.Equal(m.prefix, m.split.Prefix(iterKV.K.UserKey)) {
1052 0 : m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKV, m.prefix)
1053 0 : }
1054 : }
1055 2 : return iterKV
1056 : }
1057 :
1058 : // Seeks levels >= level to < key. Additionally uses range tombstones to extend the seeks.
1059 2 : func (m *mergingIter) seekLT(key []byte, level int, flags base.SeekLTFlags) error {
1060 2 : // See the comment in seekGE regarding using tombstones to adjust the seek
1061 2 : // target per level.
1062 2 : m.prefix = nil
1063 2 : for ; level < len(m.levels); level++ {
1064 2 : if invariants.Enabled && m.upper != nil && m.heap.cmp(key, m.upper) > 0 {
1065 0 : m.logger.Fatalf("mergingIter: upper bound violation: %s > %s\n%s", key, m.upper, debug.Stack())
1066 0 : }
1067 :
1068 2 : l := &m.levels[level]
1069 2 : l.iterKV = l.iter.SeekLT(key, flags)
1070 2 : if l.iterKV == nil {
1071 2 : if err := l.iter.Error(); err != nil {
1072 1 : return err
1073 1 : }
1074 : }
1075 :
1076 : // If this level contains overlapping range tombstones, alter the seek
1077 : // key accordingly. Caveat: If we're performing lazy-combined iteration,
1078 : // we cannot alter the seek key: Range tombstones don't delete range
1079 : // keys, and there might exist live range keys within the range
1080 : // tombstone's span that need to be observed to trigger a switch to
1081 : // combined iteration.
1082 2 : if rangeDelIter := l.rangeDelIter; rangeDelIter != nil &&
1083 2 : (m.combinedIterState == nil || m.combinedIterState.initialized) {
1084 2 : // The level has a range-del iterator. Find the tombstone containing
1085 2 : // the search key.
1086 2 : tomb, err := keyspan.SeekLE(m.heap.cmp, rangeDelIter, key)
1087 2 : if err != nil {
1088 0 : return err
1089 0 : }
1090 2 : l.tombstone = tomb
1091 2 : // Since SeekLT is exclusive on `key` and a tombstone's end key is
1092 2 : // also exclusive, a seek key equal to a tombstone's end key still
1093 2 : // enables the seek optimization (Note this is different than the
1094 2 : // check performed by (*keyspan.Span).Contains).
1095 2 : if l.tombstone != nil && l.tombstone.VisibleAt(m.snapshot) &&
1096 2 : m.heap.cmp(key, l.tombstone.End) <= 0 {
1097 2 : // NB: Based on the containment condition
1098 2 : // tombstone.Start.UserKey <= key, so the assignment to key
1099 2 : // results in a monotonically non-increasing key across
1100 2 : // iterations of this loop.
1101 2 : //
1102 2 : // The adjustment of key here can only move it to a smaller key.
1103 2 : // Since the caller of seekLT guaranteed that the original key
1104 2 : // was less than or equal to m.upper, the new key will continue
1105 2 : // to be less than or equal to m.upper.
1106 2 : key = l.tombstone.Start
1107 2 : }
1108 : }
1109 : }
1110 :
1111 2 : return m.initMaxHeap()
1112 : }
1113 :
1114 : // SeekLT implements base.InternalIterator.SeekLT. Note that SeekLT only checks
1115 : // the lower bound. It is up to the caller to ensure that key is less than the
1116 : // upper bound.
1117 2 : func (m *mergingIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
1118 2 : m.prefix = nil
1119 2 : m.err = m.seekLT(key, 0 /* start level */, flags)
1120 2 : if m.err != nil {
1121 1 : return nil
1122 1 : }
1123 2 : return m.findPrevEntry()
1124 : }
1125 :
1126 : // First implements base.InternalIterator.First. Note that First only checks
1127 : // the upper bound. It is up to the caller to ensure that key is greater than
1128 : // or equal to the lower bound (e.g. via a call to SeekGE(lower)).
1129 2 : func (m *mergingIter) First() *base.InternalKV {
1130 2 : m.err = nil // clear cached iteration error
1131 2 : m.prefix = nil
1132 2 : m.heap.items = m.heap.items[:0]
1133 2 : for i := range m.levels {
1134 2 : l := &m.levels[i]
1135 2 : l.iterKV = l.iter.First()
1136 2 : if l.iterKV == nil {
1137 2 : if m.err = l.iter.Error(); m.err != nil {
1138 1 : return nil
1139 1 : }
1140 : }
1141 : }
1142 2 : if m.err = m.initMinHeap(); m.err != nil {
1143 0 : return nil
1144 0 : }
1145 2 : return m.findNextEntry()
1146 : }
1147 :
1148 : // Last implements base.InternalIterator.Last. Note that Last only checks the
1149 : // lower bound. It is up to the caller to ensure that key is less than the
1150 : // upper bound (e.g. via a call to SeekLT(upper))
1151 2 : func (m *mergingIter) Last() *base.InternalKV {
1152 2 : m.err = nil // clear cached iteration error
1153 2 : m.prefix = nil
1154 2 : for i := range m.levels {
1155 2 : l := &m.levels[i]
1156 2 : l.iterKV = l.iter.Last()
1157 2 : if l.iterKV == nil {
1158 2 : if m.err = l.iter.Error(); m.err != nil {
1159 1 : return nil
1160 1 : }
1161 : }
1162 : }
1163 2 : if m.err = m.initMaxHeap(); m.err != nil {
1164 0 : return nil
1165 0 : }
1166 2 : return m.findPrevEntry()
1167 : }
1168 :
1169 2 : func (m *mergingIter) Next() *base.InternalKV {
1170 2 : if m.err != nil {
1171 1 : return nil
1172 1 : }
1173 :
1174 2 : if m.dir != 1 {
1175 2 : if m.err = m.switchToMinHeap(); m.err != nil {
1176 1 : return nil
1177 1 : }
1178 2 : return m.findNextEntry()
1179 : }
1180 :
1181 2 : if m.heap.len() == 0 {
1182 1 : return nil
1183 1 : }
1184 :
1185 : // NB: It's okay to call nextEntry directly even during prefix iteration
1186 : // mode. During prefix iteration mode, we rely on the caller to not call
1187 : // Next if the iterator has already advanced beyond the iteration prefix.
1188 : // See the comment above the base.InternalIterator interface.
1189 2 : if m.err = m.nextEntry(m.heap.items[0], nil /* succKey */); m.err != nil {
1190 1 : return nil
1191 1 : }
1192 :
1193 2 : iterKV := m.findNextEntry()
1194 2 : if invariants.Enabled && m.prefix != nil && iterKV != nil {
1195 2 : if !bytes.Equal(m.prefix, m.split.Prefix(iterKV.K.UserKey)) {
1196 0 : m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKV, m.prefix)
1197 0 : }
1198 : }
1199 2 : return iterKV
1200 : }
1201 :
1202 2 : func (m *mergingIter) NextPrefix(succKey []byte) *base.InternalKV {
1203 2 : if m.dir != 1 {
1204 0 : panic("pebble: cannot switch directions with NextPrefix")
1205 : }
1206 2 : if m.err != nil || m.heap.len() == 0 {
1207 0 : return nil
1208 0 : }
1209 2 : if m.levelsPositioned == nil {
1210 2 : m.levelsPositioned = make([]bool, len(m.levels))
1211 2 : } else {
1212 2 : for i := range m.levelsPositioned {
1213 2 : m.levelsPositioned[i] = false
1214 2 : }
1215 : }
1216 :
1217 : // The heap root must be positioned at a key < succKey, because
1218 : // NextPrefix was invoked.
1219 2 : root := m.heap.items[0]
1220 2 : if invariants.Enabled && m.heap.cmp((*root).iterKV.K.UserKey, succKey) >= 0 {
1221 0 : m.logger.Fatalf("pebble: invariant violation: NextPrefix(%q) called on merging iterator already positioned at %q",
1222 0 : succKey, (*root).iterKV)
1223 0 : }
1224 : // NB: root is the heap root before we call nextEntry; nextEntry may change
1225 : // the heap root, so we must not expect `root` to still be the root of the
1226 : // heap, or even to be in the heap if the level's iterator becomes exhausted.
1227 2 : if m.err = m.nextEntry(root, succKey); m.err != nil {
1228 1 : return nil
1229 1 : }
1230 : // We only consider the level to be conclusively positioned at the next
1231 : // prefix if our call to nextEntry did not advance the level onto a range
1232 : // deletion's boundary. Range deletions may have bounds within the prefix
1233 : // that are still surfaced by NextPrefix.
1234 2 : m.levelsPositioned[root.index] = root.iterKV == nil || !root.iterKV.K.IsExclusiveSentinel()
1235 2 :
1236 2 : for m.heap.len() > 0 {
1237 2 : root := m.heap.items[0]
1238 2 : if m.levelsPositioned[root.index] {
1239 2 : // A level we've previously positioned is at the top of the heap, so
1240 2 : // there are no other levels positioned at keys < succKey. We've
1241 2 : // advanced as far as we need to.
1242 2 : break
1243 : }
1244 : // If the current heap root is a sentinel key, we need to skip it.
1245 : // Calling NextPrefix while positioned at a sentinel key is not
1246 : // supported.
1247 2 : if root.iterKV.K.IsExclusiveSentinel() {
1248 1 : if m.err = m.nextEntry(root, nil); m.err != nil {
1249 0 : return nil
1250 0 : }
1251 1 : continue
1252 : }
1253 :
1254 : // Since this level was not the original heap root when NextPrefix was
1255 : // called, we don't know whether this level's current key has the
1256 : // previous prefix or a new one.
1257 2 : if m.heap.cmp(root.iterKV.K.UserKey, succKey) >= 0 {
1258 2 : break
1259 : }
1260 2 : if m.err = m.nextEntry(root, succKey); m.err != nil {
1261 0 : return nil
1262 0 : }
1263 : // We only consider the level to be conclusively positioned at the next
1264 : // prefix if our call to nextEntry did not land onto a range deletion's
1265 : // boundary. Range deletions may have bounds within the prefix that are
1266 : // still surfaced by NextPrefix.
1267 2 : m.levelsPositioned[root.index] = root.iterKV == nil || !root.iterKV.K.IsExclusiveSentinel()
1268 : }
1269 2 : return m.findNextEntry()
1270 : }
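// A usage sketch for NextPrefix: callers derive succKey from the current key's
// prefix, e.g. via the comparer's ImmediateSuccessor, to step over all
// remaining keys sharing that prefix in one call (comparer and process are
// assumed names):
//
//	var succKey []byte
//	for kv := m.First(); kv != nil; {
//		process(kv)
//		succKey = comparer.ImmediateSuccessor(succKey[:0], m.split.Prefix(kv.K.UserKey))
//		kv = m.NextPrefix(succKey)
//	}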
1271 :
1272 2 : func (m *mergingIter) Prev() *base.InternalKV {
1273 2 : if m.err != nil {
1274 0 : return nil
1275 0 : }
1276 :
1277 2 : if m.dir != -1 {
1278 2 : if m.prefix != nil {
1279 1 : m.err = errors.New("pebble: unsupported reverse prefix iteration")
1280 1 : return nil
1281 1 : }
1282 2 : if m.err = m.switchToMaxHeap(); m.err != nil {
1283 1 : return nil
1284 1 : }
1285 2 : return m.findPrevEntry()
1286 : }
1287 :
1288 2 : if m.heap.len() == 0 {
1289 1 : return nil
1290 1 : }
1291 2 : if m.err = m.prevEntry(m.heap.items[0]); m.err != nil {
1292 1 : return nil
1293 1 : }
1294 2 : return m.findPrevEntry()
1295 : }
1296 :
1297 2 : func (m *mergingIter) Error() error {
1298 2 : if m.heap.len() == 0 || m.err != nil {
1299 2 : return m.err
1300 2 : }
1301 2 : return m.levels[m.heap.items[0].index].iter.Error()
1302 : }
1303 :
1304 2 : func (m *mergingIter) Close() error {
1305 2 : for i := range m.levels {
1306 2 : iter := m.levels[i].iter
1307 2 : if err := iter.Close(); err != nil && m.err == nil {
1308 0 : m.err = err
1309 0 : }
1310 2 : m.levels[i].setRangeDelIter(nil)
1311 : }
1312 2 : m.levels = nil
1313 2 : m.heap.items = m.heap.items[:0]
1314 2 : return m.err
1315 : }
1316 :
1317 2 : func (m *mergingIter) SetBounds(lower, upper []byte) {
1318 2 : m.prefix = nil
1319 2 : m.lower = lower
1320 2 : m.upper = upper
1321 2 : for i := range m.levels {
1322 2 : m.levels[i].iter.SetBounds(lower, upper)
1323 2 : }
1324 2 : m.heap.clear()
1325 : }
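// Note that SetBounds clears the heap, so the merging iterator must be
// repositioned before further use, e.g.:
//
//	m.SetBounds(lower, upper)
//	kv := m.SeekGE(lower, base.SeekGEFlagsNone)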
1326 :
1327 1 : func (m *mergingIter) SetContext(ctx context.Context) {
1328 1 : for i := range m.levels {
1329 1 : m.levels[i].iter.SetContext(ctx)
1330 1 : }
1331 : }
1332 :
1333 : // DebugTree is part of the InternalIterator interface.
1334 0 : func (m *mergingIter) DebugTree(tp treeprinter.Node) {
1335 0 : n := tp.Childf("%T(%p)", m, m)
1336 0 : for i := range m.levels {
1337 0 : if iter := m.levels[i].iter; iter != nil {
1338 0 : iter.DebugTree(n)
1339 0 : }
1340 : }
1341 : }
1342 :
1343 0 : func (m *mergingIter) DebugString() string {
1344 0 : var buf bytes.Buffer
1345 0 : sep := ""
1346 0 : for m.heap.len() > 0 {
1347 0 : item := m.heap.pop()
1348 0 : fmt.Fprintf(&buf, "%s%s", sep, item.iterKV.K)
1349 0 : sep = " "
1350 0 : }
1351 0 : var err error
1352 0 : if m.dir == 1 {
1353 0 : err = m.initMinHeap()
1354 0 : } else {
1355 0 : err = m.initMaxHeap()
1356 0 : }
1357 0 : if err != nil {
1358 0 : fmt.Fprintf(&buf, "err=<%s>", err)
1359 0 : }
1360 0 : return buf.String()
1361 : }
1362 :
1363 2 : func (m *mergingIter) ForEachLevelIter(fn func(li *levelIter) bool) {
1364 2 : for _, ml := range m.levels {
1365 2 : if ml.levelIter != nil {
1366 2 : if done := fn(ml.levelIter); done {
1367 2 : break
1368 : }
1369 : }
1370 : }
1371 : }
1372 :
1373 2 : func (m *mergingIter) addItemStats(l *mergingIterLevel) {
1374 2 : m.stats.PointCount++
1375 2 : m.stats.KeyBytes += uint64(len(l.iterKV.K.UserKey))
1376 2 : m.stats.ValueBytes += uint64(l.iterKV.V.InternalLen())
1377 2 : }
1378 :
1379 : var _ internalIterator = &mergingIter{}
|