1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "runtime/debug"
12 : "unsafe"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/internal/keyspan"
18 : )
19 :
20 : type mergingIterLevel struct {
21 : index int
22 : iter internalIterator
23 : // rangeDelIter is set to the range-deletion iterator for the level. When
24 : // configured with a levelIter, this pointer changes as sstable boundaries
25 : // are crossed. See levelIter.initRangeDel and the Range Deletions comment
26 : // below.
27 : rangeDelIter keyspan.FragmentIterator
28 : // iterKey and iterValue cache the current key and value that iter is pointed at.
29 : iterKey *InternalKey
30 : iterValue base.LazyValue
31 : // levelIter is non-nil if this level's iter is ultimately backed by a
32 : // *levelIter. The handle in iter may have wrapped the levelIter with
33 : // intermediary internalIterator implementations.
34 : levelIter *levelIter
35 :
36 : // levelIterBoundaryContext's fields are set when using levelIter, in order
37 : // to surface sstable boundary keys and file-level context. See levelIter
38 : // comment and the Range Deletions comment below.
39 : levelIterBoundaryContext
40 :
41 : // tombstone caches the tombstone rangeDelIter is currently pointed at. If
42 : // tombstone is nil, there are no further tombstones within the
43 : // current sstable in the current iterator direction. The cached tombstone is
44 : // only valid for the levels in the range [0,heap[0].index]. This avoids
45 : // positioning tombstones at lower levels which cannot possibly shadow the
46 : // current key.
47 : tombstone *keyspan.Span
48 : }
49 :
50 : type levelIterBoundaryContext struct {
51 : // isSyntheticIterBoundsKey is set to true iff the key returned by the level
52 : // iterator is a synthetic key derived from the iterator bounds. This is used
53 : // to prevent the mergingIter from being stuck at such a synthetic key if it
54 : // becomes the top element of the heap. When used with a user-facing Iterator,
55 : // the only range deletions exposed by this mergingIter should be those with
56 : // `isSyntheticIterBoundsKey || isIgnorableBoundaryKey`.
57 : isSyntheticIterBoundsKey bool
58 : // isIgnorableBoundaryKey is set to true iff the key returned by the level
59 : // iterator is a file boundary key that should be ignored when returning to
60 : // the parent iterator. File boundary keys are used by the level iter to
61 : // keep a levelIter file's range deletion iterator open as long as other
62 : // levels within the merging iterator require it. When used with a user-facing
63 : // Iterator, the only range deletions exposed by this mergingIter should be
64 : // those with `isSyntheticIterBoundsKey || isIgnorableBoundaryKey`.
65 : isIgnorableBoundaryKey bool
66 : }
67 :
68 : // mergingIter provides a merged view of multiple iterators from different
69 : // levels of the LSM.
70 : //
71 : // The core of a mergingIter is a heap of internalIterators (see
72 : // mergingIterHeap). The heap can operate as either a min-heap, used during
73 : // forward iteration (First, SeekGE, Next) or a max-heap, used during reverse
74 : // iteration (Last, SeekLT, Prev). The heap is initialized in calls to First,
75 : // Last, SeekGE, and SeekLT. A call to Next or Prev takes the current top
76 : // element on the heap, advances its iterator, and then "fixes" the heap
77 : // property. When one of the child iterators is exhausted during Next/Prev
78 : // iteration, it is removed from the heap.
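//
// For intuition, the shape of forward iteration over the heap looks roughly
// like the following sketch (illustrative only: it ignores range deletions,
// prefix iteration, and error handling, and emit is a stand-in for handing
// the key to the caller):
//
//	for m.heap.len() > 0 {
//		item := m.heap.items[0] // level with the smallest current key
//		emit(item.iterKey, item.iterValue)
//		if item.iterKey, item.iterValue = item.iter.Next(); item.iterKey == nil {
//			m.heap.pop() // this level is exhausted
//		} else {
//			m.heap.fix(0) // restore the heap property
//		}
//	}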
79 : //
80 : // # Range Deletions
81 : //
82 : // A mergingIter can optionally be configured with a slice of range deletion
83 : // iterators. The range deletion iterator slice must exactly parallel the point
84 : // iterators and the range deletion iterator must correspond to the same level
85 : // in the LSM as the point iterator. Note that each memtable and each table in
86 : // L0 is a different "level" from the mergingIter perspective. So level 0 below
87 : // does not correspond to L0 in the LSM.
88 : //
89 : // A range deletion iterator iterates over fragmented range tombstones. Range
90 : // tombstones are fragmented by splitting them at any overlapping points. This
91 : // fragmentation guarantees that within an sstable tombstones will either be
92 : // distinct or will have identical start and end user keys. While range
93 : // tombstones are fragmented within an sstable, the start and end keys are not truncated
94 : // to sstable boundaries. This is necessary because the tombstone end key is
95 : // exclusive and does not have a sequence number. Consider an sstable
96 : // containing the range tombstone [a,c)#9 and the key "b#8". The tombstone must
97 : // delete "b#8", yet older versions of "b" might spill over to the next
98 : // sstable. So the boundary key for this sstable must be "b#8". Adjusting the
99 : // end key of tombstones to be optionally inclusive, or tagging it with a
100 : // sequence number, would be possible solutions, but such solutions have
101 : // potentially serious issues: tombstones have exclusive end keys since an
102 : // inclusive deletion end can be converted to an exclusive one while the
103 : // reverse transformation is not possible, and the semantics of a sequence
104 : // number for the end key of a range tombstone are murky.
105 : //
106 : // The approach taken here instead performs an implicit truncation of the tombstone to the sstable boundaries.
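//
// For example (an illustration, not drawn from a specific sstable),
// fragmenting the overlapping tombstones [a,e)#10 and [c,g)#12 splits both at
// the overlap point "c":
//
//	[a,e)#10  +  [c,g)#12
//	=>  [a,c)#10   [c,e)#10 [c,e)#12   [e,g)#12
//
// Any two fragments are then either disjoint or share identical start and end
// user keys.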
107 : //
108 : // During initialization of a mergingIter, the range deletion iterators for
109 : // batches, memtables, and L0 tables are populated up front. Note that Batches
110 : // and memtables index unfragmented tombstones. Batch.newRangeDelIter() and
111 : // memTable.newRangeDelIter() fragment and cache the tombstones on demand. The
112 : // L1-L6 range deletion iterators are populated by levelIter. When configured
113 : // to load range deletion iterators, whenever a levelIter loads a table it
114 : // loads both the point iterator and the range deletion
115 : // iterator. levelIter.rangeDelIter is configured to point to the right entry
116 : // in mergingIter.levels. The effect of this setup is that
117 : // mergingIter.levels[i].rangeDelIter always contains the fragmented range
118 : // tombstone for the current table in level i that the levelIter has open.
119 : //
120 : // Another crucial mechanism of levelIter is that it materializes fake point
121 : // entries for the table boundaries if the boundary is a range deletion
122 : // key. Consider a table that contains only a range tombstone [a-e)#10. The
123 : // sstable boundaries for this table will be a#10,15 and
124 : // e#72057594037927935,15. During forward iteration levelIter will return
125 : // e#72057594037927935,15 as a key. During reverse iteration levelIter will
126 : // return a#10,15 as a key. These sentinel keys act as bookends to point
127 : // iteration and allow mergingIter to keep a table and its associated range
128 : // tombstones loaded as long as there are keys at lower levels that are within
129 : // the bounds of the table.
130 : //
131 : // The final piece to the range deletion puzzle is the LSM invariant that for a
132 : // given key K newer versions of K can only exist earlier in the level, or at
133 : // higher levels of the tree. For example, if K#4 exists in L3, K#5 can only
134 : // exist earlier in L3 or in L0, L1, L2, or a memtable. Get very explicitly
135 : // uses this invariant to find the value for a key by walking the LSM level by
136 : // level. For range deletions, this invariant means that a range deletion at
137 : // level N will necessarily shadow any keys within its bounds in level Y where
138 : // Y > N. One wrinkle to this statement is that it only applies to keys that
139 : // lie within the sstable bounds as well, but we get that guarantee due to the
140 : // way the range deletion iterator and point iterator are bound together by a
141 : // levelIter.
142 : //
143 : // Tying the above all together, we get a picture where each level (index in
144 : // mergingIter.levels) is composed of both point operations (pX) and range
145 : // deletions (rX). The range deletions for level X shadow both the point
146 : // operations and range deletions for level Y where Y > X, allowing mergingIter
147 : // to skip processing entries in that shadow. For example, consider the
148 : // scenario:
149 : //
150 : // r0: a---e
151 : // r1: d---h
152 : // r2: g---k
153 : // r3: j---n
154 : // r4: m---q
155 : //
156 : // This is showing 5 levels of range deletions. Consider what happens upon
157 : // SeekGE("b"). We first seek the point iterator for level 0 (the point values
158 : // are not shown above) and we then seek the range deletion iterator. That
159 : // returns the tombstone [a,e). This tombstone tells us that all keys in the
160 : // range [a,e) in lower levels are deleted so we can skip them. So we can
161 : // adjust the seek key to "e", the tombstone end key. For level 1 we seek to
162 : // "e" and find the range tombstone [d,h) and similar logic holds. By the time
163 : // we get to level 4 we're seeking to "n".
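//
// A sketch of this cascading seek (simplified from seekGE below; nil checks,
// error handling, and the lazy-combined-iteration caveat are omitted):
//
//	for level := 0; level < len(m.levels); level++ {
//		l := &m.levels[level]
//		l.iterKey, l.iterValue = l.iter.SeekGE(key, flags)
//		if l.rangeDelIter != nil {
//			l.tombstone, _ = l.rangeDelIter.SeekGE(key)
//			if l.tombstone != nil && l.tombstone.VisibleAt(m.snapshot) &&
//				l.tombstone.Contains(m.heap.cmp, key) {
//				key = l.tombstone.End // lower levels skip the deleted span
//			}
//		}
//	}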
164 : //
165 : // One consequence of not truncating tombstone end keys to sstable boundaries
166 : // is that the seeking process above cannot always seek to the tombstone
167 : // end key in the older level. For example, imagine in the above example r3 is
168 : // a partitioned level (i.e., L1+ in our LSM), and the sstable containing [j,
169 : // n) has "k" as its upper boundary. In this situation, compactions involving
170 : // keys at or after "k" can output those keys to r4+, even if they're newer
171 : // than our tombstone [j, n). So instead of seeking to "n" in r4 we can only
172 : // seek to "k". To achieve this, the instance variable `largestUserKey.`
173 : // maintains the upper bounds of the current sstables in the partitioned
174 : // levels. In this example, `levels[3].largestUserKey` holds "k", telling us to
175 : // limit the seek triggered by a tombstone in r3 to "k".
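//
// With such a field the tombstone-derived seek key is clamped, roughly (a
// hedged sketch; largestUserKey is the per-level bound described above and is
// not part of the struct shown here):
//
//	seekKey := l.tombstone.End
//	if l.largestUserKey != nil && m.heap.cmp(l.largestUserKey, seekKey) < 0 {
//		seekKey = l.largestUserKey // don't seek past the sstable's bound
//	}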
176 : //
177 : // During actual iteration levels can contain both point operations and range
178 : // deletions. Within a level, when a range deletion contains a point operation
179 : // the sequence numbers must be checked to determine if the point operation is
180 : // newer or older than the range deletion tombstone. The mergingIter maintains
181 : // the invariant that the range deletion iterators for all levels newer than
182 : // the current iteration key (L < m.heap.items[0].index) are positioned at the
183 : // next (or previous during reverse iteration) range deletion tombstone. We
184 : // know those levels don't contain a range deletion tombstone that covers the
185 : // current key because if they did the current key would be deleted. The range
186 : // deletion iterator for the current key's level is positioned at a range
187 : // tombstone covering or past the current key. The position of all other
188 : // range deletion iterators is unspecified. Whenever a key from those levels
189 : // becomes the current key, their range deletion iterators need to be
190 : // positioned. This lazy positioning avoids seeking the range deletion
191 : // iterators for keys that are never considered. (A similar bit of lazy
192 : // evaluation can be done for the point iterators, but is still TBD).
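//
// Concretely, when the heap root comes from level item.index, only the cached
// tombstones of levels 0 through item.index are consulted, as in this sketch
// (mirroring isNextEntryDeleted below):
//
//	for level := 0; level <= item.index; level++ {
//		t := m.levels[level].tombstone
//		if t == nil || !t.VisibleAt(m.snapshot) ||
//			!t.Contains(m.heap.cmp, item.iterKey.UserKey) {
//			continue
//		}
//		if level < item.index || t.CoversAt(m.snapshot, item.iterKey.SeqNum()) {
//			// The heap root is deleted and must be skipped.
//		}
//	}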
193 : //
194 : // For a full example, consider the following setup:
195 : //
196 : // p0: o
197 : // r0: m---q
198 : //
199 : // p1: n p
200 : // r1: g---k
201 : //
202 : // p2: b d i
203 : // r2: a---e q----v
204 : //
205 : // p3: e
206 : // r3:
207 : //
208 : // If we start iterating from the beginning, the first key we encounter is "b"
209 : // in p2. When the mergingIter is pointing at a valid entry, the range deletion
210 : // iterators for all of the levels < m.heap.items[0].index are positioned at
211 : // the next range tombstone past the current key. So r0 will point at [m,q) and
212 : // r1 at [g,k). When the key "b" is encountered, we check to see if the current
213 : // tombstone for r0 or r1 contains it, and whether the tombstone for r2, [a,e),
214 : // contains and is newer than "b".
215 : //
216 : // Advancing the iterator finds the next key at "d". This is in the same level
217 : // as the previous key "b" so we don't have to reposition any of the range
218 : // deletion iterators, but merely check whether "d" is now contained by any of
219 : // the range tombstones at higher levels or has stepped past the range
220 : // tombstone in its own level or higher levels. In this case, there is nothing to be done.
221 : //
222 : // Advancing the iterator again finds "e". Since "e" comes from p3, we have to
223 : // position the r3 range deletion iterator, which is empty. "e" is past the r2
224 : // tombstone of [a,e) so we need to advance the r2 range deletion iterator to
225 : // [q,v).
226 : //
227 : // The next key is "i". Because this key is in p2, a level above "e", we don't
228 : // have to reposition any range deletion iterators and instead see that "i" is
229 : // covered by the range tombstone [g,k). The iterator is immediately advanced
230 : // to "n" which is covered by the range tombstone [m,q) causing the iterator to
231 : // advance to "o" which is visible.
232 : //
233 : // # Error handling
234 : //
235 : // Any iterator operation may fail. The InternalIterator contract dictates that
236 : // an iterator must return a nil internal key when an error occurs, and a
237 : // subsequent call to Error() should return the error value. The exported
238 : // merging iterator positioning methods must adhere to this contract by setting
239 : // m.err to hold any error encountered by the individual level iterators and
240 : // returning a nil internal key. Some internal helpers (eg,
241 : // find[Next|Prev]Entry) also adhere to this contract, setting m.err directly.
242 : // Other internal functions return an explicit error return value and DO NOT set
243 : // m.err, relying on the caller to set m.err appropriately.
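//
// From a caller's perspective the contract looks like the following sketch
// (k and flags are stand-ins):
//
//	if key, _ := m.SeekGE(k, flags); key == nil {
//		if err := m.Error(); err != nil {
//			return err // the positioning operation failed
//		}
//		// key == nil with a nil error means the iterator is exhausted.
//	}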
244 : //
245 : // TODO(jackson): Update the InternalIterator interface to return explicit error
246 : // return values (and an *InternalKV pointer).
247 : //
248 : // TODO(peter,rangedel): For testing, advance the iterator through various
249 : // scenarios and have each step display the current state (i.e. the current
250 : // heap and range-del iterator positioning).
251 : type mergingIter struct {
252 : logger Logger
253 : split Split
254 : dir int
255 : snapshot uint64
256 : batchSnapshot uint64
257 : levels []mergingIterLevel
258 : heap mergingIterHeap
259 : err error
260 : prefix []byte
261 : lower []byte
262 : upper []byte
263 : stats *InternalIteratorStats
264 :
265 : // levelsPositioned, if non-nil, is a slice of the same length as levels.
266 : // It's used by NextPrefix to record which levels have already been
267 : // repositioned. It's created lazily by the first call to NextPrefix.
268 : levelsPositioned []bool
269 :
270 : combinedIterState *combinedIterState
271 :
272 : // Used in some tests to disable the random disabling of seek optimizations.
273 : forceEnableSeekOpt bool
274 : }
275 :
276 : // mergingIter implements the base.InternalIterator interface.
277 : var _ base.InternalIterator = (*mergingIter)(nil)
278 :
279 : // newMergingIter returns an iterator that merges its input. Walking the
280 : // resultant iterator will return all key/value pairs of all input iterators
281 : // in strictly increasing key order, as defined by cmp. It is permissible to
282 : // pass a nil split parameter if the caller is never going to call
283 : // SeekPrefixGE.
284 : //
285 : // The input's key ranges may overlap, but there are assumed to be no duplicate
286 : // keys: if iters[i] contains a key k then iters[j] will not contain that key k.
287 : //
288 : // None of the iters may be nil.
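//
// A usage sketch (hypothetical: assumes a and b implement internalIterator,
// e.g. the fakeIter helper used by the package tests):
//
//	var stats base.InternalIteratorStats
//	m := newMergingIter(base.DefaultLogger, &stats, DefaultComparer.Compare,
//		DefaultComparer.Split, a, b)
//	for k, v := m.First(); k != nil; k, v = m.Next() {
//		_, _ = k, v // keys arrive in increasing internal-key order
//	}
//	err := firstError(m.Error(), m.Close())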
289 : func newMergingIter(
290 : logger Logger,
291 : stats *base.InternalIteratorStats,
292 : cmp Compare,
293 : split Split,
294 : iters ...internalIterator,
295 1 : ) *mergingIter {
296 1 : m := &mergingIter{}
297 1 : levels := make([]mergingIterLevel, len(iters))
298 1 : for i := range levels {
299 1 : levels[i].iter = iters[i]
300 1 : }
301 1 : m.init(&IterOptions{logger: logger}, stats, cmp, split, levels...)
302 1 : return m
303 : }
304 :
305 : func (m *mergingIter) init(
306 : opts *IterOptions,
307 : stats *base.InternalIteratorStats,
308 : cmp Compare,
309 : split Split,
310 : levels ...mergingIterLevel,
311 1 : ) {
312 1 : m.err = nil // clear cached iteration error
313 1 : m.logger = opts.getLogger()
314 1 : if opts != nil {
315 1 : m.lower = opts.LowerBound
316 1 : m.upper = opts.UpperBound
317 1 : }
318 1 : m.snapshot = InternalKeySeqNumMax
319 1 : m.batchSnapshot = InternalKeySeqNumMax
320 1 : m.levels = levels
321 1 : m.heap.cmp = cmp
322 1 : m.split = split
323 1 : m.stats = stats
324 1 : if cap(m.heap.items) < len(levels) {
325 1 : m.heap.items = make([]*mergingIterLevel, 0, len(levels))
326 1 : } else {
327 1 : m.heap.items = m.heap.items[:0]
328 1 : }
329 1 : for l := range m.levels {
330 1 : m.levels[l].index = l
331 1 : }
332 : }
333 :
334 1 : func (m *mergingIter) initHeap() {
335 1 : m.heap.items = m.heap.items[:0]
336 1 : for i := range m.levels {
337 1 : if l := &m.levels[i]; l.iterKey != nil {
338 1 : m.heap.items = append(m.heap.items, l)
339 1 : }
340 : }
341 1 : m.heap.init()
342 : }
343 :
344 1 : func (m *mergingIter) initMinHeap() error {
345 1 : m.dir = 1
346 1 : m.heap.reverse = false
347 1 : m.initHeap()
348 1 : return m.initMinRangeDelIters(-1)
349 1 : }
350 :
351 : // The level of the previous top element was oldTopLevel. Note that all range delete
352 : // iterators < oldTopLevel are positioned past the key of the previous top element and
353 : // the range delete iterator == oldTopLevel is positioned at or past the key of the
354 : // previous top element. We need to position the range delete iterators from oldTopLevel + 1
355 : // to the level of the current top element.
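// For instance (illustrative): if the old top was level 1 and the new top is
// level 3, only the range-del iterators of levels 2 and 3 need to be
// (re)positioned with SeekGE(heap[0].iterKey.UserKey); levels 0 and 1 are
// already correctly positioned.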
356 1 : func (m *mergingIter) initMinRangeDelIters(oldTopLevel int) error {
357 1 : if m.heap.len() == 0 {
358 1 : return nil
359 1 : }
360 :
361 : // Position the range-del iterators at levels <= m.heap.items[0].index.
362 1 : item := m.heap.items[0]
363 1 : for level := oldTopLevel + 1; level <= item.index; level++ {
364 1 : l := &m.levels[level]
365 1 : if l.rangeDelIter == nil {
366 1 : continue
367 : }
368 1 : var err error
369 1 : l.tombstone, err = l.rangeDelIter.SeekGE(item.iterKey.UserKey)
370 1 : if err != nil {
371 1 : return err
372 1 : }
373 : }
374 1 : return nil
375 : }
376 :
377 1 : func (m *mergingIter) initMaxHeap() error {
378 1 : m.dir = -1
379 1 : m.heap.reverse = true
380 1 : m.initHeap()
381 1 : return m.initMaxRangeDelIters(-1)
382 1 : }
383 :
384 : // The level of the previous top element was oldTopLevel. Note that all range delete
385 : // iterators < oldTopLevel are positioned before the key of the previous top element and
386 : // the range delete iterator == oldTopLevel is positioned at or before the key of the
387 : // previous top element. We need to position the range delete iterators from oldTopLevel + 1
388 : // to the level of the current top element.
389 1 : func (m *mergingIter) initMaxRangeDelIters(oldTopLevel int) error {
390 1 : if m.heap.len() == 0 {
391 1 : return nil
392 1 : }
393 : // Position the range-del iterators at levels <= m.heap.items[0].index.
394 1 : item := m.heap.items[0]
395 1 : for level := oldTopLevel + 1; level <= item.index; level++ {
396 1 : l := &m.levels[level]
397 1 : if l.rangeDelIter == nil {
398 1 : continue
399 : }
400 1 : tomb, err := keyspan.SeekLE(m.heap.cmp, l.rangeDelIter, item.iterKey.UserKey)
401 1 : if err != nil {
402 1 : return err
403 1 : }
404 1 : l.tombstone = tomb
405 : }
406 1 : return nil
407 : }
408 :
409 1 : func (m *mergingIter) switchToMinHeap() error {
410 1 : if m.heap.len() == 0 {
411 1 : if m.lower != nil {
412 1 : m.SeekGE(m.lower, base.SeekGEFlagsNone)
413 1 : } else {
414 1 : m.First()
415 1 : }
416 1 : return m.err
417 : }
418 :
419 : // We're switching from using a max heap to a min heap. We need to advance
420 : // any iterator that is less than or equal to the current key. Consider the
421 : // scenario where we have 2 iterators being merged (user-key:seq-num):
422 : //
423 : // i1: *a:2 b:2
424 : // i2: a:1 b:1
425 : //
426 : // The current key is a:2 and i2 is pointed at a:1. When we switch to forward
427 : // iteration, we want to return a key that is greater than a:2.
428 :
429 1 : key := m.heap.items[0].iterKey
430 1 : cur := m.heap.items[0]
431 1 :
432 1 : for i := range m.levels {
433 1 : l := &m.levels[i]
434 1 : if l == cur {
435 1 : continue
436 : }
437 :
438 : // If the iterator is exhausted, it may be out of bounds if range
439 : // deletions modified our search key as we descended. We need to
440 : // reposition it within the search bounds. If the current key is a
441 : // range tombstone, the iterator might still be exhausted but at a
442 : // sstable boundary sentinel. It would be okay to reposition an
443 : // iterator like this only through successive Next calls, except that
444 : // it would violate the levelIter's invariants by causing it to return
445 : // a key before the lower bound.
446 : //
447 : // bounds = [ f, _ )
448 : // L0: [ b ] [ f* z ]
449 : // L1: [ a |----| k y ]
450 : // L2: [ c (d) ] [ e g m ]
451 : // L3: [ x ]
452 : //
453 : // * - current key [] - table bounds () - heap item
454 : //
455 : // In the above diagram, the L2 iterator is positioned at a sstable
456 : // boundary (d) outside the lower bound (f). It arrived here from a
457 : // seek whose seek-key was modified by a range tombstone. If we called
458 : // Next on the L2 iterator, it would return e, violating its lower
459 : // bound. Instead, we seek it to >= f and Next from there.
460 :
461 1 : if l.iterKey == nil || (m.lower != nil && l.isSyntheticIterBoundsKey &&
462 1 : l.iterKey.IsExclusiveSentinel() &&
463 1 : m.heap.cmp(l.iterKey.UserKey, m.lower) <= 0) {
464 1 : if m.lower != nil {
465 1 : l.iterKey, l.iterValue = l.iter.SeekGE(m.lower, base.SeekGEFlagsNone)
466 1 : } else {
467 1 : l.iterKey, l.iterValue = l.iter.First()
468 1 : }
469 1 : if l.iterKey == nil {
470 1 : if err := l.iter.Error(); err != nil {
471 1 : return err
472 1 : }
473 : }
474 : }
475 1 : for ; l.iterKey != nil; l.iterKey, l.iterValue = l.iter.Next() {
476 1 : if base.InternalCompare(m.heap.cmp, *key, *l.iterKey) < 0 {
477 1 : // key < iter-key
478 1 : break
479 : }
480 : // key >= iter-key
481 : }
482 1 : if l.iterKey == nil {
483 1 : if err := l.iter.Error(); err != nil {
484 1 : return err
485 1 : }
486 : }
487 : }
488 :
489 : // Special handling for the current iterator because we were using its key
490 : // above. The iterator cur.iter may still be exhausted at a sstable boundary
491 : // sentinel. Similar to the logic applied to the other levels, in these
492 : // cases we seek the iterator to the first key in order to avoid violating
493 : // levelIter's invariants. See the example in the for loop above.
494 1 : if m.lower != nil && cur.isSyntheticIterBoundsKey && cur.iterKey.IsExclusiveSentinel() &&
495 1 : m.heap.cmp(cur.iterKey.UserKey, m.lower) <= 0 {
496 1 : cur.iterKey, cur.iterValue = cur.iter.SeekGE(m.lower, base.SeekGEFlagsNone)
497 1 : } else {
498 1 : cur.iterKey, cur.iterValue = cur.iter.Next()
499 1 : }
500 1 : if cur.iterKey == nil {
501 1 : if err := cur.iter.Error(); err != nil {
502 1 : return err
503 1 : }
504 : }
505 1 : return m.initMinHeap()
506 : }
507 :
508 1 : func (m *mergingIter) switchToMaxHeap() error {
509 1 : if m.heap.len() == 0 {
510 1 : if m.upper != nil {
511 1 : m.SeekLT(m.upper, base.SeekLTFlagsNone)
512 1 : } else {
513 1 : m.Last()
514 1 : }
515 1 : return m.err
516 : }
517 :
518 : // We're switching from using a min heap to a max heap. We need to backup any
519 : // iterator that is greater than or equal to the current key. Consider the
520 : // scenario where we have 2 iterators being merged (user-key:seq-num):
521 : //
522 : // i1: a:2 *b:2
523 : // i2: a:1 b:1
524 : //
525 : // The current key is b:2 and i2 is pointing at b:1. When we switch to
526 : // reverse iteration, we want to return a key that is less than b:2.
527 1 : key := m.heap.items[0].iterKey
528 1 : cur := m.heap.items[0]
529 1 :
530 1 : for i := range m.levels {
531 1 : l := &m.levels[i]
532 1 : if l == cur {
533 1 : continue
534 : }
535 :
536 : // If the iterator is exhausted, it may be out of bounds if range
537 : // deletions modified our search key as we descended. We need to
538 : // reposition it within the search bounds. If the current key is a
539 : // range tombstone, the iterator might still be exhausted but at a
540 : // sstable boundary sentinel. It would be okay to reposition an
541 : // iterator like this only through successive Prev calls, except that
542 : // it would violate the levelIter's invariants by causing it to return
543 : // a key beyond the upper bound.
544 : //
545 : // bounds = [ _, g )
546 : // L0: [ b ] [ f* z ]
547 : // L1: [ a |-------| k y ]
548 : // L2: [ c d ] h [(i) m ]
549 : // L3: [ e x ]
550 : //
551 : // * - current key [] - table bounds () - heap item
552 : //
553 : // In the above diagram, the L2 iterator is positioned at a sstable
554 : // boundary (i) outside the upper bound (g). It arrived here from a
555 : // seek whose seek-key was modified by a range tombstone. If we called
556 : // Prev on the L2 iterator, it would return h, violating its upper
557 : // bound. Instead, we seek it to < g, and Prev from there.
558 :
559 1 : if l.iterKey == nil || (m.upper != nil && l.isSyntheticIterBoundsKey &&
560 1 : l.iterKey.IsExclusiveSentinel() && m.heap.cmp(l.iterKey.UserKey, m.upper) >= 0) {
561 1 : if m.upper != nil {
562 1 : l.iterKey, l.iterValue = l.iter.SeekLT(m.upper, base.SeekLTFlagsNone)
563 1 : } else {
564 1 : l.iterKey, l.iterValue = l.iter.Last()
565 1 : }
566 1 : if l.iterKey == nil {
567 1 : if err := l.iter.Error(); err != nil {
568 0 : return err
569 0 : }
570 : }
571 : }
572 1 : for ; l.iterKey != nil; l.iterKey, l.iterValue = l.iter.Prev() {
573 1 : if base.InternalCompare(m.heap.cmp, *key, *l.iterKey) > 0 {
574 1 : // key > iter-key
575 1 : break
576 : }
577 : // key <= iter-key
578 : }
579 1 : if l.iterKey == nil {
580 1 : if err := l.iter.Error(); err != nil {
581 0 : return err
582 0 : }
583 : }
584 : }
585 :
586 : // Special handling for the current iterator because we were using its key
587 : // above. The iterator cur.iter may still be exhausted at a sstable boundary
588 : // sentinel. Similar to the logic applied to the other levels, in these
589 : // cases we seek the iterator in order to avoid violating levelIter's
590 : // invariants by Prev-ing through files. See the example in the for loop
591 : // above.
592 1 : if m.upper != nil && cur.isSyntheticIterBoundsKey && cur.iterKey.IsExclusiveSentinel() &&
593 1 : m.heap.cmp(cur.iterKey.UserKey, m.upper) >= 0 {
594 1 : cur.iterKey, cur.iterValue = cur.iter.SeekLT(m.upper, base.SeekLTFlagsNone)
595 1 : } else {
596 1 : cur.iterKey, cur.iterValue = cur.iter.Prev()
597 1 : }
598 1 : if cur.iterKey == nil {
599 1 : if err := cur.iter.Error(); err != nil {
600 1 : return err
601 1 : }
602 : }
603 1 : return m.initMaxHeap()
604 : }
605 :
606 : // nextEntry unconditionally steps to the next entry. l is the current top
607 : // item in the heap. If succKey is non-nil, the level is advanced via NextPrefix(succKey) rather than Next.
608 1 : func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
609 1 : // INVARIANT: If in prefix iteration mode, item.iterKey must have a prefix equal
610 1 : // to m.prefix. This invariant is important for ensuring TrySeekUsingNext
611 1 : // optimizations behave correctly.
612 1 : //
613 1 : // During prefix iteration, the iterator does not have a full view of the
614 1 : // LSM. Some level iterators may omit keys that are known to fall outside
615 1 : // the seek prefix (eg, due to sstable bloom filter exclusion). It's
616 1 : // important that in such cases we don't position any iterators beyond
617 1 : // m.prefix, because doing so may interfere with future seeks.
618 1 : //
619 1 : // Let prefixes P1 < P2 < P3. Imagine a SeekPrefixGE to prefix P1, followed
620 1 : // by a SeekPrefixGE to prefix P2. Imagine there exist live keys at prefix
621 1 : // P2, but they're not visible to the SeekPrefixGE(P1) (because of
622 1 : // bloom-filter exclusion or a range tombstone that deletes prefix P1 but
623 1 : // not P2). If the SeekPrefixGE(P1) is allowed to move any level iterators
624 1 : // to P3, the SeekPrefixGE(P2, TrySeekUsingNext=true) may mistakenly think
625 1 : // the level contains no point keys or range tombstones within the prefix
626 1 : // P2. Care is taken to avoid ever advancing the iterator beyond the current
627 1 : // prefix. If nextEntry is ever invoked while we're already beyond the
628 1 : // current prefix, we're violating the invariant.
629 1 : if invariants.Enabled && m.prefix != nil {
630 1 : if s := m.split(l.iterKey.UserKey); !bytes.Equal(m.prefix, l.iterKey.UserKey[:s]) {
631 0 : m.logger.Fatalf("mergingIter: prefix violation: nexting beyond prefix %q; existing heap root %q\n%s",
632 0 : m.prefix, l.iterKey, debug.Stack())
633 0 : }
634 : }
635 :
636 1 : oldTopLevel := l.index
637 1 : oldRangeDelIter := l.rangeDelIter
638 1 :
639 1 : if succKey == nil {
640 1 : l.iterKey, l.iterValue = l.iter.Next()
641 1 : } else {
642 1 : l.iterKey, l.iterValue = l.iter.NextPrefix(succKey)
643 1 : }
644 :
645 1 : if l.iterKey == nil {
646 1 : if err := l.iter.Error(); err != nil {
647 1 : return err
648 1 : }
649 1 : m.heap.pop()
650 1 : } else {
651 1 : if m.prefix != nil && !bytes.Equal(m.prefix, l.iterKey.UserKey[:m.split(l.iterKey.UserKey)]) {
652 1 : // Set keys without a matching prefix to their zero values when in prefix
653 1 : // iteration mode and remove iterated level from heap.
654 1 : l.iterKey = nil
655 1 : l.iterValue = base.LazyValue{}
656 1 : m.heap.pop()
657 1 : } else if m.heap.len() > 1 {
658 1 : m.heap.fix(0)
659 1 : }
660 1 : if l.rangeDelIter != oldRangeDelIter {
661 1 : // The rangeDelIter changed which indicates that the l.iter moved to the
662 1 : // next sstable. We have to update the tombstone for oldTopLevel as well.
663 1 : oldTopLevel--
664 1 : }
665 : }
666 :
667 : // The cached tombstones are only valid for the levels
668 : // [0,oldTopLevel]. Update the cached tombstones for any levels in the range
669 : // [oldTopLevel+1,heap[0].index].
670 1 : return m.initMinRangeDelIters(oldTopLevel)
671 : }
672 :
673 : // isNextEntryDeleted starts from the current entry (as the next entry) and if
674 : // it is deleted, moves the iterators forward as needed and returns true, else
675 : // it returns false. item is the top item in the heap. If any of the required
676 : // iterator operations error, the error is returned without updating m.err.
677 : //
678 : // During prefix iteration mode, isNextEntryDeleted will exhaust the iterator
679 : // by clearing the heap if the deleted key(s) extend beyond the iteration
680 : // prefix.
681 1 : func (m *mergingIter) isNextEntryDeleted(item *mergingIterLevel) (bool, error) {
682 1 : // Look for a range deletion tombstone containing item.iterKey at higher
683 1 : // levels (level < item.index). If we find such a range tombstone we know
684 1 : // it deletes the key in the current level. Also look for a range
685 1 : // deletion at the current level (level == item.index). If we find such a
686 1 : // range deletion we need to check whether it is newer than the current
687 1 : // entry.
688 1 : for level := 0; level <= item.index; level++ {
689 1 : l := &m.levels[level]
690 1 : if l.rangeDelIter == nil || l.tombstone == nil {
691 1 : // If l.tombstone is nil, there are no further tombstones
692 1 : // in the current sstable in the current (forward) iteration
693 1 : // direction.
694 1 : continue
695 : }
696 1 : if m.heap.cmp(l.tombstone.End, item.iterKey.UserKey) <= 0 {
697 1 : // The current key is at or past the tombstone end key.
698 1 : //
699 1 : // NB: for the case that this l.rangeDelIter is provided by a levelIter we know that
700 1 : // the levelIter must be positioned at a key >= item.iterKey. So it is sufficient to seek the
701 1 : // current l.rangeDelIter (since any range del iterators that will be provided by the
702 1 : // levelIter in the future cannot contain item.iterKey). Also, it is possible that we
703 1 : // will encounter parts of the range delete that should be ignored -- we handle that
704 1 : // below.
705 1 : var err error
706 1 : l.tombstone, err = l.rangeDelIter.SeekGE(item.iterKey.UserKey)
707 1 : if err != nil {
708 1 : return false, err
709 1 : }
710 : }
711 1 : if l.tombstone == nil {
712 1 : continue
713 : }
714 :
715 1 : if l.tombstone.VisibleAt(m.snapshot) && l.tombstone.Contains(m.heap.cmp, item.iterKey.UserKey) {
716 1 : if level < item.index {
717 1 : // We could also do m.seekGE(..., level + 1). The levels from
718 1 : // [level + 1, item.index) are already after item.iterKey so seeking them may be
719 1 : // wasteful.
720 1 :
721 1 : // We can seek up to tombstone.End.
722 1 : //
723 1 : // Progress argument: Since this file is at a higher level than item.iterKey we know
724 1 : // that the iterator in this file must be positioned within its bounds and at a key
725 1 : // X > item.iterKey (otherwise it would be the min of the heap). It is not
726 1 : // possible for X.UserKey == item.iterKey.UserKey, since it is incompatible with
727 1 : // X > item.iterKey (a lower version cannot be in a higher sstable), so it must be that
728 1 : // X.UserKey > item.iterKey.UserKey, which means l.largestUserKey > item.iterKey.UserKey.
729 1 : // We also know that l.tombstone.End > item.iterKey.UserKey. So the min of these,
730 1 : // seekKey, computed below, is > item.iterKey.UserKey, so the call to seekGE() will
731 1 : // make forward progress.
732 1 : seekKey := l.tombstone.End
733 1 : // This seek is not directly due to a SeekGE call, so we don't know
734 1 : // enough about the underlying iterator positions, and so we keep the
735 1 : // try-seek-using-next optimization disabled. Additionally, if we're in
736 1 : // prefix-seek mode and a re-seek would have moved us past the original
737 1 : // prefix, we can remove all merging iter levels below the rangedel
738 1 : // tombstone's level and return immediately instead of re-seeking. This
739 1 : // is correct since those levels cannot provide a key that matches the
740 1 : // prefix, and is also visible. Additionally, this is important to make
741 1 : // subsequent `TrySeekUsingNext` work correctly, as a re-seek on a
742 1 : // different prefix could have resulted in this iterator skipping visible
743 1 : // keys at prefixes in between m.prefix and seekKey, that are currently
744 1 : // not in the heap due to a bloom filter mismatch.
745 1 : //
746 1 : // Additionally, we set the relative-seek flag. This is
747 1 : // important when iterating with lazy combined iteration. If
748 1 : // there's a range key between this level's current file and the
749 1 : // file the seek will land on, we need to detect it in order to
750 1 : // trigger construction of the combined iterator.
751 1 : if m.prefix != nil {
752 1 : if n := m.split(seekKey); !bytes.Equal(m.prefix, seekKey[:n]) {
753 1 : for i := item.index; i < len(m.levels); i++ {
754 1 : // Remove this level from the heap. Setting iterKey and iterValue
755 1 : // to their zero values should be sufficient for initMinHeap to not
756 1 : // re-initialize the heap with them in it. Other fields in
757 1 : // mergingIterLevel can remain as-is; the iter/rangeDelIter needs
758 1 : // to stay intact for future trySeekUsingNexts to work, the level
759 1 : // iter boundary context is owned by the levelIter which is not
760 1 : // being repositioned, and any tombstones in these levels will be
761 1 : // irrelevant for us anyway.
762 1 : m.levels[i].iterKey = nil
763 1 : m.levels[i].iterValue = base.LazyValue{}
764 1 : }
765 : // TODO(bilal): Consider a more efficient way of removing levels from
766 : // the heap without reinitializing all of it. This would likely
767 : // necessitate tracking the heap positions of each mergingIterHeap
768 : // item in the mergingIterLevel, and then swapping that item in the
769 : // heap with the last-positioned heap item, and shrinking the heap by
770 : // one.
771 1 : if err := m.initMinHeap(); err != nil {
772 0 : return false, err
773 0 : }
774 1 : return true, nil
775 : }
776 : }
777 1 : if err := m.seekGE(seekKey, item.index, base.SeekGEFlagsNone.EnableRelativeSeek()); err != nil {
778 0 : return false, err
779 0 : }
780 1 : return true, nil
781 : }
782 1 : if l.tombstone.CoversAt(m.snapshot, item.iterKey.SeqNum()) {
783 1 : if err := m.nextEntry(item, nil /* succKey */); err != nil {
784 0 : return false, err
785 0 : }
786 1 : return true, nil
787 : }
788 : }
789 : }
790 1 : return false, nil
791 : }
792 :
793 : // Starting from the current entry, finds the first (next) entry that can be returned.
794 : //
795 : // If an error occurs, m.err is updated to hold the error and findNextEntry
796 : // returns a nil internal key.
797 1 : func (m *mergingIter) findNextEntry() (*InternalKey, base.LazyValue) {
798 1 : for m.heap.len() > 0 && m.err == nil {
799 1 : item := m.heap.items[0]
800 1 : if m.levels[item.index].isSyntheticIterBoundsKey {
801 1 : break
802 : }
803 :
804 1 : m.addItemStats(item)
805 1 :
806 1 : // Skip ignorable boundary keys. These are not real keys and exist to
807 1 : // keep sstables open until we've surpassed their end boundaries so that
808 1 : // their range deletions are visible.
809 1 : if m.levels[item.index].isIgnorableBoundaryKey {
810 1 : m.err = m.nextEntry(item, nil /* succKey */)
811 1 : if m.err != nil {
812 0 : return nil, base.LazyValue{}
813 0 : }
814 1 : continue
815 : }
816 :
817 : // Check if the heap root key is deleted by a range tombstone in a
818 : // higher level. If it is, isNextEntryDeleted will advance the iterator
819 : // to a later key (through seeking or nexting).
820 1 : isDeleted, err := m.isNextEntryDeleted(item)
821 1 : if err != nil {
822 1 : m.err = err
823 1 : return nil, base.LazyValue{}
824 1 : } else if isDeleted {
825 1 : m.stats.PointsCoveredByRangeTombstones++
826 1 : continue
827 : }
828 :
829 : // Check if the key is visible at the iterator sequence numbers.
830 1 : if !item.iterKey.Visible(m.snapshot, m.batchSnapshot) {
831 1 : m.err = m.nextEntry(item, nil /* succKey */)
832 1 : if m.err != nil {
833 0 : return nil, base.LazyValue{}
834 0 : }
835 1 : continue
836 : }
837 :
838 : // The heap root is visible and not deleted by any range tombstones.
839 : // Return it.
840 1 : return item.iterKey, item.iterValue
841 : }
842 1 : return nil, base.LazyValue{}
843 : }
844 :
845 : // Steps to the prev entry. l is the current top item in the heap.
846 1 : func (m *mergingIter) prevEntry(l *mergingIterLevel) error {
847 1 : oldTopLevel := l.index
848 1 : oldRangeDelIter := l.rangeDelIter
849 1 : if l.iterKey, l.iterValue = l.iter.Prev(); l.iterKey != nil {
850 1 : if m.heap.len() > 1 {
851 1 : m.heap.fix(0)
852 1 : }
853 1 : if l.rangeDelIter != oldRangeDelIter && l.rangeDelIter != nil {
854 1 : // The rangeDelIter changed which indicates that the l.iter moved to the
855 1 : // previous sstable. We have to update the tombstone for oldTopLevel as
856 1 : // well.
857 1 : oldTopLevel--
858 1 : }
859 1 : } else {
860 1 : if err := l.iter.Error(); err != nil {
861 1 : return err
862 1 : }
863 1 : m.heap.pop()
864 : }
865 :
866 : // The cached tombstones are only valid for the levels
867 : // [0,oldTopLevel]. Update the cached tombstones for any levels in the range
868 : // [oldTopLevel+1,heap[0].index].
869 1 : return m.initMaxRangeDelIters(oldTopLevel)
870 : }
871 :
872 : // isPrevEntryDeleted() starts from the current entry (as the prev entry) and if it is deleted,
873 : // moves the iterators backward as needed and returns true, else it returns false. item is the top
874 : // item in the heap.
875 1 : func (m *mergingIter) isPrevEntryDeleted(item *mergingIterLevel) (bool, error) {
876 1 : // Look for a range deletion tombstone containing item.iterKey at higher
877 1 : // levels (level < item.index). If we find such a range tombstone we know
878 1 : // it deletes the key in the current level. Also look for a range
879 1 : // deletion at the current level (level == item.index). If we find such a
880 1 : // range deletion we need to check whether it is newer than the current
881 1 : // entry.
882 1 : for level := 0; level <= item.index; level++ {
883 1 : l := &m.levels[level]
884 1 : if l.rangeDelIter == nil || l.tombstone == nil {
885 1 : // If l.tombstone is nil, there are no further tombstones
886 1 : // in the current sstable in the current (reverse) iteration
887 1 : // direction.
888 1 : continue
889 : }
890 1 : if m.heap.cmp(item.iterKey.UserKey, l.tombstone.Start) < 0 {
891 1 : // The current key is before the tombstone start key.
892 1 : //
893 1 : // NB: for the case that this l.rangeDelIter is provided by a levelIter we know that
894 1 : // the levelIter must be positioned at a key < item.iterKey. So it is sufficient to seek the
895 1 : // current l.rangeDelIter (since any range del iterators that will be provided by the
896 1 : // levelIter in the future cannot contain item.iterKey). Also, it is possible that we
897 1 : // will encounter parts of the range delete that should be ignored -- we handle that
898 1 : // below.
899 1 :
900 1 : tomb, err := keyspan.SeekLE(m.heap.cmp, l.rangeDelIter, item.iterKey.UserKey)
901 1 : if err != nil {
902 0 : return false, err
903 0 : }
904 1 : l.tombstone = tomb
905 : }
906 1 : if l.tombstone == nil {
907 1 : continue
908 : }
909 1 : if l.tombstone.Contains(m.heap.cmp, item.iterKey.UserKey) && l.tombstone.VisibleAt(m.snapshot) {
910 1 : if level < item.index {
911 1 : // We could also do m.seekLT(..., level + 1). The levels from
912 1 : // [level + 1, item.index) are already before item.iterKey so seeking them may be
913 1 : // wasteful.
914 1 :
915 1 : // We can seek up to tombstone.Start.
916 1 : //
917 1 : // Progress argument: We know that the iterator in this file is positioned within
918 1 : // its bounds and at a key X < item.iterKey (otherwise it would be the max of the heap).
919 1 : // So smallestUserKey <= item.iterKey.UserKey and we already know that
920 1 : // l.tombstone.Start <= item.iterKey.UserKey. So the seekKey computed below
921 1 : // is <= item.iterKey.UserKey, and since we do a seekLT() we will make backwards
922 1 : // progress.
923 1 : seekKey := l.tombstone.Start
924 1 : // We set the relative-seek flag. This is important when
925 1 : // iterating with lazy combined iteration. If there's a range
926 1 : // key between this level's current file and the file the seek
927 1 : // will land on, we need to detect it in order to trigger
928 1 : // construction of the combined iterator.
929 1 : if err := m.seekLT(seekKey, item.index, base.SeekLTFlagsNone.EnableRelativeSeek()); err != nil {
930 1 : return false, err
931 1 : }
932 1 : return true, nil
933 : }
934 1 : if l.tombstone.CoversAt(m.snapshot, item.iterKey.SeqNum()) {
935 1 : if err := m.prevEntry(item); err != nil {
936 0 : return false, err
937 0 : }
938 1 : return true, nil
939 : }
940 : }
941 : }
942 1 : return false, nil
943 : }
944 :
945 : // Starting from the current entry, finds the first (prev) entry that can be returned.
946 : //
947 : // If an error occurs, m.err is updated to hold the error and findPrevEntry
948 : // returns a nil internal key.
949 1 : func (m *mergingIter) findPrevEntry() (*InternalKey, base.LazyValue) {
950 1 : for m.heap.len() > 0 && m.err == nil {
951 1 : item := m.heap.items[0]
952 1 : if m.levels[item.index].isSyntheticIterBoundsKey {
953 1 : break
954 : }
955 1 : m.addItemStats(item)
956 1 : if isDeleted, err := m.isPrevEntryDeleted(item); err != nil {
957 1 : m.err = err
958 1 : return nil, base.LazyValue{}
959 1 : } else if isDeleted {
960 1 : m.stats.PointsCoveredByRangeTombstones++
961 1 : continue
962 : }
963 1 : if item.iterKey.Visible(m.snapshot, m.batchSnapshot) &&
964 1 : (!m.levels[item.index].isIgnorableBoundaryKey) {
965 1 : return item.iterKey, item.iterValue
966 1 : }
967 1 : m.err = m.prevEntry(item)
968 : }
969 1 : return nil, base.LazyValue{}
970 : }
971 :
972 : // Seeks levels >= level to >= key. Additionally uses range tombstones to extend the seeks.
973 : //
974 : // If an error occurs, seekGE returns the error without setting m.err.
975 1 : func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) error {
976 1 : // When seeking, we can use tombstones to adjust the key we seek to on each
977 1 : // level. Consider the series of range tombstones:
978 1 : //
979 1 : // 1: a---e
980 1 : // 2: d---h
981 1 : // 3: g---k
982 1 : // 4: j---n
983 1 : // 5: m---q
984 1 : //
985 1 : // If we SeekGE("b") we also find the tombstone that "b" resides within in
986 1 : // the first level, which is [a,e). Regardless of whether this tombstone deletes
987 1 : // "b" in that level, we know it deletes "b" in all lower levels, so we
988 1 : // adjust the search key in the next level to the tombstone end key "e". We
989 1 : // then SeekGE("e") in the second level and find the corresponding tombstone
990 1 : // [d,h). This process continues and we end up seeking for "h" in the 3rd
991 1 : // level, "k" in the 4th level and "n" in the last level.
992 1 : //
993 1 : // TODO(peter,rangedel): In addition to the above we can delay seeking a
994 1 : // level (and any lower levels) when the current iterator position is
995 1 : // contained within a range tombstone at a higher level.
996 1 :
997 1 : // Deterministically disable the TrySeekUsingNext optimizations sometimes in
998 1 : // invariant builds to encourage the metamorphic tests to surface bugs. Note
999 1 : // that we cannot disable the optimization within individual levels. It must
1000 1 : // be disabled for all levels or none. If one lower-level iterator performs
1001 1 : // a fresh seek whereas another takes advantage of its current iterator
1002 1 : // position, the heap can become inconsistent. Consider the following
1003 1 : // example:
1004 1 : //
1005 1 : // L5: [ [b-c) ] [ d ]*
1006 1 : // L6: [ b ] [e]*
1007 1 : //
1008 1 : // Imagine a SeekGE(a). The [b-c) range tombstone deletes the L6 point key
1009 1 : // 'b', resulting in the iterator positioned at d with the heap:
1010 1 : //
1011 1 : // {L5: d, L6: e}
1012 1 : //
1013 1 : // A subsequent SeekGE(b) is seeking to a larger key, so the caller may set
1014 1 : // TrySeekUsingNext()=true. If the L5 iterator used the TrySeekUsingNext
1015 1 : // optimization but the L6 iterator did not, the iterator would have the
1016 1 : // heap:
1017 1 : //
1018 1 : // {L6: b, L5: d}
1019 1 : //
1020 1 : // Because the L5 iterator has already advanced to the next sstable, the
1021 1 : // merging iterator cannot observe the [b-c) range tombstone and will
1022 1 : // mistakenly return L6's deleted point key 'b'.
1023 1 : if invariants.Enabled && flags.TrySeekUsingNext() && !m.forceEnableSeekOpt &&
1024 1 : disableSeekOpt(key, uintptr(unsafe.Pointer(m))) {
1025 1 : flags = flags.DisableTrySeekUsingNext()
1026 1 : }
1027 :
1028 1 : for ; level < len(m.levels); level++ {
1029 1 : if invariants.Enabled && m.lower != nil && m.heap.cmp(key, m.lower) < 0 {
1030 0 : m.logger.Fatalf("mergingIter: lower bound violation: %s < %s\n%s", key, m.lower, debug.Stack())
1031 0 : }
1032 :
1033 1 : l := &m.levels[level]
1034 1 : if m.prefix != nil {
1035 1 : l.iterKey, l.iterValue = l.iter.SeekPrefixGE(m.prefix, key, flags)
1036 1 : if l.iterKey != nil {
1037 1 : if n := m.split(l.iterKey.UserKey); !bytes.Equal(m.prefix, l.iterKey.UserKey[:n]) {
1038 1 : // Prevent keys without a matching prefix from being added to the heap by setting
1039 1 : // iterKey and iterValue to their zero values before calling initMinHeap.
1040 1 : l.iterKey = nil
1041 1 : l.iterValue = base.LazyValue{}
1042 1 : }
1043 : }
1044 1 : } else {
1045 1 : l.iterKey, l.iterValue = l.iter.SeekGE(key, flags)
1046 1 : }
1047 1 : if l.iterKey == nil {
1048 1 : if err := l.iter.Error(); err != nil {
1049 1 : return err
1050 1 : }
1051 : }
1052 :
1053 : // If this level contains overlapping range tombstones, alter the seek
1054 : // key accordingly. Caveat: If we're performing lazy-combined iteration,
1055 : // we cannot alter the seek key: Range tombstones don't delete range
1056 : // keys, and there might exist live range keys within the range
1057 : // tombstone's span that need to be observed to trigger a switch to
1058 : // combined iteration.
1059 1 : if rangeDelIter := l.rangeDelIter; rangeDelIter != nil &&
1060 1 : (m.combinedIterState == nil || m.combinedIterState.initialized) {
1061 1 : // The level has a range-del iterator. Find the tombstone containing
1062 1 : // the search key.
1063 1 : var err error
1064 1 : l.tombstone, err = rangeDelIter.SeekGE(key)
1065 1 : if err != nil {
1066 1 : return err
1067 1 : }
1068 1 : if l.tombstone != nil && l.tombstone.VisibleAt(m.snapshot) && l.tombstone.Contains(m.heap.cmp, key) {
1069 1 : // Based on the containment condition tombstone.End > key, so
1070 1 : // the assignment to key results in a monotonically
1071 1 : // non-decreasing key across iterations of this loop.
1072 1 : //
1073 1 : // The adjustment of key here can only move it to a larger key.
1074 1 : // Since the caller of seekGE guaranteed that the original key
1075 1 : // was greater than or equal to m.lower, the new key will
1076 1 : // continue to be greater than or equal to m.lower.
1077 1 : key = l.tombstone.End
1078 1 : }
1079 : }
1080 : }
1081 1 : return m.initMinHeap()
1082 : }
1083 :
1084 0 : func (m *mergingIter) String() string {
1085 0 : return "merging"
1086 0 : }
1087 :
1088 : // SeekGE implements base.InternalIterator.SeekGE. Note that SeekGE only checks
1089 : // the upper bound. It is up to the caller to ensure that key is greater than
1090 : // or equal to the lower bound.
1091 1 : func (m *mergingIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, base.LazyValue) {
1092 1 : m.prefix = nil
1093 1 : m.err = m.seekGE(key, 0 /* start level */, flags)
1094 1 : if m.err != nil {
1095 1 : return nil, base.LazyValue{}
1096 1 : }
1097 1 : return m.findNextEntry()
1098 : }
1099 :
1100 : // SeekPrefixGE implements base.InternalIterator.SeekPrefixGE.
1101 : func (m *mergingIter) SeekPrefixGE(
1102 : prefix, key []byte, flags base.SeekGEFlags,
1103 1 : ) (*base.InternalKey, base.LazyValue) {
1104 1 : return m.SeekPrefixGEStrict(prefix, key, flags)
1105 1 : }
1106 :
1107 : // SeekPrefixGEStrict implements topLevelIterator.SeekPrefixGEStrict. Note that
1108 : // SeekPrefixGEStrict explicitly checks that the key has a matching prefix.
1109 : func (m *mergingIter) SeekPrefixGEStrict(
1110 : prefix, key []byte, flags base.SeekGEFlags,
1111 1 : ) (*base.InternalKey, base.LazyValue) {
1112 1 : m.prefix = prefix
1113 1 : m.err = m.seekGE(key, 0 /* start level */, flags)
1114 1 : if m.err != nil {
1115 1 : return nil, base.LazyValue{}
1116 1 : }
1117 :
1118 1 : iterKey, iterValue := m.findNextEntry()
1119 1 : if invariants.Enabled && iterKey != nil {
1120 1 : if n := m.split(iterKey.UserKey); !bytes.Equal(m.prefix, iterKey.UserKey[:n]) {
1121 0 : m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKey, m.prefix)
1122 0 : }
1123 : }
1124 1 : return iterKey, iterValue
1125 : }
1126 :
1127 : // Seeks levels >= level to < key. Additionally uses range tombstones to extend the seeks.
1128 1 : func (m *mergingIter) seekLT(key []byte, level int, flags base.SeekLTFlags) error {
1129 1 : // See the comment in seekGE regarding using tombstones to adjust the seek
1130 1 : // target per level.
1131 1 : m.prefix = nil
1132 1 : for ; level < len(m.levels); level++ {
1133 1 : if invariants.Enabled && m.upper != nil && m.heap.cmp(key, m.upper) > 0 {
1134 0 : m.logger.Fatalf("mergingIter: upper bound violation: %s > %s\n%s", key, m.upper, debug.Stack())
1135 0 : }
1136 :
1137 1 : l := &m.levels[level]
1138 1 : l.iterKey, l.iterValue = l.iter.SeekLT(key, flags)
1139 1 : if l.iterKey == nil {
1140 1 : if err := l.iter.Error(); err != nil {
1141 1 : return err
1142 1 : }
1143 : }
1144 :
1145 : // If this level contains overlapping range tombstones, alter the seek
1146 : // key accordingly. Caveat: If we're performing lazy-combined iteration,
1147 : // we cannot alter the seek key: Range tombstones don't delete range
1148 : // keys, and there might exist live range keys within the range
1149 : // tombstone's span that need to be observed to trigger a switch to
1150 : // combined iteration.
1151 1 : if rangeDelIter := l.rangeDelIter; rangeDelIter != nil &&
1152 1 : (m.combinedIterState == nil || m.combinedIterState.initialized) {
1153 1 : // The level has a range-del iterator. Find the tombstone containing
1154 1 : // the search key.
1155 1 : tomb, err := keyspan.SeekLE(m.heap.cmp, rangeDelIter, key)
1156 1 : if err != nil {
1157 1 : return err
1158 1 : }
1159 1 : l.tombstone = tomb
1160 1 : if l.tombstone != nil && l.tombstone.VisibleAt(m.snapshot) &&
1161 1 : l.tombstone.Contains(m.heap.cmp, key) {
1162 1 : // NB: Based on the containment condition
1163 1 : // tombstone.Start.UserKey <= key, so the assignment to key
1164 1 : // results in a monotonically non-increasing key across
1165 1 : // iterations of this loop.
1166 1 : //
1167 1 : // The adjustment of key here can only move it to a smaller key.
1168 1 : // Since the caller of seekLT guaranteed that the original key
1169 1 : // was less than or equal to m.upper, the new key will continue
1170 1 : // to be less than or equal to m.upper.
1171 1 : key = l.tombstone.Start
1172 1 : }
1173 : }
1174 : }
1175 :
1176 1 : return m.initMaxHeap()
1177 : }
1178 :
1179 : // SeekLT implements base.InternalIterator.SeekLT. Note that SeekLT only checks
1180 : // the lower bound. It is up to the caller to ensure that key is less than the
1181 : // upper bound.
1182 1 : func (m *mergingIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, base.LazyValue) {
1183 1 : m.prefix = nil
1184 1 : m.err = m.seekLT(key, 0 /* start level */, flags)
1185 1 : if m.err != nil {
1186 1 : return nil, base.LazyValue{}
1187 1 : }
1188 1 : return m.findPrevEntry()
1189 : }
1190 :
1191 : // First implements base.InternalIterator.First. Note that First only checks
1192 : // the upper bound. It is up to the caller to enforce the lower bound (e.g.
1193 : // via a call to SeekGE(lower)).
1194 1 : func (m *mergingIter) First() (*InternalKey, base.LazyValue) {
1195 1 : m.err = nil // clear cached iteration error
1196 1 : m.prefix = nil
1197 1 : m.heap.items = m.heap.items[:0]
1198 1 : for i := range m.levels {
1199 1 : l := &m.levels[i]
1200 1 : l.iterKey, l.iterValue = l.iter.First()
1201 1 : if l.iterKey == nil {
1202 1 : if m.err = l.iter.Error(); m.err != nil {
1203 1 : return nil, base.LazyValue{}
1204 1 : }
1205 : }
1206 : }
1207 1 : if m.err = m.initMinHeap(); m.err != nil {
1208 1 : return nil, base.LazyValue{}
1209 1 : }
1210 1 : return m.findNextEntry()
1211 : }
1212 :
1213 : // Last implements base.InternalIterator.Last. Note that Last only checks the
1214 : // lower bound. It is up to the caller to enforce the upper bound (e.g. via a
1215 : // call to SeekLT(upper)).
1216 1 : func (m *mergingIter) Last() (*InternalKey, base.LazyValue) {
1217 1 : m.err = nil // clear cached iteration error
1218 1 : m.prefix = nil
1219 1 : for i := range m.levels {
1220 1 : l := &m.levels[i]
1221 1 : l.iterKey, l.iterValue = l.iter.Last()
1222 1 : if l.iterKey == nil {
1223 1 : if m.err = l.iter.Error(); m.err != nil {
1224 1 : return nil, base.LazyValue{}
1225 1 : }
1226 : }
1227 : }
1228 1 : if m.err = m.initMaxHeap(); m.err != nil {
1229 1 : return nil, base.LazyValue{}
1230 1 : }
1231 1 : return m.findPrevEntry()
1232 : }
1233 :
1234 1 : func (m *mergingIter) Next() (*InternalKey, base.LazyValue) {
1235 1 : if m.err != nil {
1236 1 : return nil, base.LazyValue{}
1237 1 : }
1238 :
1239 1 : if m.dir != 1 {
1240 1 : if m.err = m.switchToMinHeap(); m.err != nil {
1241 1 : return nil, base.LazyValue{}
1242 1 : }
1243 1 : return m.findNextEntry()
1244 : }
1245 :
1246 1 : if m.heap.len() == 0 {
1247 1 : return nil, base.LazyValue{}
1248 1 : }
1249 :
1250 : // NB: It's okay to call nextEntry directly even during prefix iteration
1251 : // mode. During prefix iteration mode, we rely on the caller to not call
1252 : // Next if the iterator has already advanced beyond the iteration prefix.
1253 : // See the comment above the base.InternalIterator interface.
1254 1 : if m.err = m.nextEntry(m.heap.items[0], nil /* succKey */); m.err != nil {
1255 1 : return nil, base.LazyValue{}
1256 1 : }
1257 :
1258 1 : iterKey, iterValue := m.findNextEntry()
1259 1 : if invariants.Enabled && m.prefix != nil && iterKey != nil {
1260 1 : if n := m.split(iterKey.UserKey); !bytes.Equal(m.prefix, iterKey.UserKey[:n]) {
1261 0 : m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKey, m.prefix)
1262 0 : }
1263 : }
1264 1 : return iterKey, iterValue
1265 : }
1266 :
1267 1 : func (m *mergingIter) NextPrefix(succKey []byte) (*InternalKey, LazyValue) {
1268 1 : if m.dir != 1 {
1269 0 : panic("pebble: cannot switch directions with NextPrefix")
1270 : }
1271 1 : if m.err != nil || m.heap.len() == 0 {
1272 0 : return nil, LazyValue{}
1273 0 : }
1274 1 : if m.levelsPositioned == nil {
1275 1 : m.levelsPositioned = make([]bool, len(m.levels))
1276 1 : } else {
1277 1 : for i := range m.levelsPositioned {
1278 1 : m.levelsPositioned[i] = false
1279 1 : }
1280 : }
1281 :
1282 : // The heap root must be positioned at a key < succKey, because
1283 : // NextPrefix was invoked.
1284 1 : root := &m.heap.items[0]
1285 1 : m.levelsPositioned[(*root).index] = true
1286 1 : if invariants.Enabled && m.heap.cmp((*root).iterKey.UserKey, succKey) >= 0 {
1287 0 : m.logger.Fatalf("pebble: invariant violation: NextPrefix(%q) called on merging iterator already positioned at %q",
1288 0 : succKey, (*root).iterKey)
1289 0 : }
1290 1 : if m.err = m.nextEntry(*root, succKey); m.err != nil {
1291 1 : return nil, base.LazyValue{}
1292 1 : }
1293 : // NB: root is a pointer to the heap root. nextEntry may have changed
1294 : // the heap root, so we must not expect root to still point to the same
1295 : // level (or to even be valid, if the heap is now exhausted).
1296 :
1297 1 : for m.heap.len() > 0 {
1298 1 : if m.levelsPositioned[(*root).index] {
1299 1 : // A level we've previously positioned is at the top of the heap, so
1300 1 : // there are no other levels positioned at keys < succKey. We've
1301 1 : // advanced as far as we need to.
1302 1 : break
1303 : }
1304 : // Since this level was not the original heap root when NextPrefix was
1305 : // called, we don't know whether this level's current key has the
1306 : // previous prefix or a new one.
1307 1 : if m.heap.cmp((*root).iterKey.UserKey, succKey) >= 0 {
1308 1 : break
1309 : }
1310 1 : m.levelsPositioned[(*root).index] = true
1311 1 : if m.err = m.nextEntry(*root, succKey); m.err != nil {
1312 0 : return nil, base.LazyValue{}
1313 0 : }
1314 : }
1315 1 : return m.findNextEntry()
1316 : }
1317 :
1318 1 : func (m *mergingIter) Prev() (*InternalKey, base.LazyValue) {
1319 1 : if m.err != nil {
1320 0 : return nil, base.LazyValue{}
1321 0 : }
1322 :
1323 1 : if m.dir != -1 {
1324 1 : if m.prefix != nil {
1325 1 : m.err = errors.New("pebble: unsupported reverse prefix iteration")
1326 1 : return nil, base.LazyValue{}
1327 1 : }
1328 1 : if m.err = m.switchToMaxHeap(); m.err != nil {
1329 1 : return nil, base.LazyValue{}
1330 1 : }
1331 1 : return m.findPrevEntry()
1332 : }
1333 :
1334 1 : if m.heap.len() == 0 {
1335 1 : return nil, base.LazyValue{}
1336 1 : }
1337 1 : if m.err = m.prevEntry(m.heap.items[0]); m.err != nil {
1338 1 : return nil, base.LazyValue{}
1339 1 : }
1340 1 : return m.findPrevEntry()
1341 : }
1342 :
1343 1 : func (m *mergingIter) Error() error {
1344 1 : if m.heap.len() == 0 || m.err != nil {
1345 1 : return m.err
1346 1 : }
1347 1 : return m.levels[m.heap.items[0].index].iter.Error()
1348 : }
1349 :
1350 1 : func (m *mergingIter) Close() error {
1351 1 : for i := range m.levels {
1352 1 : iter := m.levels[i].iter
1353 1 : if err := iter.Close(); err != nil && m.err == nil {
1354 0 : m.err = err
1355 0 : }
1356 1 : if rangeDelIter := m.levels[i].rangeDelIter; rangeDelIter != nil {
1357 1 : if err := rangeDelIter.Close(); err != nil && m.err == nil {
1358 0 : m.err = err
1359 0 : }
1360 : }
1361 : }
1362 1 : m.levels = nil
1363 1 : m.heap.items = m.heap.items[:0]
1364 1 : return m.err
1365 : }
1366 :
1367 1 : func (m *mergingIter) SetBounds(lower, upper []byte) {
1368 1 : m.prefix = nil
1369 1 : m.lower = lower
1370 1 : m.upper = upper
1371 1 : for i := range m.levels {
1372 1 : m.levels[i].iter.SetBounds(lower, upper)
1373 1 : }
1374 1 : m.heap.clear()
1375 : }
1376 :
1377 1 : func (m *mergingIter) SetContext(ctx context.Context) {
1378 1 : for i := range m.levels {
1379 1 : m.levels[i].iter.SetContext(ctx)
1380 1 : }
1381 : }
1382 :
1383 0 : func (m *mergingIter) DebugString() string {
1384 0 : var buf bytes.Buffer
1385 0 : sep := ""
1386 0 : for m.heap.len() > 0 {
1387 0 : item := m.heap.pop()
1388 0 : fmt.Fprintf(&buf, "%s%s", sep, item.iterKey)
1389 0 : sep = " "
1390 0 : }
1391 0 : var err error
1392 0 : if m.dir == 1 {
1393 0 : err = m.initMinHeap()
1394 0 : } else {
1395 0 : err = m.initMaxHeap()
1396 0 : }
1397 0 : if err != nil {
1398 0 : fmt.Fprintf(&buf, "err=<%s>", err)
1399 0 : }
1400 0 : return buf.String()
1401 : }
1402 :
1403 1 : func (m *mergingIter) ForEachLevelIter(fn func(li *levelIter) bool) {
1404 1 : for _, ml := range m.levels {
1405 1 : if ml.levelIter != nil {
1406 1 : if done := fn(ml.levelIter); done {
1407 1 : break
1408 : }
1409 : }
1410 : }
1411 : }
1412 :
1413 1 : func (m *mergingIter) addItemStats(l *mergingIterLevel) {
1414 1 : m.stats.PointCount++
1415 1 : m.stats.KeyBytes += uint64(len(l.iterKey.UserKey))
1416 1 : m.stats.ValueBytes += uint64(len(l.iterValue.ValueOrHandle))
1417 1 : }
1418 :
1419 : var _ internalIterator = &mergingIter{}
|