Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "runtime/debug"
12 : "unsafe"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/internal/keyspan"
18 : )
19 :
20 : type mergingIterLevel struct {
21 : index int
22 : iter internalIterator
23 : // rangeDelIter is set to the range-deletion iterator for the level. When
24 : // configured with a levelIter, this pointer changes as sstable boundaries
25 : // are crossed. See levelIter.initRangeDel and the Range Deletions comment
26 : // below.
27 : rangeDelIter keyspan.FragmentIterator
28 : // iterKey and iterValue cache the current key and value iter are pointed at.
29 : iterKey *InternalKey
30 : iterValue base.LazyValue
31 : // levelIter is non-nil if this level's iter is ultimately backed by a
32 : // *levelIter. The handle in iter may have wrapped the levelIter with
33 : // intermediary internalIterator implementations.
34 : levelIter *levelIter
35 :
36 : // levelIterBoundaryContext's fields are set when using levelIter, in order
37 : // to surface sstable boundary keys and file-level context. See levelIter
38 : // comment and the Range Deletions comment below.
39 : levelIterBoundaryContext
40 :
41 : // tombstone caches the tombstone rangeDelIter is currently pointed at. If
42 : // tombstone is nil, there are no further tombstones within the
43 : // current sstable in the current iterator direction. The cached tombstone is
44 : // only valid for the levels in the range [0,heap[0].index]. This avoids
45 : // positioning tombstones at lower levels which cannot possibly shadow the
46 : // current key.
47 : tombstone *keyspan.Span
48 : }
49 :
50 : type levelIterBoundaryContext struct {
51 : // smallestUserKey and largestUserKey are populated with the smallest and
52 : // largest boundaries of the current file.
53 : smallestUserKey, largestUserKey []byte
54 : // isLargestUserKeyExclusive is set to true when a file's largest boundary
55 : // is an exclusive key (eg, a range deletion sentinel). If true, the file
56 : // does not contain any keys with the provided user key, and the
57 : // largestUserKey bound is exclusive.
58 : isLargestUserKeyExclusive bool
59 : // isSyntheticIterBoundsKey is set to true iff the key returned by the level
60 : // iterator is a synthetic key derived from the iterator bounds. This is used
61 : // to prevent the mergingIter from being stuck at such a synthetic key if it
62 : // becomes the top element of the heap. When used with a user-facing Iterator,
63 : // the only range deletions exposed by this mergingIter should be those with
64 : // `isSyntheticIterBoundsKey || isIgnorableBoundaryKey`.
65 : isSyntheticIterBoundsKey bool
66 : // isIgnorableBoundaryKey is set to true iff the key returned by the level
67 : // iterator is a file boundary key that should be ignored when returning to
68 : // the parent iterator. File boundary keys are used by the level iter to
69 : // keep a levelIter file's range deletion iterator open as long as other
70 : // levels within the merging iterator require it. When used with a user-facing
71 : // Iterator, the only range deletions exposed by this mergingIter should be
72 : // those with `isSyntheticIterBoundsKey || isIgnorableBoundaryKey`.
73 : isIgnorableBoundaryKey bool
74 : }
75 :
76 : // mergingIter provides a merged view of multiple iterators from different
77 : // levels of the LSM.
78 : //
79 : // The core of a mergingIter is a heap of internalIterators (see
80 : // mergingIterHeap). The heap can operate as either a min-heap, used during
81 : // forward iteration (First, SeekGE, Next) or a max-heap, used during reverse
82 : // iteration (Last, SeekLT, Prev). The heap is initialized in calls to First,
83 : // Last, SeekGE, and SeekLT. A call to Next or Prev takes the current top
84 : // element on the heap, advances its iterator, and then "fixes" the heap
85 : // property. When one of the child iterators is exhausted during Next/Prev
86 : // iteration, it is removed from the heap.
87 : //
88 : // # Range Deletions
89 : //
90 : // A mergingIter can optionally be configured with a slice of range deletion
91 : // iterators. The range deletion iterator slice must exactly parallel the point
92 : // iterators and the range deletion iterator must correspond to the same level
93 : // in the LSM as the point iterator. Note that each memtable and each table in
94 : // L0 is a different "level" from the mergingIter perspective. So level 0 below
95 : // does not correspond to L0 in the LSM.
96 : //
97 : // A range deletion iterator iterates over fragmented range tombstones. Range
98 : // tombstones are fragmented by splitting them at any overlapping points. This
99 : // fragmentation guarantees that within an sstable tombstones will either be
100 : // distinct or will have identical start and end user keys. While range
101 : // tombstones are fragmented within an sstable, the start and end keys are not truncated
102 : // to sstable boundaries. This is necessary because the tombstone end key is
103 : // exclusive and does not have a sequence number. Consider an sstable
104 : // containing the range tombstone [a,c)#9 and the key "b#8". The tombstone must
105 : // delete "b#8", yet older versions of "b" might spill over to the next
106 : // sstable. So the boundary key for this sstable must be "b#8". Adjusting the
107 : // end key of tombstones to be optionally inclusive or contain a sequence
108 : // number would be possible solutions (such solutions have potentially serious
109 : // issues: tombstones have exclusive end keys since an inclusive deletion end can
110 : // be converted to an exclusive one while the reverse transformation is not possible;
111 : // the semantics of a sequence number for the end key of a range tombstone are murky).
112 : //
113 : // The approach taken here performs an
114 : // implicit truncation of the tombstone to the sstable boundaries.
115 : //
116 : // During initialization of a mergingIter, the range deletion iterators for
117 : // batches, memtables, and L0 tables are populated up front. Note that Batches
118 : // and memtables index unfragmented tombstones. Batch.newRangeDelIter() and
119 : // memTable.newRangeDelIter() fragment and cache the tombstones on demand. The
120 : // L1-L6 range deletion iterators are populated by levelIter. When configured
121 : // to load range deletion iterators, whenever a levelIter loads a table it
122 : // loads both the point iterator and the range deletion
123 : // iterator. levelIter.rangeDelIter is configured to point to the right entry
124 : // in mergingIter.levels. The effect of this setup is that
125 : // mergingIter.levels[i].rangeDelIter always contains the fragmented range
126 : // tombstone for the current table in level i that the levelIter has open.
127 : //
128 : // Another crucial mechanism of levelIter is that it materializes fake point
129 : // entries for the table boundaries if the boundary is a range deletion
130 : // key. Consider a table that contains only a range tombstone [a-e)#10. The
131 : // sstable boundaries for this table will be a#10,15 and
132 : // e#72057594037927935,15. During forward iteration levelIter will return
133 : // e#72057594037927935,15 as a key. During reverse iteration levelIter will
134 : // return a#10,15 as a key. These sentinel keys act as bookends to point
135 : // iteration and allow mergingIter to keep a table and its associated range
136 : // tombstones loaded as long as there are keys at lower levels that are within
137 : // the bounds of the table.
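 : // (In these boundary keys, 72057594037927935 is InternalKeySeqNumMax, i.e.
 : // 2^56-1, and the trailing 15 is the RANGEDEL key kind.)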
138 : //
139 : // The final piece to the range deletion puzzle is the LSM invariant that for a
140 : // given key K newer versions of K can only exist earlier in the level, or at
141 : // higher levels of the tree. For example, if K#4 exists in L3, K#5 can only
142 : // exist earlier in L3 or in L0, L1, L2, or a memtable. Get very explicitly
143 : // uses this invariant to find the value for a key by walking the LSM level by
144 : // level. For range deletions, this invariant means that a range deletion at
145 : // level N will necessarily shadow any keys within its bounds in level Y where
146 : // Y > N. One wrinkle to this statement is that it only applies to keys that
147 : // lie within the sstable bounds as well, but we get that guarantee due to the
148 : // way the range deletion iterator and point iterator are bound together by a
149 : // levelIter.
150 : //
151 : // Tying the above all together, we get a picture where each level (index in
152 : // mergingIter.levels) is composed of both point operations (pX) and range
153 : // deletions (rX). The range deletions for level X shadow both the point
154 : // operations and range deletions for level Y where Y > X allowing mergingIter
155 : // to skip processing entries in that shadow. For example, consider the
156 : // scenario:
157 : //
158 : // r0: a---e
159 : // r1: d---h
160 : // r2: g---k
161 : // r3: j---n
162 : // r4: m---q
163 : //
164 : // This is showing 5 levels of range deletions. Consider what happens upon
165 : // SeekGE("b"). We first seek the point iterator for level 0 (the point values
166 : // are not shown above) and we then seek the range deletion iterator. That
167 : // returns the tombstone [a,e). This tombstone tells us that all keys in the
168 : // range [a,e) in lower levels are deleted so we can skip them. So we can
169 : // adjust the seek key to "e", the tombstone end key. For level 1 we seek to
170 : // "e" and find the range tombstone [d,h) and similar logic holds. By the time
171 : // we get to level 4 we're seeking to "n".
172 : //
173 : // One consequence of not truncating tombstone end keys to sstable boundaries
174 : // is the seeking process described above cannot always seek to the tombstone
175 : // end key in the older level. For example, imagine in the above example r3 is
176 : // a partitioned level (i.e., L1+ in our LSM), and the sstable containing [j,
177 : // n) has "k" as its upper boundary. In this situation, compactions involving
178 : // keys at or after "k" can output those keys to r4+, even if they're newer
179 : // than our tombstone [j, n). So instead of seeking to "n" in r4 we can only
180 : // seek to "k". To achieve this, the instance variable `largestUserKey`
181 : // maintains the upper bounds of the current sstables in the partitioned
182 : // levels. In this example, `levels[3].largestUserKey` holds "k", telling us to
183 : // limit the seek triggered by a tombstone in r3 to "k".
184 : //
185 : // During actual iteration levels can contain both point operations and range
186 : // deletions. Within a level, when a range deletion contains a point operation
187 : // the sequence numbers must be checked to determine if the point operation is
188 : // newer or older than the range deletion tombstone. The mergingIter maintains
189 : // the invariant that the range deletion iterators for all levels newer than
190 : // the current iteration key (L < m.heap.items[0].index) are positioned at the
191 : // next (or previous during reverse iteration) range deletion tombstone. We
192 : // know those levels don't contain a range deletion tombstone that covers the
193 : // current key because if they did the current key would be deleted. The range
194 : // deletion iterator for the current key's level is positioned at a range
195 : // tombstone covering or past the current key. The position of all other
196 : // range deletion iterators is unspecified. Whenever a key from those levels
197 : // becomes the current key, their range deletion iterators need to be
198 : // positioned. This lazy positioning avoids seeking the range deletion
199 : // iterators for keys that are never considered. (A similar bit of lazy
200 : // evaluation can be done for the point iterators, but is still TBD).
201 : //
202 : // For a full example, consider the following setup:
203 : //
204 : // p0: o
205 : // r0: m---q
206 : //
207 : // p1: n p
208 : // r1: g---k
209 : //
210 : // p2: b d i
211 : // r2: a---e q----v
212 : //
213 : // p3: e
214 : // r3:
215 : //
216 : // If we start iterating from the beginning, the first key we encounter is "b"
217 : // in p2. When the mergingIter is pointing at a valid entry, the range deletion
218 : // iterators for all of the levels < m.heap.items[0].index are positioned at
219 : // the next range tombstone past the current key. So r0 will point at [m,q) and
220 : // r1 at [g,k). When the key "b" is encountered, we check to see if the current
221 : // tombstone for r0 or r1 contains it, and whether the tombstone for r2, [a,e),
222 : // contains and is newer than "b".
223 : //
224 : // Advancing the iterator finds the next key at "d". This is in the same level
225 : // as the previous key "b" so we don't have to reposition any of the range
226 : // deletion iterators, but merely check whether "d" is now contained by any of
227 : // the range tombstones at higher levels or has stepped past the range
228 : // tombstone in its own or a higher level. In this case, there is nothing to be done.
229 : //
230 : // Advancing the iterator again finds "e". Since "e" comes from p3, we have to
231 : // position the r3 range deletion iterator, which is empty. "e" is past the r2
232 : // tombstone of [a,e) so we need to advance the r2 range deletion iterator to
233 : // [q,v).
234 : //
235 : // The next key is "i". Because this key is in p2, a level above "e", we don't
236 : // have to reposition any range deletion iterators and instead see that "i" is
237 : // covered by the range tombstone [g,k). The iterator is immediately advanced
238 : // to "n" which is covered by the range tombstone [m,q) causing the iterator to
239 : // advance to "o" which is visible.
240 : //
241 : // TODO(peter,rangedel): For testing, advance the iterator through various
242 : // scenarios and have each step display the current state (i.e. the current
243 : // heap and range-del iterator positioning).
244 : type mergingIter struct {
245 : logger Logger
246 : split Split
247 : dir int // +1 during forward (min-heap) iteration, -1 during reverse (max-heap) iteration
248 : snapshot uint64 // visibility bound (sequence number) for committed keys
249 : batchSnapshot uint64 // visibility bound (sequence number) for indexed batch keys
250 : levels []mergingIterLevel
251 : heap mergingIterHeap
252 : err error
253 : prefix []byte // set to the seek prefix during prefix iteration (SeekPrefixGE); nil otherwise
254 : lower []byte
255 : upper []byte
256 : stats *InternalIteratorStats
257 :
258 : // levelsPositioned, if non-nil, is a slice of the same length as levels.
259 : // It's used by NextPrefix to record which levels have already been
260 : // repositioned. It's created lazily by the first call to NextPrefix.
261 : levelsPositioned []bool
262 :
263 : combinedIterState *combinedIterState
264 :
265 : // Used in some tests to disable the random disabling of seek optimizations.
266 : forceEnableSeekOpt bool
267 : }
268 :
269 : // mergingIter implements the base.InternalIterator interface.
270 : var _ base.InternalIterator = (*mergingIter)(nil)
271 :
272 : // newMergingIter returns an iterator that merges its input. Walking the
273 : // resultant iterator will return all key/value pairs of all input iterators
274 : // in strictly increasing key order, as defined by cmp. It is permissible to
275 : // pass a nil split parameter if the caller is never going to call
276 : // SeekPrefixGE.
277 : //
278 : // The input's key ranges may overlap, but there are assumed to be no duplicate
279 : // keys: if iters[i] contains a key k, then no other iters[j] will contain k.
280 : //
281 : // None of the iters may be nil.
282 : func newMergingIter(
283 : logger Logger,
284 : stats *base.InternalIteratorStats,
285 : cmp Compare,
286 : split Split,
287 : iters ...internalIterator,
288 2 : ) *mergingIter {
289 2 : m := &mergingIter{}
290 2 : levels := make([]mergingIterLevel, len(iters))
291 2 : for i := range levels {
292 2 : levels[i].iter = iters[i]
293 2 : }
294 2 : m.init(&IterOptions{logger: logger}, stats, cmp, split, levels...)
295 2 : return m
296 : }
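 :
 : // A minimal usage sketch (illustrative only): logger, stats, cmp, split, itA,
 : // and itB are assumed to be caller-provided, with itA and itB being any
 : // non-nil internalIterator implementations.
 : //
 : //	mi := newMergingIter(logger, &stats, cmp, split, itA, itB)
 : //	for key, val := mi.First(); key != nil; key, val = mi.Next() {
 : //		// key/val is the merged view of itA and itB, in increasing key
 : //		// order as defined by cmp.
 : //		_ = val
 : //	}
 : //	// mi.Close() must be called once iteration is finished.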
297 :
298 : func (m *mergingIter) init(
299 : opts *IterOptions,
300 : stats *base.InternalIteratorStats,
301 : cmp Compare,
302 : split Split,
303 : levels ...mergingIterLevel,
304 2 : ) {
305 2 : m.err = nil // clear cached iteration error
306 2 : m.logger = opts.getLogger()
307 2 : if opts != nil {
308 2 : m.lower = opts.LowerBound
309 2 : m.upper = opts.UpperBound
310 2 : }
311 2 : m.snapshot = InternalKeySeqNumMax
312 2 : m.batchSnapshot = InternalKeySeqNumMax
313 2 : m.levels = levels
314 2 : m.heap.cmp = cmp
315 2 : m.split = split
316 2 : m.stats = stats
317 2 : if cap(m.heap.items) < len(levels) {
318 2 : m.heap.items = make([]*mergingIterLevel, 0, len(levels))
319 2 : } else {
320 2 : m.heap.items = m.heap.items[:0]
321 2 : }
322 2 : for l := range m.levels {
323 2 : m.levels[l].index = l
324 2 : }
325 : }
326 :
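 : // initHeap rebuilds the heap from the levels whose iterators are currently
 : // positioned at a key, skipping exhausted levels. If an exhausted level's
 : // iterator reports an error, the error is recorded and heap construction
 : // stops early.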
327 2 : func (m *mergingIter) initHeap() {
328 2 : m.heap.items = m.heap.items[:0]
329 2 : for i := range m.levels {
330 2 : if l := &m.levels[i]; l.iterKey != nil {
331 2 : m.heap.items = append(m.heap.items, l)
332 2 : } else {
333 2 : m.err = firstError(m.err, l.iter.Error())
334 2 : if m.err != nil {
335 1 : return
336 1 : }
337 : }
338 : }
339 2 : m.heap.init()
340 : }
341 :
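 : // initMinHeap configures the heap for forward iteration: it rebuilds the heap
 : // as a min-heap and positions the range-del iterators for every level at or
 : // above the new heap root (see initMinRangeDelIters).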
342 2 : func (m *mergingIter) initMinHeap() {
343 2 : m.dir = 1
344 2 : m.heap.reverse = false
345 2 : m.initHeap()
346 2 : m.initMinRangeDelIters(-1)
347 2 : }
348 :
349 : // The level of the previous top element was oldTopLevel. Note that all range delete
350 : // iterators < oldTopLevel are positioned past the key of the previous top element and
351 : // the range delete iterator == oldTopLevel is positioned at or past the key of the
352 : // previous top element. We need to position the range delete iterators from oldTopLevel + 1
353 : // to the level of the current top element.
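 : //
 : // For example, if the previous top element came from level 2 and the new top
 : // element comes from level 4, only the range-del iterators for levels 3 and 4
 : // need to be repositioned here (each via SeekGE to the new top element's key).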
354 2 : func (m *mergingIter) initMinRangeDelIters(oldTopLevel int) {
355 2 : if m.heap.len() == 0 {
356 2 : return
357 2 : }
358 :
359 : // Position the range-del iterators at levels <= m.heap.items[0].index.
360 2 : item := m.heap.items[0]
361 2 : for level := oldTopLevel + 1; level <= item.index; level++ {
362 2 : l := &m.levels[level]
363 2 : if l.rangeDelIter == nil {
364 2 : continue
365 : }
366 2 : l.tombstone = l.rangeDelIter.SeekGE(item.iterKey.UserKey)
367 : }
368 : }
369 :
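 : // initMaxHeap is the reverse-iteration counterpart of initMinHeap: it rebuilds
 : // the heap as a max-heap and positions the range-del iterators via
 : // initMaxRangeDelIters.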
370 2 : func (m *mergingIter) initMaxHeap() {
371 2 : m.dir = -1
372 2 : m.heap.reverse = true
373 2 : m.initHeap()
374 2 : m.initMaxRangeDelIters(-1)
375 2 : }
376 :
377 : // The level of the previous top element was oldTopLevel. Note that all range delete
378 : // iterators < oldTopLevel are positioned before the key of the previous top element and
379 : // the range delete iterator == oldTopLevel is positioned at or before the key of the
380 : // previous top element. We need to position the range delete iterators from oldTopLevel + 1
381 : // to the level of the current top element.
382 2 : func (m *mergingIter) initMaxRangeDelIters(oldTopLevel int) {
383 2 : if m.heap.len() == 0 {
384 2 : return
385 2 : }
386 : // Position the range-del iterators at levels <= m.heap.items[0].index.
387 2 : item := m.heap.items[0]
388 2 : for level := oldTopLevel + 1; level <= item.index; level++ {
389 2 : l := &m.levels[level]
390 2 : if l.rangeDelIter == nil {
391 2 : continue
392 : }
393 2 : l.tombstone = keyspan.SeekLE(m.heap.cmp, l.rangeDelIter, item.iterKey.UserKey)
394 : }
395 : }
396 :
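 : // switchToMinHeap is used when the iteration direction changes from reverse to
 : // forward. It advances every level other than the current heap root past the
 : // current key (re-seeking levels that are exhausted or outside the lower
 : // bound) and then rebuilds the min-heap.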
397 2 : func (m *mergingIter) switchToMinHeap() {
398 2 : if m.heap.len() == 0 {
399 2 : if m.lower != nil {
400 2 : m.SeekGE(m.lower, base.SeekGEFlagsNone)
401 2 : } else {
402 2 : m.First()
403 2 : }
404 2 : return
405 : }
406 :
407 : // We're switching from using a max heap to a min heap. We need to advance
408 : // any iterator that is less than or equal to the current key. Consider the
409 : // scenario where we have 2 iterators being merged (user-key:seq-num):
410 : //
411 : // i1: *a:2 b:2
412 : // i2: a:1 b:1
413 : //
414 : // The current key is a:2 and i2 is pointed at a:1. When we switch to forward
415 : // iteration, we want to return a key that is greater than a:2.
416 :
417 2 : key := m.heap.items[0].iterKey
418 2 : cur := m.heap.items[0]
419 2 :
420 2 : for i := range m.levels {
421 2 : l := &m.levels[i]
422 2 : if l == cur {
423 2 : continue
424 : }
425 :
426 : // If the iterator is exhausted, it may be out of bounds if range
427 : // deletions modified our search key as we descended. We need to
428 : // reposition it within the search bounds. If the current key is a
429 : // range tombstone, the iterator might still be exhausted but at a
430 : // sstable boundary sentinel. It would be okay to reposition an
431 : // iterator like this only through successive Next calls, except that
432 : // it would violate the levelIter's invariants by causing it to return
433 : // a key before the lower bound.
434 : //
435 : // bounds = [ f, _ )
436 : // L0: [ b ] [ f* z ]
437 : // L1: [ a |----| k y ]
438 : // L2: [ c (d) ] [ e g m ]
439 : // L3: [ x ]
440 : //
441 : // * - current key [] - table bounds () - heap item
442 : //
443 : // In the above diagram, the L2 iterator is positioned at a sstable
444 : // boundary (d) outside the lower bound (f). It arrived here from a
445 : // seek whose seek-key was modified by a range tombstone. If we called
446 : // Next on the L2 iterator, it would return e, violating its lower
447 : // bound. Instead, we seek it to >= f and Next from there.
448 :
449 2 : if l.iterKey == nil || (m.lower != nil && l.isSyntheticIterBoundsKey &&
450 2 : l.iterKey.IsExclusiveSentinel() &&
451 2 : m.heap.cmp(l.iterKey.UserKey, m.lower) <= 0) {
452 2 : if m.lower != nil {
453 2 : l.iterKey, l.iterValue = l.iter.SeekGE(m.lower, base.SeekGEFlagsNone)
454 2 : } else {
455 2 : l.iterKey, l.iterValue = l.iter.First()
456 2 : }
457 : }
458 2 : for ; l.iterKey != nil; l.iterKey, l.iterValue = l.iter.Next() {
459 2 : if base.InternalCompare(m.heap.cmp, *key, *l.iterKey) < 0 {
460 2 : // key < iter-key
461 2 : break
462 : }
463 : // key >= iter-key
464 : }
465 : }
466 :
467 : // Special handling for the current iterator because we were using its key
468 : // above. The iterator cur.iter may still be exhausted at a sstable boundary
469 : // sentinel. Similar to the logic applied to the other levels, in these
470 : // cases we seek the iterator to the first key in order to avoid violating
471 : // levelIter's invariants. See the example in the for loop above.
472 2 : if m.lower != nil && cur.isSyntheticIterBoundsKey && cur.iterKey.IsExclusiveSentinel() &&
473 2 : m.heap.cmp(cur.iterKey.UserKey, m.lower) <= 0 {
474 2 : cur.iterKey, cur.iterValue = cur.iter.SeekGE(m.lower, base.SeekGEFlagsNone)
475 2 : } else {
476 2 : cur.iterKey, cur.iterValue = cur.iter.Next()
477 2 : }
478 2 : m.initMinHeap()
479 : }
480 :
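 : // switchToMaxHeap is the mirror image of switchToMinHeap, used when the
 : // iteration direction changes from forward to reverse: every level other than
 : // the current heap root is backed up to a key before the current key, and the
 : // max-heap is then rebuilt.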
481 2 : func (m *mergingIter) switchToMaxHeap() {
482 2 : if m.heap.len() == 0 {
483 2 : if m.upper != nil {
484 2 : m.SeekLT(m.upper, base.SeekLTFlagsNone)
485 2 : } else {
486 2 : m.Last()
487 2 : }
488 2 : return
489 : }
490 :
491 : // We're switching from using a min heap to a max heap. We need to backup any
492 : // iterator that is greater than or equal to the current key. Consider the
493 : // scenario where we have 2 iterators being merged (user-key:seq-num):
494 : //
495 : // i1: a:2 *b:2
496 : // i2: a:1 b:1
497 : //
498 : // The current key is b:2 and i2 is pointing at b:1. When we switch to
499 : // reverse iteration, we want to return a key that is less than b:2.
500 2 : key := m.heap.items[0].iterKey
501 2 : cur := m.heap.items[0]
502 2 :
503 2 : for i := range m.levels {
504 2 : l := &m.levels[i]
505 2 : if l == cur {
506 2 : continue
507 : }
508 :
509 : // If the iterator is exhausted, it may be out of bounds if range
510 : // deletions modified our search key as we descended. We need to
511 : // reposition it within the search bounds. If the current key is a
512 : // range tombstone, the iterator might still be exhausted but at a
513 : // sstable boundary sentinel. It would be okay to reposition an
514 : // iterator like this only through successive Prev calls, except that
515 : // it would violate the levelIter's invariants by causing it to return
516 : // a key beyond the upper bound.
517 : //
518 : // bounds = [ _, g )
519 : // L0: [ b ] [ f* z ]
520 : // L1: [ a |-------| k y ]
521 : // L2: [ c d ] h [(i) m ]
522 : // L3: [ e x ]
523 : //
524 : // * - current key [] - table bounds () - heap item
525 : //
526 : // In the above diagram, the L2 iterator is positioned at a sstable
527 : // boundary (i) outside the upper bound (g). It arrived here from a
528 : // seek whose seek-key was modified by a range tombstone. If we called
529 : // Prev on the L2 iterator, it would return h, violating its upper
530 : // bound. Instead, we seek it to < g, and Prev from there.
531 :
532 2 : if l.iterKey == nil || (m.upper != nil && l.isSyntheticIterBoundsKey &&
533 2 : l.iterKey.IsExclusiveSentinel() && m.heap.cmp(l.iterKey.UserKey, m.upper) >= 0) {
534 2 : if m.upper != nil {
535 2 : l.iterKey, l.iterValue = l.iter.SeekLT(m.upper, base.SeekLTFlagsNone)
536 2 : } else {
537 2 : l.iterKey, l.iterValue = l.iter.Last()
538 2 : }
539 : }
540 2 : for ; l.iterKey != nil; l.iterKey, l.iterValue = l.iter.Prev() {
541 2 : if base.InternalCompare(m.heap.cmp, *key, *l.iterKey) > 0 {
542 2 : // key > iter-key
543 2 : break
544 : }
545 : // key <= iter-key
546 : }
547 : }
548 :
549 : // Special handling for the current iterator because we were using its key
550 : // above. The iterator cur.iter may still be exhausted at a sstable boundary
551 : // sentinel. Similar to the logic applied to the other levels, in these
552 : // cases we seek the iterator in order to avoid violating levelIter's
553 : // invariants by Prev-ing through files. See the example in the for loop
554 : // above.
555 2 : if m.upper != nil && cur.isSyntheticIterBoundsKey && cur.iterKey.IsExclusiveSentinel() &&
556 2 : m.heap.cmp(cur.iterKey.UserKey, m.upper) >= 0 {
557 2 : cur.iterKey, cur.iterValue = cur.iter.SeekLT(m.upper, base.SeekLTFlagsNone)
558 2 : } else {
559 2 : cur.iterKey, cur.iterValue = cur.iter.Prev()
560 2 : }
561 2 : m.initMaxHeap()
562 : }
563 :
564 : // maybeNextEntryWithinPrefix steps to the next entry, as long as the iteration
565 : // prefix has not already been exceeded. If it has, it exhausts the iterator by
566 : // resetting the heap to empty.
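 : //
 : // For example, with a hypothetical MVCC-style Split where split("foo@7") == 3,
 : // a heap root of foo@7 is still within the iteration prefix "foo" and is
 : // nexted, while a heap root of fop@2 exceeds the prefix and causes the heap to
 : // be cleared instead.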
567 2 : func (m *mergingIter) maybeNextEntryWithinPrefix(l *mergingIterLevel) {
568 2 : if s := m.split(l.iterKey.UserKey); !bytes.Equal(m.prefix, l.iterKey.UserKey[:s]) {
569 2 : // The item at the root of the heap already exceeds the iteration
570 2 : // prefix. We should not advance any more. Clear the heap to reflect
571 2 : // that the iterator is now exhausted (within this prefix, at
572 2 : // least).
573 2 : m.heap.items = m.heap.items[:0]
574 2 : return
575 2 : }
576 2 : m.nextEntry(l, nil /* succKey */)
577 : }
578 :
579 : // nextEntry unconditionally steps to the next entry. item is the current top
580 : // item in the heap.
581 : //
582 : // nextEntry should be called directly when not in prefix-iteration mode, or by
583 : // Next. During prefix iteration mode, all other callers should use
584 : // maybeNextEntryWithinPrefix which will avoid advancing the iterator if the
585 : // current iteration prefix has been exhausted. See the comment within
586 : // nextEntry's body for an explanation of why other callers should call
587 : // maybeNextEntryWithinPrefix, which will ensure the documented invariant is
588 : // preserved.
589 2 : func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) {
590 2 : // INVARIANT: If in prefix iteration mode, item.iterKey must have a prefix equal
591 2 : // to m.prefix. This invariant is important for ensuring TrySeekUsingNext
592 2 : // optimizations behave correctly.
593 2 : //
594 2 : // During prefix iteration, the iterator does not have a full view of the
595 2 : // LSM. Some level iterators may omit keys that are known to fall outside
596 2 : // the seek prefix (eg, due to sstable bloom filter exclusion). It's
597 2 : // important that in such cases we don't position any iterators beyond
598 2 : // m.prefix, because doing so may interfere with future seeks.
599 2 : //
600 2 : // Let prefixes P1 < P2 < P3. Imagine a SeekPrefixGE to prefix P1, followed
601 2 : // by a SeekPrefixGE to prefix P2. Imagine there exist live keys at prefix
602 2 : // P2, but they're not visible to the SeekPrefixGE(P1) (because of
603 2 : // bloom-filter exclusion or a range tombstone that deletes prefix P1 but
604 2 : // not P2). If the SeekPrefixGE(P1) is allowed to move any level iterators
605 2 : // to P3, the SeekPrefixGE(P2, TrySeekUsingNext=true) may mistakenly think
606 2 : // the level contains no point keys or range tombstones within the prefix
607 2 : // P2. Care is taken to avoid ever advancing the iterator beyond the current
608 2 : // prefix. If nextEntry is ever invoked while we're already beyond the
609 2 : // current prefix, we're violating the invariant.
610 2 : if invariants.Enabled && m.prefix != nil {
611 2 : if s := m.split(l.iterKey.UserKey); !bytes.Equal(m.prefix, l.iterKey.UserKey[:s]) {
612 0 : m.logger.Fatalf("mergingIter: prefix violation: nexting beyond prefix %q; existing heap root %q\n%s",
613 0 : m.prefix, l.iterKey, debug.Stack())
614 0 : }
615 : }
616 :
617 2 : oldTopLevel := l.index
618 2 : oldRangeDelIter := l.rangeDelIter
619 2 :
620 2 : if succKey == nil {
621 2 : l.iterKey, l.iterValue = l.iter.Next()
622 2 : } else {
623 2 : l.iterKey, l.iterValue = l.iter.NextPrefix(succKey)
624 2 : }
625 :
626 2 : if l.iterKey != nil {
627 2 : if m.heap.len() > 1 {
628 2 : m.heap.fix(0)
629 2 : }
630 2 : if l.rangeDelIter != oldRangeDelIter {
631 2 : // The rangeDelIter changed which indicates that the l.iter moved to the
632 2 : // next sstable. We have to update the tombstone for oldTopLevel as well.
633 2 : oldTopLevel--
634 2 : }
635 2 : } else {
636 2 : m.err = l.iter.Error()
637 2 : if m.err == nil {
638 2 : m.heap.pop()
639 2 : }
640 : }
641 :
642 : // The cached tombstones are only valid for the levels
643 : // [0,oldTopLevel]. Update the cached tombstones for any levels in the range
644 : // [oldTopLevel+1,heap[0].index].
645 2 : m.initMinRangeDelIters(oldTopLevel)
646 : }
647 :
648 : // isNextEntryDeleted starts from the current entry (as the next entry) and if
649 : // it is deleted, moves the iterators forward as needed and returns true, else
650 : // it returns false. item is the top item in the heap.
651 : //
652 : // During prefix iteration mode, isNextEntryDeleted will exhaust the iterator by
653 : // clearing the heap if the deleted key(s) extend beyond the iteration prefix
654 : // during prefix-iteration mode.
655 2 : func (m *mergingIter) isNextEntryDeleted(item *mergingIterLevel) bool {
656 2 : // Look for a range deletion tombstone containing item.iterKey at higher
657 2 : // levels (level < item.index). If we find such a range tombstone we know
658 2 : // it deletes the key in the current level. Also look for a range
659 2 : // deletion at the current level (level == item.index). If we find such a
660 2 : // range deletion we need to check whether it is newer than the current
661 2 : // entry.
662 2 : for level := 0; level <= item.index; level++ {
663 2 : l := &m.levels[level]
664 2 : if l.rangeDelIter == nil || l.tombstone == nil {
665 2 : // If l.tombstone is nil, there are no further tombstones
666 2 : // in the current sstable in the current (forward) iteration
667 2 : // direction.
668 2 : continue
669 : }
670 2 : if m.heap.cmp(l.tombstone.End, item.iterKey.UserKey) <= 0 {
671 2 : // The current key is at or past the tombstone end key.
672 2 : //
673 2 : // NB: for the case that this l.rangeDelIter is provided by a levelIter we know that
674 2 : // the levelIter must be positioned at a key >= item.iterKey. So it is sufficient to seek the
675 2 : // current l.rangeDelIter (since any range del iterators that will be provided by the
676 2 : // levelIter in the future cannot contain item.iterKey). Also, it is possible that we
677 2 : // will encounter parts of the range delete that should be ignored -- we handle that
678 2 : // below.
679 2 : l.tombstone = l.rangeDelIter.SeekGE(item.iterKey.UserKey)
680 2 : }
681 2 : if l.tombstone == nil {
682 2 : continue
683 : }
684 :
685 : // Reasoning for correctness of untruncated tombstone handling when the untruncated
686 : // tombstone is at a higher level:
687 : // The iterator corresponding to this tombstone is still in the heap so it must be
688 : // positioned >= item.iterKey. Which means the Largest key bound of the sstable containing this
689 : // tombstone is >= item.iterKey. So the upper limit of this tombstone cannot be file-bounds-constrained
690 : // to < item.iterKey. But it is possible that item.key < smallestUserKey, in which
691 : // case this tombstone should be ignored.
692 : //
693 : // Example 1:
694 : // sstable bounds [c#8, g#12] containing a tombstone [b, i)#7, and key is c#6. The
695 : // smallestUserKey is c, so we know the key is within the file bounds and the tombstone
696 : // [b, i) covers it.
697 : //
698 : // Example 2:
699 : // Same sstable bounds but key is b#10. The smallestUserKey is c, so the tombstone [b, i)
700 : // does not cover this key.
701 : //
702 : // For a tombstone at the same level as the key, the file bounds are trivially satisfied.
703 2 : if (l.smallestUserKey == nil || m.heap.cmp(l.smallestUserKey, item.iterKey.UserKey) <= 0) &&
704 2 : l.tombstone.VisibleAt(m.snapshot) && l.tombstone.Contains(m.heap.cmp, item.iterKey.UserKey) {
705 2 : if level < item.index {
706 2 : // We could also do m.seekGE(..., level + 1). The levels from
707 2 : // [level + 1, item.index) are already after item.iterKey so seeking them may be
708 2 : // wasteful.
709 2 :
710 2 : // We can seek up to the min of largestUserKey and tombstone.End.
711 2 : //
712 2 : // Using example 1 above, we can seek to the smaller of g and i, which is g.
713 2 : //
714 2 : // Another example, where the sstable bounds are [c#8, i#InternalRangeDelSentinel],
715 2 : // and the tombstone is [b, i)#8. Seeking to i is correct since it is seeking up to
716 2 : // the exclusive bound of the tombstone. We do not need to look at
717 2 : // isLargestKeyRangeDelSentinel.
718 2 : //
719 2 : // Progress argument: Since this file is at a higher level than item.iterKey we know
720 2 : // that the iterator in this file must be positioned within its bounds and at a key
721 2 : // X > item.iterKey (otherwise it would be the min of the heap). It is not
722 2 : // possible for X.UserKey == item.iterKey.UserKey, since it is incompatible with
723 2 : // X > item.iterKey (a lower version cannot be in a higher sstable), so it must be that
724 2 : // X.UserKey > item.iterKey.UserKey. Which means l.largestUserKey > item.key.UserKey.
725 2 : // We also know that l.tombstone.End > item.iterKey.UserKey. So the min of these,
726 2 : // seekKey, computed below, is > item.iterKey.UserKey, so the call to seekGE() will
727 2 : // make forward progress.
728 2 : seekKey := l.tombstone.End
729 2 : if l.largestUserKey != nil && m.heap.cmp(l.largestUserKey, seekKey) < 0 {
730 1 : seekKey = l.largestUserKey
731 1 : }
732 : // This seek is not directly due to a SeekGE call, so we don't know
733 : // enough about the underlying iterator positions, and so we keep the
734 : // try-seek-using-next optimization disabled. Additionally, if we're in
735 : // prefix-seek mode and a re-seek would have moved us past the original
736 : // prefix, we can remove all merging iter levels below the rangedel
737 : // tombstone's level and return immediately instead of re-seeking. This
738 : // is correct since those levels cannot provide a key that matches the
739 : // prefix, and is also visible. Additionally, this is important to make
740 : // subsequent `TrySeekUsingNext` work correctly, as a re-seek on a
741 : // different prefix could have resulted in this iterator skipping visible
742 : // keys at prefixes in between m.prefix and seekKey, that are currently
743 : // not in the heap due to a bloom filter mismatch.
744 : //
745 : // Additionally, we set the relative-seek flag. This is
746 : // important when iterating with lazy combined iteration. If
747 : // there's a range key between this level's current file and the
748 : // file the seek will land on, we need to detect it in order to
749 : // trigger construction of the combined iterator.
750 2 : if m.prefix != nil {
751 2 : if n := m.split(seekKey); !bytes.Equal(m.prefix, seekKey[:n]) {
752 2 : for i := item.index; i < len(m.levels); i++ {
753 2 : // Remove this level from the heap. Setting iterKey and iterValue
754 2 : // to their zero values should be sufficient for initMinHeap to not
755 2 : // re-initialize the heap with them in it. Other fields in
756 2 : // mergingIterLevel can remain as-is; the iter/rangeDelIter needs
757 2 : // to stay intact for future trySeekUsingNexts to work, the level
758 2 : // iter boundary context is owned by the levelIter which is not
759 2 : // being repositioned, and any tombstones in these levels will be
760 2 : // irrelevant for us anyway.
761 2 : m.levels[i].iterKey = nil
762 2 : m.levels[i].iterValue = base.LazyValue{}
763 2 : }
764 : // TODO(bilal): Consider a more efficient way of removing levels from
765 : // the heap without reinitializing all of it. This would likely
766 : // necessitate tracking the heap positions of each mergingIterHeap
767 : // item in the mergingIterLevel, and then swapping that item in the
768 : // heap with the last-positioned heap item, and shrinking the heap by
769 : // one.
770 2 : m.initMinHeap()
771 2 : return true
772 : }
773 : }
774 2 : m.seekGE(seekKey, item.index, base.SeekGEFlagsNone.EnableRelativeSeek())
775 2 : return true
776 : }
777 2 : if l.tombstone.CoversAt(m.snapshot, item.iterKey.SeqNum()) {
778 2 : if m.prefix == nil {
779 2 : m.nextEntry(item, nil /* succKey */)
780 2 : } else {
781 2 : m.maybeNextEntryWithinPrefix(item)
782 2 : }
783 2 : return true
784 : }
785 : }
786 : }
787 2 : return false
788 : }
789 :
790 : // Starting from the current entry, finds the first (next) entry that can be returned.
791 2 : func (m *mergingIter) findNextEntry() (*InternalKey, base.LazyValue) {
792 2 : for m.heap.len() > 0 && m.err == nil {
793 2 : item := m.heap.items[0]
794 2 : if m.levels[item.index].isSyntheticIterBoundsKey {
795 2 : break
796 : }
797 :
798 2 : m.addItemStats(item)
799 2 :
800 2 : // Skip ignorable boundary keys. These are not real keys and exist to
801 2 : // keep sstables open until we've surpassed their end boundaries so that
802 2 : // their range deletions are visible.
803 2 : if m.levels[item.index].isIgnorableBoundaryKey {
804 2 : if m.prefix == nil {
805 2 : m.nextEntry(item, nil /* succKey */)
806 2 : } else {
807 2 : m.maybeNextEntryWithinPrefix(item)
808 2 : }
809 2 : continue
810 : }
811 :
812 : // Check if the heap root key is deleted by a range tombstone in a
813 : // higher level. If it is, isNextEntryDeleted will advance the iterator
814 : // to a later key (through seeking or nexting).
815 2 : if m.isNextEntryDeleted(item) {
816 2 : m.stats.PointsCoveredByRangeTombstones++
817 2 : continue
818 : }
819 :
820 : // Check if the key is visible at the iterator sequence numbers.
821 2 : if !item.iterKey.Visible(m.snapshot, m.batchSnapshot) {
822 2 : if m.prefix == nil {
823 2 : m.nextEntry(item, nil /* succKey */)
824 2 : } else {
825 2 : m.maybeNextEntryWithinPrefix(item)
826 2 : }
827 2 : continue
828 : }
829 :
830 : // The heap root is visible and not deleted by any range tombstones.
831 : // Return it.
832 2 : return item.iterKey, item.iterValue
833 : }
834 2 : return nil, base.LazyValue{}
835 : }
836 :
837 : // Steps to the prev entry. l is the current top item in the heap.
838 2 : func (m *mergingIter) prevEntry(l *mergingIterLevel) {
839 2 : oldTopLevel := l.index
840 2 : oldRangeDelIter := l.rangeDelIter
841 2 : if l.iterKey, l.iterValue = l.iter.Prev(); l.iterKey != nil {
842 2 : if m.heap.len() > 1 {
843 2 : m.heap.fix(0)
844 2 : }
845 2 : if l.rangeDelIter != oldRangeDelIter && l.rangeDelIter != nil {
846 2 : // The rangeDelIter changed which indicates that the l.iter moved to the
847 2 : // previous sstable. We have to update the tombstone for oldTopLevel as
848 2 : // well.
849 2 : oldTopLevel--
850 2 : }
851 2 : } else {
852 2 : m.err = l.iter.Error()
853 2 : if m.err == nil {
854 2 : m.heap.pop()
855 2 : }
856 : }
857 :
858 : // The cached tombstones are only valid for the levels
859 : // [0,oldTopLevel]. Update the cached tombstones for any levels in the range
860 : // [oldTopLevel+1,heap[0].index].
861 2 : m.initMaxRangeDelIters(oldTopLevel)
862 : }
863 :
864 : // isPrevEntryDeleted() starts from the current entry (as the prev entry) and if it is deleted,
865 : // moves the iterators backward as needed and returns true, else it returns false. item is the top
866 : // item in the heap.
867 2 : func (m *mergingIter) isPrevEntryDeleted(item *mergingIterLevel) bool {
868 2 : // Look for a range deletion tombstone containing item.iterKey at higher
869 2 : // levels (level < item.index). If we find such a range tombstone we know
870 2 : // it deletes the key in the current level. Also look for a range
871 2 : // deletion at the current level (level == item.index). If we find such a
872 2 : // range deletion we need to check whether it is newer than the current
873 2 : // entry.
874 2 : for level := 0; level <= item.index; level++ {
875 2 : l := &m.levels[level]
876 2 : if l.rangeDelIter == nil || l.tombstone == nil {
877 2 : // If l.tombstone is nil, there are no further tombstones
878 2 : // in the current sstable in the current (reverse) iteration
879 2 : // direction.
880 2 : continue
881 : }
882 2 : if m.heap.cmp(item.iterKey.UserKey, l.tombstone.Start) < 0 {
883 2 : // The current key is before the tombstone start key.
884 2 : //
885 2 : // NB: for the case that this l.rangeDelIter is provided by a levelIter we know that
886 2 : // the levelIter must be positioned at a key < item.iterKey. So it is sufficient to seek the
887 2 : // current l.rangeDelIter (since any range del iterators that will be provided by the
888 2 : // levelIter in the future cannot contain item.iterKey). Also, it is possible that we
889 2 : // will encounter parts of the range delete that should be ignored -- we handle that
890 2 : // below.
891 2 : l.tombstone = keyspan.SeekLE(m.heap.cmp, l.rangeDelIter, item.iterKey.UserKey)
892 2 : }
893 2 : if l.tombstone == nil {
894 2 : continue
895 : }
896 :
897 : // Reasoning for correctness of untruncated tombstone handling when the untruncated
898 : // tombstone is at a higher level:
899 : //
900 : // The iterator corresponding to this tombstone is still in the heap so it must be
901 : // positioned <= item.iterKey. Which means the Smallest key bound of the sstable containing this
902 : // tombstone is <= item.iterKey. So the lower limit of this tombstone cannot have been
903 : // file-bounds-constrained to > item.iterKey. But it is possible that item.key >= Largest
904 : // key bound of this sstable, in which case this tombstone should be ignored.
905 : //
906 : // Example 1:
907 : // sstable bounds [c#8, g#12] containing a tombstone [b, i)#7, and key is f#6. The
908 : // largestUserKey is g, so we know the key is within the file bounds and the tombstone
909 : // [b, i) covers it.
910 : //
911 : // Example 2:
912 : // Same sstable but the key is g#6. This cannot happen since the [b, i)#7 untruncated
913 : // tombstone was involved in a compaction which must have had a file to the right of this
914 : // sstable that is part of the same atomic compaction group for future compactions. That
915 : // file must have bounds that cover g#6 and this levelIter must be at that file.
916 : //
917 : // Example 3:
918 : // sstable bounds [c#8, g#RangeDelSentinel] containing [b, i)#7 and the key is g#10.
919 : // This key is not deleted by this tombstone. We need to look at
920 : // isLargestUserKeyExclusive.
921 : //
922 : // For a tombstone at the same level as the key, the file bounds are trivially satisfied.
923 :
924 : // Default to within bounds.
925 2 : withinLargestSSTableBound := true
926 2 : if l.largestUserKey != nil {
927 2 : cmpResult := m.heap.cmp(l.largestUserKey, item.iterKey.UserKey)
928 2 : withinLargestSSTableBound = cmpResult > 0 || (cmpResult == 0 && !l.isLargestUserKeyExclusive)
929 2 : }
930 2 : if withinLargestSSTableBound && l.tombstone.Contains(m.heap.cmp, item.iterKey.UserKey) && l.tombstone.VisibleAt(m.snapshot) {
931 2 : if level < item.index {
932 2 : // We could also do m.seekLT(..., level + 1). The levels from
933 2 : // [level + 1, item.index) are already before item.iterKey so seeking them may be
934 2 : // wasteful.
935 2 :
936 2 : // We can seek up to the max of smallestUserKey and tombstone.Start.UserKey.
937 2 : //
938 2 : // Using example 1 above, we can seek to the larger of c and b, which is c.
939 2 : //
940 2 : // Progress argument: We know that the iterator in this file is positioned within
941 2 : // its bounds and at a key X < item.iterKey (otherwise it would be the max of the heap).
942 2 : // So smallestUserKey <= item.iterKey.UserKey and we already know that
943 2 : // l.tombstone.Start.UserKey <= item.iterKey.UserKey. So the seekKey computed below
944 2 : // is <= item.iterKey.UserKey, and since we do a seekLT() we will make backwards
945 2 : // progress.
946 2 : seekKey := l.tombstone.Start
947 2 : if l.smallestUserKey != nil && m.heap.cmp(l.smallestUserKey, seekKey) > 0 {
948 1 : seekKey = l.smallestUserKey
949 1 : }
950 : // We set the relative-seek flag. This is important when
951 : // iterating with lazy combined iteration. If there's a range
952 : // key between this level's current file and the file the seek
953 : // will land on, we need to detect it in order to trigger
954 : // construction of the combined iterator.
955 2 : m.seekLT(seekKey, item.index, base.SeekLTFlagsNone.EnableRelativeSeek())
956 2 : return true
957 : }
958 2 : if l.tombstone.CoversAt(m.snapshot, item.iterKey.SeqNum()) {
959 2 : m.prevEntry(item)
960 2 : return true
961 2 : }
962 : }
963 : }
964 2 : return false
965 : }
966 :
967 : // Starting from the current entry, finds the first (prev) entry that can be returned.
968 2 : func (m *mergingIter) findPrevEntry() (*InternalKey, base.LazyValue) {
969 2 : for m.heap.len() > 0 && m.err == nil {
970 2 : item := m.heap.items[0]
971 2 : if m.levels[item.index].isSyntheticIterBoundsKey {
972 2 : break
973 : }
974 2 : m.addItemStats(item)
975 2 : if m.isPrevEntryDeleted(item) {
976 2 : m.stats.PointsCoveredByRangeTombstones++
977 2 : continue
978 : }
979 2 : if item.iterKey.Visible(m.snapshot, m.batchSnapshot) &&
980 2 : (!m.levels[item.index].isIgnorableBoundaryKey) {
981 2 : return item.iterKey, item.iterValue
982 2 : }
983 2 : m.prevEntry(item)
984 : }
985 2 : return nil, base.LazyValue{}
986 : }
987 :
988 : // Seeks levels >= level to >= key. Additionally uses range tombstones to extend the seeks.
989 2 : func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) {
990 2 : // When seeking, we can use tombstones to adjust the key we seek to on each
991 2 : // level. Consider the series of range tombstones:
992 2 : //
993 2 : // 1: a---e
994 2 : // 2: d---h
995 2 : // 3: g---k
996 2 : // 4: j---n
997 2 : // 5: m---q
998 2 : //
999 2 : // If we SeekGE("b") we also find the tombstone in the first level that "b"
1000 2 : // resides within, which is [a,e). Regardless of whether this tombstone deletes
1001 2 : // "b" in that level, we know it deletes "b" in all lower levels, so we
1002 2 : // adjust the search key in the next level to the tombstone end key "e". We
1003 2 : // then SeekGE("e") in the second level and find the corresponding tombstone
1004 2 : // [d,h). This process continues and we end up seeking for "h" in the 3rd
1005 2 : // level, "k" in the 4th level and "n" in the last level.
1006 2 : //
1007 2 : // TODO(peter,rangedel): In addition to the above we can delay seeking a
1008 2 : // level (and any lower levels) when the current iterator position is
1009 2 : // contained within a range tombstone at a higher level.
1010 2 :
1011 2 : // Deterministically disable the TrySeekUsingNext optimizations sometimes in
1012 2 : // invariant builds to encourage the metamorphic tests to surface bugs. Note
1013 2 : // that we cannot disable the optimization within individual levels. It must
1014 2 : // be disabled for all levels or none. If one lower-level iterator performs
1015 2 : // a fresh seek whereas another takes advantage of its current iterator
1016 2 : // position, the heap can become inconsistent. Consider the following
1017 2 : // example:
1018 2 : //
1019 2 : // L5: [ [b-c) ] [ d ]*
1020 2 : // L6: [ b ] [e]*
1021 2 : //
1022 2 : // Imagine a SeekGE(a). The [b-c) range tombstone deletes the L6 point key
1023 2 : // 'b', resulting in the iterator positioned at d with the heap:
1024 2 : //
1025 2 : // {L5: d, L6: e}
1026 2 : //
1027 2 : // A subsequent SeekGE(b) is seeking to a larger key, so the caller may set
1028 2 : // TrySeekUsingNext()=true. If the L5 iterator used the TrySeekUsingNext
1029 2 : // optimization but the L6 iterator did not, the iterator would have the
1030 2 : // heap:
1031 2 : //
1032 2 : // {L6: b, L5: d}
1033 2 : //
1034 2 : // Because the L5 iterator has already advanced to the next sstable, the
1035 2 : // merging iterator cannot observe the [b-c) range tombstone and will
1036 2 : // mistakenly return L6's deleted point key 'b'.
1037 2 : if invariants.Enabled && flags.TrySeekUsingNext() && !m.forceEnableSeekOpt &&
1038 2 : disableSeekOpt(key, uintptr(unsafe.Pointer(m))) {
1039 2 : flags = flags.DisableTrySeekUsingNext()
1040 2 : }
1041 :
1042 2 : for ; level < len(m.levels); level++ {
1043 2 : if invariants.Enabled && m.lower != nil && m.heap.cmp(key, m.lower) < 0 {
1044 0 : m.logger.Fatalf("mergingIter: lower bound violation: %s < %s\n%s", key, m.lower, debug.Stack())
1045 0 : }
1046 :
1047 2 : l := &m.levels[level]
1048 2 : if m.prefix != nil {
1049 2 : l.iterKey, l.iterValue = l.iter.SeekPrefixGE(m.prefix, key, flags)
1050 2 : } else {
1051 2 : l.iterKey, l.iterValue = l.iter.SeekGE(key, flags)
1052 2 : }
1053 :
1054 : // If this level contains overlapping range tombstones, alter the seek
1055 : // key accordingly. Caveat: If we're performing lazy-combined iteration,
1056 : // we cannot alter the seek key: Range tombstones don't delete range
1057 : // keys, and there might exist live range keys within the range
1058 : // tombstone's span that need to be observed to trigger a switch to
1059 : // combined iteration.
1060 2 : if rangeDelIter := l.rangeDelIter; rangeDelIter != nil &&
1061 2 : (m.combinedIterState == nil || m.combinedIterState.initialized) {
1062 2 : // The level has a range-del iterator. Find the tombstone containing
1063 2 : // the search key.
1064 2 : //
1065 2 : // For untruncated tombstones that are possibly file-bounds-constrained, we are using a
1066 2 : // levelIter which will set smallestUserKey and largestUserKey. Since the levelIter
1067 2 : // is at this file we know that largestUserKey >= key, so we know that the
1068 2 : // tombstone we find cannot be file-bounds-constrained in its upper bound to something < key.
1069 2 : // We do need to compare with smallestUserKey to ensure that the tombstone is not
1070 2 : // file-bounds-constrained in its lower bound.
1071 2 : //
1072 2 : // See the detailed comments in isNextEntryDeleted() on why similar containment and
1073 2 : // seeking logic is correct. The subtle difference here is that key is a user key,
1074 2 : // so we can have a sstable with bounds [c#8, i#InternalRangeDelSentinel], and the
1075 2 : // tombstone is [b, k)#8 and the seek key is i: levelIter.SeekGE(i) will move past
1076 2 : // this sstable since it realizes the largest key is an InternalRangeDelSentinel.
1077 2 : l.tombstone = rangeDelIter.SeekGE(key)
1078 2 : if l.tombstone != nil && l.tombstone.VisibleAt(m.snapshot) && l.tombstone.Contains(m.heap.cmp, key) &&
1079 2 : (l.smallestUserKey == nil || m.heap.cmp(l.smallestUserKey, key) <= 0) {
1080 2 : // NB: Based on the comment above l.largestUserKey >= key, and based on the
1081 2 : // containment condition tombstone.End > key, so the assignment to key results
1082 2 : // in a monotonically non-decreasing key across iterations of this loop.
1083 2 : //
1084 2 : // The adjustment of key here can only move it to a larger key. Since
1085 2 : // the caller of seekGE guaranteed that the original key was greater
1086 2 : // than or equal to m.lower, the new key will continue to be greater
1087 2 : // than or equal to m.lower.
1088 2 : if l.largestUserKey != nil &&
1089 2 : m.heap.cmp(l.largestUserKey, l.tombstone.End) < 0 {
1090 1 : // Truncate the tombstone for seeking purposes. Note that this can over-truncate
1091 1 : // but that is harmless for this seek optimization.
1092 1 : key = l.largestUserKey
1093 2 : } else {
1094 2 : key = l.tombstone.End
1095 2 : }
1096 : }
1097 : }
1098 : }
1099 :
1100 2 : m.initMinHeap()
1101 : }
1102 :
1103 0 : func (m *mergingIter) String() string {
1104 0 : return "merging"
1105 0 : }
1106 :
1107 : // SeekGE implements base.InternalIterator.SeekGE. Note that SeekGE only checks
1108 : // the upper bound. It is up to the caller to ensure that key is greater than
1109 : // or equal to the lower bound.
1110 2 : func (m *mergingIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, base.LazyValue) {
1111 2 : m.err = nil // clear cached iteration error
1112 2 : m.prefix = nil
1113 2 : m.seekGE(key, 0 /* start level */, flags)
1114 2 : return m.findNextEntry()
1115 2 : }
1116 :
1117 : // SeekPrefixGE implements base.InternalIterator.SeekPrefixGE. Note that
1118 : // SeekPrefixGE only checks the upper bound. It is up to the caller to ensure
1119 : // that key is greater than or equal to the lower bound.
1120 : func (m *mergingIter) SeekPrefixGE(
1121 : prefix, key []byte, flags base.SeekGEFlags,
1122 2 : ) (*base.InternalKey, base.LazyValue) {
1123 2 : m.err = nil // clear cached iteration error
1124 2 : m.prefix = prefix
1125 2 : m.seekGE(key, 0 /* start level */, flags)
1126 2 : return m.findNextEntry()
1127 2 : }
1128 :
1129 : // Seeks levels >= level to < key. Additionally uses range tombstones to extend the seeks.
1130 2 : func (m *mergingIter) seekLT(key []byte, level int, flags base.SeekLTFlags) {
1131 2 : // See the comment in seekGE regarding using tombstones to adjust the seek
1132 2 : // target per level.
1133 2 : m.prefix = nil
1134 2 : for ; level < len(m.levels); level++ {
1135 2 : if invariants.Enabled && m.upper != nil && m.heap.cmp(key, m.upper) > 0 {
1136 0 : m.logger.Fatalf("mergingIter: upper bound violation: %s > %s\n%s", key, m.upper, debug.Stack())
1137 0 : }
1138 :
1139 2 : l := &m.levels[level]
1140 2 : l.iterKey, l.iterValue = l.iter.SeekLT(key, flags)
1141 2 :
1142 2 : // If this level contains overlapping range tombstones, alter the seek
1143 2 : // key accordingly. Caveat: If we're performing lazy-combined iteration,
1144 2 : // we cannot alter the seek key: Range tombstones don't delete range
1145 2 : // keys, and there might exist live range keys within the range
1146 2 : // tombstone's span that need to be observed to trigger a switch to
1147 2 : // combined iteration.
1148 2 : if rangeDelIter := l.rangeDelIter; rangeDelIter != nil &&
1149 2 : (m.combinedIterState == nil || m.combinedIterState.initialized) {
1150 2 : // The level has a range-del iterator. Find the tombstone containing
1151 2 : // the search key.
1152 2 : //
1153 2 : // For untruncated tombstones that are possibly file-bounds-constrained we are using a
1154 2 : // levelIter which will set smallestUserKey and largestUserKey. Since the levelIter
1155 2 : // is at this file we know that smallestUserKey <= key, so we know that the
1156 2 : // tombstone we find cannot be file-bounds-constrained in its lower bound to something > key.
1157 2 : // We do need to compare with largestUserKey to ensure that the tombstone is not
1158 2 : // file-bounds-constrained in its upper bound.
1159 2 : //
1160 2 : // See the detailed comments in isPrevEntryDeleted() on why similar containment and
1161 2 : // seeking logic is correct.
1162 2 :
1163 2 : // Default to within bounds.
1164 2 : withinLargestSSTableBound := true
1165 2 : if l.largestUserKey != nil {
1166 2 : cmpResult := m.heap.cmp(l.largestUserKey, key)
1167 2 : withinLargestSSTableBound = cmpResult > 0 || (cmpResult == 0 && !l.isLargestUserKeyExclusive)
1168 2 : }
1169 :
1170 2 : l.tombstone = keyspan.SeekLE(m.heap.cmp, rangeDelIter, key)
1171 2 : if l.tombstone != nil && l.tombstone.VisibleAt(m.snapshot) &&
1172 2 : l.tombstone.Contains(m.heap.cmp, key) && withinLargestSSTableBound {
1173 2 : // NB: Based on the comment above, l.smallestUserKey <= key, and based
1174 2 : // on the containment condition tombstone.Start.UserKey <= key, so the
1175 2 : // assignment to key results in a monotonically non-increasing key
1176 2 : // across iterations of this loop.
1177 2 : //
1178 2 : // The adjustment of key here can only move it to a smaller key. Since
1179 2 : // the caller of seekLT guaranteed that the original key was less than
1180 2 : // or equal to m.upper, the new key will continue to be less than or
1181 2 : // equal to m.upper.
1182 2 : if l.smallestUserKey != nil &&
1183 2 : m.heap.cmp(l.smallestUserKey, l.tombstone.Start) >= 0 {
1184 2 : // Truncate the tombstone for seeking purposes. Note that this can over-truncate
1185 2 : // but that is harmless for this seek optimization.
1186 2 : key = l.smallestUserKey
1187 2 : } else {
1188 2 : key = l.tombstone.Start
1189 2 : }
1190 : }
1191 : }
1192 : }
1193 :
1194 2 : m.initMaxHeap()
1195 : }
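// Illustrative sketch (not part of the original file): the core of the
// seek-key adjustment above. If a visible tombstone [start, end) at a higher
// level contains the reverse-seek key, lower levels may instead seek to the
// tombstone's start, clamped to the file's smallest bound, since every key in
// between is deleted. adjustedSeekLTKey is a hypothetical helper that uses
// bytes.Compare in place of m.heap.cmp and omits the snapshot-visibility and
// largest-bound checks performed above.
func adjustedSeekLTKey(key, start, end, smallestUserKey []byte) []byte {
	// Containment mirrors Span.Contains: start <= key < end.
	if bytes.Compare(start, key) > 0 || bytes.Compare(key, end) >= 0 {
		return key
	}
	if smallestUserKey != nil && bytes.Compare(smallestUserKey, start) >= 0 {
		// Truncate to the file bound; over-truncating is harmless for the
		// seek optimization.
		return smallestUserKey
	}
	return start
}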
1196 :
1197 : // SeekLT implements base.InternalIterator.SeekLT. Note that SeekLT only checks
1198 : // the lower bound. It is up to the caller to ensure that key is less than the
1199 : // upper bound.
1200 2 : func (m *mergingIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, base.LazyValue) {
1201 2 : m.err = nil // clear cached iteration error
1202 2 : m.prefix = nil
1203 2 : m.seekLT(key, 0 /* start level */, flags)
1204 2 : return m.findPrevEntry()
1205 2 : }
1206 :
1207 : // First implements base.InternalIterator.First. Note that First only checks
1208 : // the upper bound. It is up to the caller to enforce the lower bound, e.g.
1209 : // by calling SeekGE(lower) instead when a lower bound is set.
1210 2 : func (m *mergingIter) First() (*InternalKey, base.LazyValue) {
1211 2 : m.err = nil // clear cached iteration error
1212 2 : m.prefix = nil
1213 2 : m.heap.items = m.heap.items[:0]
1214 2 : for i := range m.levels {
1215 2 : l := &m.levels[i]
1216 2 : l.iterKey, l.iterValue = l.iter.First()
1217 2 : }
1218 2 : m.initMinHeap()
1219 2 : return m.findNextEntry()
1220 : }
1221 :
1222 : // Last implements base.InternalIterator.Last. Note that Last only checks the
1223 : // lower bound. It is up to the caller to enforce the upper bound, e.g. by
1224 : // calling SeekLT(upper) instead when an upper bound is set.
1225 2 : func (m *mergingIter) Last() (*InternalKey, base.LazyValue) {
1226 2 : m.err = nil // clear cached iteration error
1227 2 : m.prefix = nil
1228 2 : for i := range m.levels {
1229 2 : l := &m.levels[i]
1230 2 : l.iterKey, l.iterValue = l.iter.Last()
1231 2 : }
1232 2 : m.initMaxHeap()
1233 2 : return m.findPrevEntry()
1234 : }
1235 :
1236 2 : func (m *mergingIter) Next() (*InternalKey, base.LazyValue) {
1237 2 : if m.err != nil {
1238 1 : return nil, base.LazyValue{}
1239 1 : }
1240 :
1241 2 : if m.dir != 1 {
1242 2 : m.switchToMinHeap()
1243 2 : return m.findNextEntry()
1244 2 : }
1245 :
1246 2 : if m.heap.len() == 0 {
1247 1 : return nil, base.LazyValue{}
1248 1 : }
1249 :
1250 : // NB: It's okay to call nextEntry directly even during prefix iteration
1251 : // mode (as opposed to indirectly through maybeNextEntryWithinPrefix).
1252 : // During prefix iteration mode, we rely on the caller to not call Next if
1253 : // the iterator has already advanced beyond the iteration prefix. See the
1254 : // comment above the base.InternalIterator interface.
1255 2 : m.nextEntry(m.heap.items[0], nil /* succKey */)
1256 2 : return m.findNextEntry()
1257 : }
1258 :
1259 2 : func (m *mergingIter) NextPrefix(succKey []byte) (*InternalKey, LazyValue) {
1260 2 : if m.dir != 1 {
1261 0 : panic("pebble: cannot switch directions with NextPrefix")
1262 : }
1263 2 : if m.err != nil || m.heap.len() == 0 {
1264 0 : return nil, LazyValue{}
1265 0 : }
1266 2 : if m.levelsPositioned == nil {
1267 1 : m.levelsPositioned = make([]bool, len(m.levels))
1268 2 : } else {
1269 2 : for i := range m.levelsPositioned {
1270 2 : m.levelsPositioned[i] = false
1271 2 : }
1272 : }
1273 :
1274 : // The heap root must be positioned at a key < succKey, because
1275 : // NextPrefix was invoked.
1276 2 : root := &m.heap.items[0]
1277 2 : m.levelsPositioned[(*root).index] = true
1278 2 : if invariants.Enabled && m.heap.cmp((*root).iterKey.UserKey, succKey) >= 0 {
1279 0 : m.logger.Fatalf("pebble: invariant violation: NextPrefix(%q) called on merging iterator already positioned at %q",
1280 0 : succKey, (*root).iterKey)
1281 0 : }
1282 2 : m.nextEntry(*root, succKey)
1283 2 : // NB: root is a pointer to the heap root. nextEntry may have changed
1284 2 : // the heap root, so we must not expect root to still point to the same
1285 2 : // level (or even to be valid, if the heap is now exhausted).
1286 2 :
1287 2 : for m.heap.len() > 0 {
1288 2 : if m.levelsPositioned[(*root).index] {
1289 2 : // A level we've previously positioned is at the top of the heap, so
1290 2 : // there are no other levels positioned at keys < succKey. We've
1291 2 : // advanced as far as we need to.
1292 2 : break
1293 : }
1294 : // Since this level was not the original heap root when NextPrefix was
1295 : // called, we don't know whether this level's current key has the
1296 : // previous prefix or a new one.
1297 2 : if m.heap.cmp((*root).iterKey.UserKey, succKey) >= 0 {
1298 1 : break
1299 : }
1300 2 : m.levelsPositioned[(*root).index] = true
1301 2 : m.nextEntry(*root, succKey)
1302 : }
1303 2 : return m.findNextEntry()
1304 : }
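// Illustrative sketch (not part of the original file): a simplified model of
// the NextPrefix loop above, using sorted string slices in place of iterator
// levels. advanceAllToSuccKey is hypothetical; like the loop above, it
// advances each "level" at most once, stopping when the smallest current key
// belongs to a level that was already advanced or has reached succKey.
func advanceAllToSuccKey(levels [][]string, pos []int, succKey string) {
	positioned := make([]bool, len(levels))
	smallest := func() int {
		best := -1
		for i := range levels {
			if pos[i] >= len(levels[i]) {
				continue // this level is exhausted
			}
			if best < 0 || levels[i][pos[i]] < levels[best][pos[best]] {
				best = i
			}
		}
		return best
	}
	for {
		i := smallest()
		if i < 0 || positioned[i] || levels[i][pos[i]] >= succKey {
			// Every level is exhausted, an already-advanced level is smallest
			// again, or the smallest key has reached succKey: done.
			return
		}
		positioned[i] = true
		// Stand-in for m.nextEntry(root, succKey): advance this level to its
		// first key >= succKey.
		for pos[i] < len(levels[i]) && levels[i][pos[i]] < succKey {
			pos[i]++
		}
	}
}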
1305 :
1306 2 : func (m *mergingIter) Prev() (*InternalKey, base.LazyValue) {
1307 2 : if m.err != nil {
1308 0 : return nil, base.LazyValue{}
1309 0 : }
1310 :
1311 2 : if m.dir != -1 {
1312 2 : if m.prefix != nil {
1313 1 : m.err = errors.New("pebble: unsupported reverse prefix iteration")
1314 1 : return nil, base.LazyValue{}
1315 1 : }
1316 2 : m.switchToMaxHeap()
1317 2 : return m.findPrevEntry()
1318 : }
1319 :
1320 2 : if m.heap.len() == 0 {
1321 1 : return nil, base.LazyValue{}
1322 1 : }
1323 :
1324 2 : m.prevEntry(m.heap.items[0])
1325 2 : return m.findPrevEntry()
1326 : }
1327 :
1328 2 : func (m *mergingIter) Error() error {
1329 2 : if m.heap.len() == 0 || m.err != nil {
1330 2 : return m.err
1331 2 : }
1332 2 : return m.levels[m.heap.items[0].index].iter.Error()
1333 : }
1334 :
1335 2 : func (m *mergingIter) Close() error {
1336 2 : for i := range m.levels {
1337 2 : iter := m.levels[i].iter
1338 2 : if err := iter.Close(); err != nil && m.err == nil {
1339 0 : m.err = err
1340 0 : }
1341 2 : if rangeDelIter := m.levels[i].rangeDelIter; rangeDelIter != nil {
1342 2 : if err := rangeDelIter.Close(); err != nil && m.err == nil {
1343 0 : m.err = err
1344 0 : }
1345 : }
1346 : }
1347 2 : m.levels = nil
1348 2 : m.heap.items = m.heap.items[:0]
1349 2 : return m.err
1350 : }
1351 :
1352 2 : func (m *mergingIter) SetBounds(lower, upper []byte) {
1353 2 : m.prefix = nil
1354 2 : m.lower = lower
1355 2 : m.upper = upper
1356 2 : for i := range m.levels {
1357 2 : m.levels[i].iter.SetBounds(lower, upper)
1358 2 : }
1359 2 : m.heap.clear()
1360 : }
1361 :
1362 1 : func (m *mergingIter) SetContext(ctx context.Context) {
1363 1 : for i := range m.levels {
1364 1 : m.levels[i].iter.SetContext(ctx)
1365 1 : }
1366 : }
1367 :
1368 0 : func (m *mergingIter) DebugString() string {
1369 0 : var buf bytes.Buffer
1370 0 : sep := ""
1371 0 : for m.heap.len() > 0 {
1372 0 : item := m.heap.pop()
1373 0 : fmt.Fprintf(&buf, "%s%s", sep, item.iterKey)
1374 0 : sep = " "
1375 0 : }
1376 0 : if m.dir == 1 {
1377 0 : m.initMinHeap()
1378 0 : } else {
1379 0 : m.initMaxHeap()
1380 0 : }
1381 0 : return buf.String()
1382 : }
1383 :
1384 2 : func (m *mergingIter) ForEachLevelIter(fn func(li *levelIter) bool) {
1385 2 : for _, ml := range m.levels {
1386 2 : if ml.levelIter != nil {
1387 2 : if done := fn(ml.levelIter); done {
1388 2 : break
1389 : }
1390 : }
1391 : }
1392 : }
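// Illustrative sketch (not part of the original file): a hypothetical use of
// ForEachLevelIter that counts how many levels are backed by a *levelIter.
// Returning false from the callback keeps the iteration going.
func countLevelIters(m *mergingIter) int {
	var n int
	m.ForEachLevelIter(func(li *levelIter) bool {
		n++
		return false // visit every levelIter-backed level
	})
	return n
}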
1393 :
1394 2 : func (m *mergingIter) addItemStats(l *mergingIterLevel) {
1395 2 : m.stats.PointCount++
1396 2 : m.stats.KeyBytes += uint64(len(l.iterKey.UserKey))
1397 2 : m.stats.ValueBytes += uint64(len(l.iterValue.ValueOrHandle))
1398 2 : }
1399 :
1400 : var _ internalIterator = &mergingIter{}